Redesign BlockDBFile to have better API, for easier subclassing

This commit is contained in:
UnknownShadow200 2017-08-17 16:38:35 +10:00
parent 0c0f13dac6
commit 15aaf619ca
2 changed files with 115 additions and 125 deletions

View File

@ -18,15 +18,20 @@
using System;
using System.IO;
using MCGalaxy.Util;
using MCGalaxy.Maths;
namespace MCGalaxy.DB {
public unsafe sealed class BlockDBFile_V1 : BlockDBFile {
public override void WriteEntries(Stream s, FastList<BlockDBEntry> entries) {
byte[] bulk = new byte[BulkEntries * EntrySize];
WriteEntries(s, bulk, entries.Items, entries.Count);
for (int i = 0; i < entries.Count; i += BulkEntries) {
int bulkCount = Math.Min(BulkEntries, entries.Count - i);
for (int j = 0; j < bulkCount; j++) {
WriteEntry(entries.Items[i + j], bulk, j * EntrySize);
}
s.Write(bulk, 0, bulkCount * EntrySize);
}
}
public override void WriteEntries(Stream s, BlockDBCache cache) {
@ -34,38 +39,25 @@ namespace MCGalaxy.DB {
BlockDBCacheNode node = cache.Tail;
while (node != null) {
WriteEntries(s, bulk, node);
int count = node.Count;
for (int i = 0; i < count; i += BulkEntries) {
int bulkCount = Math.Min(BulkEntries, count - i);
for (int j = 0; j < bulkCount; j++) {
BlockDBEntry entry = node.Unpack(node.Entries[i + j]);
WriteEntry(entry, bulk, j * EntrySize);
}
s.Write(bulk, 0, bulkCount * EntrySize);
}
lock (cache.Locker)
node = node.Next;
}
}
public override long CountEntries(Stream s) {
public override long CountEntries(Stream s) {
return (s.Length / BlockDBFile.EntrySize) - BlockDBFile.HeaderEntries;
}
static void WriteEntries(Stream s, byte[] bulk, BlockDBEntry[] entries, int count) {
for (int i = 0; i < count; i += BulkEntries) {
int bulkCount = Math.Min(BulkEntries, count - i);
for (int j = 0; j < bulkCount; j++) {
WriteEntry(entries[i + j], bulk, j * EntrySize);
}
s.Write(bulk, 0, bulkCount * EntrySize);
}
}
static void WriteEntries(Stream s, byte[] bulk, BlockDBCacheNode node) {
int count = node.Count;
for (int i = 0; i < count; i += BulkEntries) {
int bulkCount = Math.Min(BulkEntries, count - i);
for (int j = 0; j < bulkCount; j++) {
BlockDBEntry entry = node.Unpack(node.Entries[i + j]);
WriteEntry(entry, bulk, j * EntrySize);
}
s.Write(bulk, 0, bulkCount * EntrySize);
}
}
// Inlined WriteI32/WriteU16 for better performance
static void WriteEntry(BlockDBEntry entry, byte[] bulk, int index) {
bulk[index + 0 ] = (byte)(entry.PlayerID);
@ -88,75 +80,29 @@ namespace MCGalaxy.DB {
bulk[index + 14] = (byte)(entry.Flags);
bulk[index + 15] = (byte)(entry.Flags >> 8);
}
public override void FindChangesAt(Stream s, int index, Action<BlockDBEntry> output) {
byte[] bulk = new byte[BulkEntries * EntrySize];
fixed (byte* ptr = bulk) {
int dbEntries = (int)(s.Length / EntrySize) - HeaderEntries;
while (dbEntries > 0) {
int count = Math.Min(dbEntries, BulkEntries);
ReadFully(s, bulk, count * EntrySize);
BlockDBEntry* entryPtr = (BlockDBEntry*)ptr;
for (int i = 0; i < count; i++) {
if (entryPtr->Index == index) {
output(*entryPtr);
}
entryPtr++;
}
dbEntries -= count;
}
public unsafe override int ReadForward(Stream s, byte[] bulk, BlockDBEntry* entriesPtr) {
int remaining = (int)((s.Length - s.Position) / EntrySize);
int count = Math.Min(remaining, BulkEntries);
if (count > 0) {
BlockDBFile.ReadFully(s, bulk, count * EntrySize);
}
return count;
}
public override bool FindChangesBy(Stream s, int[] ids, int start, int end, Action<BlockDBEntry> output) {
byte[] bulk = new byte[BulkEntries * EntrySize];
fixed (byte* ptr = bulk) {
int dbEntries = (int)(s.Length / EntrySize) - HeaderEntries;
while (dbEntries > 0) {
int count = Math.Min(dbEntries, BulkEntries);
// find the correct position for the start of this bulk read
s.Position = (dbEntries - count + HeaderEntries) * (long)EntrySize;
ReadFully(s, bulk, count * EntrySize);
BlockDBEntry* entryPtr = (BlockDBEntry*)ptr;
entryPtr += (count - 1);
for (int i = count - 1; i >= 0; i--) {
if (entryPtr->TimeDelta < start) return true;
if (entryPtr->TimeDelta <= end) {
for (int j = 0; j < ids.Length; j++) {
if (entryPtr->PlayerID != ids[j]) continue;
output(*entryPtr); break;
}
}
entryPtr--;
}
dbEntries -= count;
}
public unsafe override int ReadBackward(Stream s, byte[] bulk, BlockDBEntry* entriesPtr) {
long pos = s.Position;
int remaining = (int)(pos / EntrySize) - HeaderEntries;
int count = Math.Min(remaining, BulkEntries);
if (count > 0) {
pos -= count * EntrySize;
s.Position = pos;
BlockDBFile.ReadFully(s, bulk, count * EntrySize);
s.Position = pos; // set correct position for next backward read
}
return false;
}
static ushort ReadU16(byte[] array, int offset) {
return (ushort)(array[offset] | array[offset + 1] << 8);
}
static void WriteU16(ushort value, byte[] array, int index) {
array[index++] = (byte)(value);
array[index++] = (byte)(value >> 8);
}
static void ReadFully(Stream stream, byte[] dst, int count) {
int total = 0;
do {
int read = stream.Read(dst, total, count - total);
if (read == 0) throw new EndOfStreamException();
total += read;
} while (total < count);
return count;
}
}
}

View File

@ -15,10 +15,10 @@
or implied. See the Licenses for the specific language governing
permissions and limitations under the Licenses.
*/
using System;
using System.IO;
using MCGalaxy.Util;
using MCGalaxy.Maths;
using System;
using System.IO;
using MCGalaxy.Maths;
using MCGalaxy.Util;
namespace MCGalaxy.DB {
@ -27,7 +27,7 @@ namespace MCGalaxy.DB {
public const byte Version = 1;
public const int EntrySize = 16;
public const int HeaderEntries = 1;
public const int BulkEntries = 256;
public const int BulkEntries = 2048;
public static BlockDBFile V1 = new BlockDBFile_V1();
@ -67,15 +67,16 @@ namespace MCGalaxy.DB {
public abstract void WriteEntries(Stream s, BlockDBCache cache);
/// <summary> Iterates from the very oldest to newest entry in the BlockDB. </summary>
public abstract void FindChangesAt(Stream s, int index, Action<BlockDBEntry> output);
/// <summary> Reads a block of BlockDB entries, in a forward streaming manner. </summary>
/// <returns> The number of entries read. </returns>
public abstract int ReadForward(Stream s, byte[] bulk, BlockDBEntry* entryPtr);
/// <summary> Iterates from the very newest to oldest entry in the BlockDB. </summary>
/// <returns> whether an entry before start time was reached. </returns>
public abstract bool FindChangesBy(Stream s, int[] ids, int start, int end, Action<BlockDBEntry> output);
/// <summary> Reads a block of BlockDB entries, in a backward streaming manner. </summary>
/// <returns> The number of entries read. </returns>
public abstract int ReadBackward(Stream s, byte[] bulk, BlockDBEntry* entryPtr);
/// <summary> Returns number of entries in the backing file. </summary>
public abstract long CountEntries(Stream s);
public abstract long CountEntries(Stream s);
/// <summary> Deletes the backing file on disc if it exists. </summary>
public static void DeleteBackingFile(string map) {
@ -92,17 +93,6 @@ namespace MCGalaxy.DB {
File.Move(srcPath, dstPath);
}
/// <summary> Returns number of entries in the backing file on disc if it exists. </summary>
public static long CountEntries(string map) {
string path = FilePath(map);
if (!File.Exists(path)) return 0;
using (Stream src = new FileStream(path, FileMode.OpenOrCreate, FileAccess.Read, FileShare.ReadWrite)) {
return V1.CountEntries(src);
}
}
public static void ResizeBackingFile(BlockDB db) {
Logger.Log(LogType.BackgroundActivity, "Resizing BlockDB for " + db.MapName);
string filePath = FilePath(db.MapName);
@ -113,26 +103,22 @@ namespace MCGalaxy.DB {
ReadHeader(src, out dims);
WriteHeader(dst, db.Dims);
int width = db.Dims.X, length = db.Dims.Z;
byte[] bulk = new byte[BulkEntries * EntrySize];
fixed (byte* ptr = bulk) {
int entries = (int)(src.Length / EntrySize) - 1;
BlockDBEntry* entryPtr = (BlockDBEntry*)ptr;
while (entries > 0) {
int read = Math.Min(entries, BulkEntries);
ReadFully(src, bulk, read * EntrySize);
while (true) {
int count = V1.ReadForward(src, bulk, entryPtr);
if (count == 0) break;
for (int i = 0; i < read; i++) {
for (int i = 0; i < count; i++) {
int index = entryPtr[i].Index;
int x = index % dims.X;
int y = (index / dims.X) / dims.Z;
int z = (index / dims.X) % dims.Z;
entryPtr[i].Index = (y * length + z) * width + x;
}
dst.Write(bulk, 0, read * EntrySize);
entries -= read;
dst.Write(bulk, 0, count * EntrySize);
}
}
}
@ -141,6 +127,64 @@ namespace MCGalaxy.DB {
File.Move(tempPath, filePath);
}
/// <summary> Returns number of entries in the backing file on disc if it exists. </summary>
public static long CountEntries(string map) {
string path = FilePath(map);
if (!File.Exists(path)) return 0;
using (Stream src = new FileStream(path, FileMode.OpenOrCreate, FileAccess.Read, FileShare.ReadWrite)) {
Vec3U16 dims;
BlockDBFile file = ReadHeader(src, out dims);
return file.CountEntries(src);
}
}
/// <summary> Iterates from the very oldest to newest entry in the BlockDB. </summary>
public void FindChangesAt(Stream s, int index, Action<BlockDBEntry> output) {
byte[] bulk = new byte[BulkEntries * EntrySize];
fixed (byte* ptr = bulk) {
while (true) {
BlockDBEntry* entryPtr = (BlockDBEntry*)ptr;
int count = ReadForward(s, bulk, entryPtr);
if (count == 0) return;
for (int i = 0; i < count; i++) {
if (entryPtr->Index == index) { output(*entryPtr); }
entryPtr++;
}
}
}
}
/// <summary> Iterates from the very newest to oldest entry in the BlockDB. </summary>
/// <returns> whether an entry before start time was reached. </returns>
public bool FindChangesBy(Stream s, int[] ids, int start, int end, Action<BlockDBEntry> output) {
byte[] bulk = new byte[BulkEntries * EntrySize];
s.Position = s.Length;
fixed (byte* ptr = bulk) {
while (true) {
BlockDBEntry* entryPtr = (BlockDBEntry*)ptr;
int count = ReadBackward(s, bulk, entryPtr);
if (count == 0) break;
entryPtr += (count - 1);
for (int i = count - 1; i >= 0; i--) {
if (entryPtr->TimeDelta < start) return true;
if (entryPtr->TimeDelta <= end) {
for (int j = 0; j < ids.Length; j++) {
if (entryPtr->PlayerID != ids[j]) continue;
output(*entryPtr); break;
}
}
entryPtr--;
}
}
}
return false;
}
static ushort ReadU16(byte[] array, int offset) {
return (ushort)(array[offset] | array[offset + 1] << 8);
}
@ -150,7 +194,7 @@ namespace MCGalaxy.DB {
array[index++] = (byte)(value >> 8);
}
static void ReadFully(Stream stream, byte[] dst, int count) {
internal static void ReadFully(Stream stream, byte[] dst, int count) {
int total = 0;
do {
int read = stream.Read(dst, total, count - total);