From 4135f603c516d25f75af3efa57ee7ca9f993149b Mon Sep 17 00:00:00 2001 From: David Vierra Date: Fri, 25 Nov 2011 15:56:11 -1000 Subject: [PATCH] Added _nbt.pyx, a Cython implentation of nbt that uses extension classes and pointer arithmetic to speed up loading. Renamed nbt.py to pynbt.py, added nbt.py as a thin loader for either _nbt.pyx or pynbt.py. --- _nbt.pyx | 632 +++++++++++++++++++++++++++++++++++++++++++++++++++ java.py | 5 +- mclevel.py | 23 +- nbt.py | 548 +------------------------------------------- pynbt.py | 538 +++++++++++++++++++++++++++++++++++++++++++ schematic.py | 14 +- tests.py | 22 +- 7 files changed, 1211 insertions(+), 571 deletions(-) create mode 100644 _nbt.pyx create mode 100644 pynbt.py diff --git a/_nbt.pyx b/_nbt.pyx new file mode 100644 index 0000000..06a16a8 --- /dev/null +++ b/_nbt.pyx @@ -0,0 +1,632 @@ +# cython: profile=True +# vim:set sw=2 sts=2 ts=2: + +""" +Cython implementation + +Named Binary Tag library. Serializes and deserializes TAG_* objects +to and from binary data. Load a Minecraft level by calling nbt.load(). +Create your own TAG_* objects and set their values. +Save a TAG_* object to a file or StringIO object. + +Read the test functions at the end of the file to get started. + +This library requires Numpy. Get it here: +http://new.scipy.org/download.html + +Official NBT documentation is here: +http://www.minecraft.net/docs/NBT.txt + + +Copyright 2010 David Rio Vierra +""" + +import collections +import itertools +import struct +import gzip +from cStringIO import StringIO +from cpython cimport PyTypeObject, PyObject_TypeCheck, PyUnicode_DecodeUTF8, PyList_Append + +cdef extern from "cStringIO.h": + struct PycStringIO_CAPI: + int cwrite (object o, char *buf, Py_ssize_t len) + PyTypeObject * OutputType +cdef extern from "cobject.h": + void * PyCObject_Import(char *module_name, char *cobject_name) + +cdef PycStringIO_CAPI *PycStringIO = PyCObject_Import("cStringIO", "cStringIO_CAPI") +cdef PyTypeObject * StringO = PycStringIO.OutputType + +cdef cwrite(obj, char * buf, size_t len): + #print "cwrite %s %s %d" % (map(ord, buf[:min(4, len)]), buf[:min(4, len)].decode('ascii', 'replace'), len) + return PycStringIO.cwrite(obj, buf, len) + +import sys +import os; +from os.path import exists +from contextlib import closing + +from numpy import array, zeros, uint8, fromstring, ndarray, frombuffer +cimport numpy as np + +cdef char TAG_END = 0 +cdef char TAG_BYTE = 1 +cdef char TAG_SHORT = 2 +cdef char TAG_INT = 3 +cdef char TAG_LONG = 4 +cdef char TAG_FLOAT = 5 +cdef char TAG_DOUBLE = 6 +cdef char TAG_BYTE_ARRAY = 7 +cdef char TAG_STRING = 8 +cdef char TAG_LIST = 9 +cdef char TAG_COMPOUND = 10 +cdef char TAG_INT_ARRAY = 11 +cdef char TAG_SHORT_ARRAY = 12 + +class NBTFormatError (ValueError): + pass + +cdef class TAG_Value: + cdef unicode _name + cdef public char tagID + def __str__(self): + return self.tostr() + cdef tostr(self): + return str(self.__class__) + ": " + str(self.value) + + + property name: + def __get__(self): + return self._name + def __set__(self, val): + if isinstance(val, str): val = PyUnicode_DecodeUTF8(val, len(val), "strict") + self._name = val + + def __reduce__(self): + return (self.__class__, (self.value, self._name)) + +cdef class TAG_Number(TAG_Value): + pass + +cdef class TAG_Array(TAG_Value): + pass + + +cdef class TAG_Byte(TAG_Number): + cdef public char value + + cdef save_value(self, buf): + save_byte(self.value, buf) + def __init__(self, char value=0, name = u""): + self.value = value + self.name = name + self.tagID = TAG_BYTE + +cdef class TAG_Short(TAG_Number): + cdef public short value + + cdef save_value(self, buf): + save_short(self.value, buf) + def __init__(self, short value=0, name = u""): + self.value = value + self.name = name + self.tagID = TAG_SHORT + +cdef class TAG_Int(TAG_Number): + cdef public int value + + cdef save_value(self, buf): + save_int(self.value, buf) + def __init__(self, int value=0, name = u""): + self.value = value + self.name = name + self.tagID = TAG_INT + +cdef class TAG_Long(TAG_Number): + cdef public long long value + + cdef save_value(self, buf): + save_long(self.value, buf) + def __init__(self, long long value=0, name = u""): + self.value = value + self.name = name + self.tagID = TAG_LONG + +cdef class TAG_Float(TAG_Number): + cdef public float value + + cdef save_value(self, buf): + save_float(self.value, buf) + def __init__(self, float value=0., name = u""): + self.value = value + self.name = name + self.tagID = TAG_FLOAT + +cdef class TAG_Double(TAG_Number): + cdef public double value + + cdef save_value(self, buf): + save_double(self.value, buf) + def __init__(self, double value=0., name = u""): + self.value = value + self.name = name + self.tagID = TAG_DOUBLE + +cdef class TAG_Byte_Array(TAG_Array): + cdef public object value + def __init__(self, value = zeros((0,), 'uint8'), name = u""): + self.value = value + self.name = name + self.tagID = TAG_BYTE_ARRAY + + cdef save_value(self, buf): + save_byte_array(self.value, buf) + + +cdef class TAG_String(TAG_Value): + cdef unicode _value + def __init__(self, value = u"", name = u""): + if isinstance(value, str): value = PyUnicode_DecodeUTF8(value, len(value), "strict") + self.value = value + self.name = name + self.tagID = TAG_STRING + + property value: + def __get__(self): + return self._value + def __set__(self, value): + if isinstance(value, str): value = PyUnicode_DecodeUTF8(value, len(value), "strict") + self._value = value + + cdef save_value(self, buf): + save_string(self.value.encode('utf-8'), buf) + + +cdef class _TAG_List(TAG_Value): + cdef public list value + def __init__(self, value = None, name = u""): + self.value = list(value or []) + self.name = name + self.tagID = TAG_LIST + + """collection methods""" + def __getitem__(self, key): + return self.value[key] + def __setitem__(self, key, val): + self.value[key] = val + def __iter__(self): + return iter(self.value) + def __len__(self): return len(self.value) + def insert(self, idx, key): + self.value.insert(idx, key) + property list_type: + def __get__(self): + if len(self.value): return self.value[0].tagID + return TAG_BYTE + + cdef save_value(self, buf): + save_tag_id(self.list_type, buf) + save_int(len(self.value), buf) + + items = self.value + for subtag in items: + if subtag.tagID != self.list_type: + raise NBTFormatError, "Asked to save TAG_List with different types! Found %s and %s" % (subtag.tagID, self.list_type) + save_tag_value(subtag, buf) + +class TAG_List(_TAG_List, collections.MutableSequence): + pass + +cdef class _TAG_Compound(TAG_Value): + cdef public dict value + def __init__(self, value = None, name = u""): + self.value = value or {} + self.name = name + self.tagID = TAG_COMPOUND + + """collection methods""" + def __getitem__(self, key): + return self.value[key] + def __setitem__(self, key, val): + assert isinstance(val, TAG_Value) + val.name = key + self.value[key] = val + def __delitem__(self, key): + del self.value[key] + def __iter__(self): return iter(self.value) + def __contains__(self, k):return k in self.value + def __len__(self): return len(self.value) + + def __str__(self): + return str(self.__class__) + ": " + str(self.value) + __repr__ = __str__ + def add(self, tag): + assert tag.name + self[tag.name] = tag + + cdef save_value(self, buf): + i = self.iteritems() + for name, subtag in i: + #print "save_tag_name", name, subtag.tagID, "Named", subtag.name, + save_tag_id(subtag.tagID, buf) + #print "id", + save_tag_name(subtag, buf) + #print "name", + save_tag_value(subtag, buf) + #print "value", name + save_tag_id(TAG_END, buf) + +class TAG_Compound(_TAG_Compound, collections.MutableMapping): + def __init__(self, value = None, name = u""): + _TAG_Compound.__init__(self, value, name) + def save(self, filename = "", buf = None): + save_root_tag(self, filename, buf) + +cdef class TAG_Int_Array(TAG_Array): + cdef char _tagID(self): return TAG_INT_ARRAY + cdef public object value + + cdef save_value(self, buf): + pass +cdef class TAG_Short_Array(TAG_Array): + cdef char _tagID(self): return TAG_SHORT_ARRAY + cdef public object value + + cdef save_value(self, buf): + pass + +#cdef int needswap = (sys.byteorder == "little") +cdef swab(void * vbuf, int nbytes): + cdef unsigned char * buf = (vbuf) + #print "Swapping ", nbytes, "bytes" + #for i in range(nbytes): print buf[i], + #print "to", + #if not needswap: return + cdef int i + for i in range((nbytes+1)/2): + buf[i], buf[nbytes-i-1] = buf[nbytes-i-1], buf[i] + #for i in range(nbytes): print buf[i], + +import zlib +def gunzip(data): + #strip off the header and use negative WBITS to tell zlib there's no header + return zlib.decompress(data[10:], -zlib.MAX_WBITS) +def try_gunzip(data): + try: + data = gunzip(data) + except Exception, e: + pass + return data + +def load(buf=None, filename=None): + try: + if isinstance(buf, basestring) and exists(buf): + filename = buf + except TypeError: + pass + + if filename and exists(filename): + data = file(filename, "rb").read() + data = try_gunzip(data) + return load_buffer(data) + + return load_buffer(try_gunzip(buf)) + +cdef class load_ctx: + cdef unsigned long offset + cdef char * buffer + cdef unsigned long size + cdef int require(self, int s) except -1: + #print "Asked for ", s + if s > self.size - self.offset: + raise NBTFormatError, "NBT Stream too short. Asked for %d, only had %d" % (s, (self.size - self.offset)) + + return 0 + +should_dump = False +cdef load_buffer(bytes buf): + cdef load_ctx ctx = load_ctx() + ctx.offset = 1 + ctx.buffer = buf + ctx.size = len(buf) + if len(buf) < 1: + raise NBTFormatError, "NBT Stream too short!" + + if should_dump: print dump(buf) + assert ctx.buffer[0] == TAG_COMPOUND, "Data is not a TAG_Compound (found %d)" % ctx.buffer[0] + name = load_string(ctx) + #print "Root name", name + tag = load_compound(ctx) + tag.name = name + return tag + +cdef load_byte(load_ctx ctx): + ctx.require(1) + cdef char * ptr = (ctx.buffer + ctx.offset) + ctx.offset += 1 + cdef TAG_Byte tag = TAG_Byte.__new__(TAG_Byte) + tag.value = ptr[0] + tag.tagID = TAG_BYTE + tag._name = u"" + return tag + + +cdef load_short(load_ctx ctx): + ctx.require(2) + cdef short * ptr = (ctx.buffer + ctx.offset) + swab(ptr, 2) + ctx.offset += 2 + cdef TAG_Short tag = TAG_Short.__new__(TAG_Short) + tag.value = ptr[0] + tag.tagID = TAG_SHORT + tag._name = u"" + return tag + + +cdef load_int(load_ctx ctx): + ctx.require(4) + cdef int * ptr = (ctx.buffer + ctx.offset) + swab(ptr, 4) + ctx.offset += 4 + cdef TAG_Int tag = TAG_Int.__new__(TAG_Int) + tag.value = (ptr[0]) + tag.tagID = TAG_INT + tag._name = u"" + return tag + +cdef load_long(load_ctx ctx): + ctx.require(8) + cdef long long * ptr = (ctx.buffer + ctx.offset) + swab(ptr, 8) + ctx.offset += 8 + cdef TAG_Long tag = TAG_Long.__new__(TAG_Long) + tag.value = ptr[0] + tag.tagID = TAG_LONG + tag._name = u"" + return tag + + +cdef load_float(load_ctx ctx): + ctx.require(4) + cdef float * ptr = (ctx.buffer + ctx.offset) + swab(ptr, 4) + ctx.offset += 4 + cdef TAG_Float tag = TAG_Float.__new__(TAG_Float) + tag.value = ptr[0] + tag.tagID = TAG_FLOAT + tag._name = u"" + return tag + + +cdef load_double(load_ctx ctx): + ctx.require(8) + cdef double * ptr = (ctx.buffer + ctx.offset) + swab(ptr, 8) + ctx.offset += 8 + cdef TAG_Double tag = TAG_Double.__new__(TAG_Double) + tag.value = ptr[0] + tag.tagID = TAG_DOUBLE + tag._name = u"" + return tag + + +cdef load_bytearray(load_ctx ctx): + ctx.require(4) + cdef unsigned int * ptr = (ctx.buffer + ctx.offset) + swab(ptr, 4) + cdef unsigned int length = ptr[0] + ctx.offset += 4 + cdef char * arr = ctx.buffer + ctx.offset + #print "Bytearray", length, ctx.size - ctx.offset + ctx.require(length) + ctx.offset += length + return TAG_Byte_Array(fromstring(arr[:length], dtype='uint8', count=length)) + + +### --- load_compound --- +cdef load_compound(load_ctx ctx): + #print "load_compound buf=%d off=%d" % (ctx.buffer[0], ctx.offset) + cdef char tagID + cdef _TAG_Compound root_tag = TAG_Compound() + assert root_tag is not None + + while True: + ctx.require(1) + tagID = ctx.buffer[ctx.offset] + ctx.offset += 1 + if tagID == TAG_END: + #print "TAG_END at ", ctx.offset + break; + else: + name = load_string(ctx) + tag = load_tag(tagID, ctx) + #tag.name = name + #print "tagID=%d name=%s at %d" % (tagID, tag.name, ctx.offset) + root_tag[name] = tag + return root_tag + +cdef load_list(load_ctx ctx): + ctx.require(5) + cdef char tagID = ctx.buffer[ctx.offset] + ctx.offset += 1 + cdef int * ptr = (ctx.buffer + ctx.offset) + swab(ptr, 4) + ctx.offset += 4 + length = ptr[0] + cdef _TAG_List tag = TAG_List() + cdef list val = tag.value + cdef int i + for i in range(length): + PyList_Append(val, load_tag(tagID, ctx)) + + return tag + +cdef unicode load_string(load_ctx ctx): + ctx.require(2) + cdef unsigned short * ptr = (ctx.buffer+ctx.offset) + swab(ptr, 2) + ctx.offset += 2 + cdef unsigned short length = ptr[0] + #print "String: ", ctx.offset, length + cdef unicode u = PyUnicode_DecodeUTF8(ctx.buffer + ctx.offset, length, "strict") + ctx.offset += length + return u + +cdef load_tag(char tagID, load_ctx ctx): + + + if tagID == TAG_BYTE: + return load_byte(ctx) + + if tagID == TAG_SHORT: + return load_short(ctx) + + if tagID == TAG_INT: + return load_int(ctx) + + if tagID == TAG_LONG: + return load_long(ctx) + + if tagID == TAG_FLOAT: + return load_float(ctx) + + if tagID == TAG_DOUBLE: + return load_double(ctx) + + if tagID == TAG_BYTE_ARRAY: + return load_bytearray(ctx) + + if tagID == TAG_STRING: + u = load_string(ctx) + return TAG_String(u) + + if tagID == TAG_LIST: + return load_list(ctx) + if tagID == TAG_COMPOUND: + return load_compound(ctx) + + + if tagID == TAG_INT_ARRAY: + pass + if tagID == TAG_SHORT_ARRAY: + pass + +FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)]) +def dump(src, length=8): + N=0; result='' + while src: + s,src = src[:length],src[length:] + hexa = ' '.join(["%02X"%ord(x) for x in s]) + s = s.translate(FILTER) + result += "%04X %-*s %s\n" % (N, length*3, hexa, s) + N+=length + return result + +cdef save_root_tag(tag, filename = "", buf = None): + sio = StringIO() + save_tag(tag, sio) + data = sio.getvalue() + if buf is None: + f = file(filename, "wb") + gzio = StringIO() + gz = gzip.GzipFile(fileobj=gzio, mode='wb', compresslevel=2) + gz.write(data) + gz.close() + f.write(gzio.getvalue()) + else: + buf.write(data) + +cdef save_tag(TAG_Value tag, object buf): + save_tag_id(tag.tagID, buf) + save_tag_name(tag, buf) + save_tag_value(tag, buf) + +cdef save_tag_id(char tagID, object buf): + cwrite(buf, &tagID, 1) + +cdef save_tag_name(TAG_Value tag, object buf): + name = tag.name.encode('utf-8') + save_string(name, buf) + +cdef save_string(bytes value, object buf): + cdef unsigned short length = len(value) + cdef char * s = value + swab(&length, 2) + cwrite(buf, &length, 2) + cwrite(buf, s, len(value)) + +cdef save_byte_array(object value, object buf): + value = value.tostring() + cdef char * s = value + cdef unsigned int length = len(value) + swab(&length, 4) + cwrite(buf, &length, 4) + cwrite(buf, s, len(value)) + +cdef save_byte(char value, object buf): + cwrite(buf, &value, 1) + +cdef save_short(short value, object buf): + swab(&value, 2) + cwrite(buf, &value, 2) + +cdef save_int(int value, object buf): + swab(&value, 4) + cwrite(buf, &value, 4) + +cdef save_long(long long value, object buf): +# print "Long long value: ", value, sizeof(value) + swab(&value, 8) + cdef char * p = &value + cwrite(buf, p, 8) + #cwrite(buf, p+4, 4) + #cwrite(buf, "\0\0\0\0\0\0\0\0", 8) + +cdef save_float(float value, object buf): + swab(&value, 4) + cwrite(buf, &value, 4) + +cdef save_double(double value, object buf): + swab(&value, 8) + cwrite(buf, &value, 8) + +cdef save_tag_value(TAG_Value tag, object buf): + cdef char tagID = tag.tagID + if tagID == TAG_BYTE: + (tag).save_value(buf) + + if tagID == TAG_SHORT: + (tag).save_value(buf) + + if tagID == TAG_INT: + (tag).save_value(buf) + + if tagID == TAG_LONG: + (tag).save_value(buf) + + if tagID == TAG_FLOAT: + (tag).save_value(buf) + + if tagID == TAG_DOUBLE: + (tag).save_value(buf) + + if tagID == TAG_BYTE_ARRAY: + (tag).save_value(buf) + + if tagID == TAG_STRING: + (tag).save_value(buf) + + if tagID == TAG_LIST: + (<_TAG_List>tag).save_value(buf) + if tagID == TAG_COMPOUND: + (<_TAG_Compound>tag).save_value(buf) + + + if tagID == TAG_INT_ARRAY: + pass + if tagID == TAG_SHORT_ARRAY: + pass + + + diff --git a/java.py b/java.py index a78f5e6..f2bbf26 100644 --- a/java.py +++ b/java.py @@ -62,7 +62,10 @@ class MCJavaLevel(MCLevel): def __init__(self, filename, data): self.filename = filename; + if isinstance(data, basestring): + data = fromstring(data, dtype='uint8') self.filedata = data; + #try to take x,z,y from the filename r = re.findall("\d+", os.path.basename(filename)) if r and len(r) >= 3: @@ -77,7 +80,7 @@ class MCJavaLevel(MCLevel): blockCount = h * l * w if blockCount > data.shape[0]: - raise ValueError, "Level file does not contain enough blocks! Try putting the size into the filename, e.g. server_level_{w}_{l}_{h}.dat".format(w=w, l=l, h=h); + raise ValueError, "Level file does not contain enough blocks! (size {s}) Try putting the size into the filename, e.g. server_level_{w}_{l}_{h}.dat".format(w=w, l=l, h=h, s=data.shape); blockOffset = data.shape[0] - blockCount diff --git a/mclevel.py b/mclevel.py index c943a45..5d4b420 100644 --- a/mclevel.py +++ b/mclevel.py @@ -255,8 +255,8 @@ def fromFile(filename, loadInfinite=True): compressed = False; unzippedData = rawdata - data = fromstring(unzippedData, dtype='uint8') - + #data = + data = unzippedData if MCJavaLevel._isDataLevel(data): info(u"Detected compressed Java-style level") lev = MCJavaLevel(filename, data); @@ -265,15 +265,17 @@ def fromFile(filename, loadInfinite=True): try: root_tag = nbt.load(buf=data); + except Exception, e: info(u"Error during NBT load: {0!r}".format(e)) + info(traceback.format_exc()) info(u"Fallback: Detected compressed flat block array, yzx ordered ") try: lev = MCJavaLevel(filename, data); lev.compressed = compressed; return lev; except Exception, e2: - raise LoadingError, ("Multiple errors encountered", e, e2) + raise LoadingError, ("Multiple errors encountered", e, e2), sys.exc_info()[2] else: if(MCIndevLevel._isTagLevel(root_tag)): @@ -288,8 +290,6 @@ def fromFile(filename, loadInfinite=True): return INVEditChest(root_tag=root_tag, filename=filename); - #it must be a plain array of blocks. see if MCJavaLevel handles it. - raise IOError, "Cannot detect file type." @@ -301,16 +301,3 @@ def loadWorldNumber(i): #deprecated filename = u"{0}{1}{2}{3}{1}".format(saveFileDir, os.sep, u"World", i) return fromFile(filename) -# -#from formats import * -#from box import BoundingBox -# -####xxxxx CHECK RESULTS -# -##import cProfile -#if __name__=="__main__": -# #cProfile.run('testmain()'); -# logging.basicConfig(format=u'%(levelname)s:%(message)s') -# logging.getLogger().level = logging.INFO -# -# testmain(); diff --git a/nbt.py b/nbt.py index ea679a4..86045d7 100644 --- a/nbt.py +++ b/nbt.py @@ -1,538 +1,10 @@ - -# vim:set sw=2 sts=2 ts=2: - -""" -Named Binary Tag library. Serializes and deserializes TAG_* objects -to and from binary data. Load a Minecraft level by calling nbt.load(). -Create your own TAG_* objects and set their values. -Save a TAG_* object to a file or StringIO object. - -Read the test functions at the end of the file to get started. - -This library requires Numpy. Get it here: -http://new.scipy.org/download.html - -Official NBT documentation is here: -http://www.minecraft.net/docs/NBT.txt - - -Copyright 2010 David Rio Vierra -""" -import collections -import itertools -import struct -import gzip -from cStringIO import StringIO; -import os; -from contextlib import closing -from numpy import array, zeros, uint8, fromstring -TAGfmt = ">b" - -class NBTFormatError(RuntimeError): pass - -class TAG_Value(object): - """Simple values. Subclasses override fmt to change the type and size. - Subclasses may set dataType instead of overriding setValue for automatic data type coercion""" - - fmt = ">b"; - tag = -1; #error! - - _value = None - def getValue(self): - return self._value - def setValue(self, newVal): - self._value = self.dataType(newVal) - value = property(getValue, setValue, None, "Change the TAG's value. Data types are checked and coerced if needed.") - - _name = None - def getName(self): - return self._name - def setName(self, newVal): - self._name = str(newVal) - def delName(self): - self._name = "" - name = property(getName, setName, delName, "Change the TAG's name. Coerced to a string.") - - @classmethod - def load_from(cls, data, data_cursor): - data = data[data_cursor:] - (value,) = struct.unpack_from(cls.fmt, data); - self = cls(value=value) - return self, data_cursor + struct.calcsize(self.fmt) - - def __init__(self, value=0, name=None): - self.name = name - self.value = value - - - - def __repr__(self): - return "%s( \"%s\" ): %s" % (str(self.__class__), self.name, repr(self.value)) - - def __str__(self): - return self.pretty_string() - - def pretty_string(self, indent=0): - if self.name: - return " " * indent + "%s( \"%s\" ): %s" % (str(self.__class__.__name__), self.name, self.value) - else: - return " " * indent + "%s: %s" % (str(self.__class__.__name__), self.value) - - - def write_tag(self, buf): - buf.write(struct.pack(TAGfmt, self.tag)) - def write_name(self, buf): - if(self.name != None): - TAG_String(self.name).write_value(buf) - def write_value(self, buf): - buf.write(struct.pack(self.fmt, self.value)) - - def save(self, filename="", buf=None): - if(filename): - self.saveGzipped(filename); - return; - "Save the tagged element to a file." - if self.name == None: self.name = "" #root tag must have name - self.write_tag(buf) - self.write_name(buf) - self.write_value(buf) - - def saveGzipped(self, filename, compresslevel=1): - sio = StringIO(); - #atomic write - try: os.rename(filename, filename + ".old"); - except Exception, e: - #print "Atomic Save: No existing file to rename" - pass - - with closing(gzip.GzipFile(fileobj=sio, mode="wb", compresslevel=compresslevel)) as outputGz: - self.save(buf=outputGz); - outputGz.flush(); - - #print len(sio.getvalue()); - try: - with open(filename, 'wb') as f: - f.write(sio.getvalue()); - except: - try: - os.rename(filename + ".old", filename,); - except Exception, e: - print e; - return - - try: os.remove(filename + ".old"); - except Exception, e: - #print "Atomic Save: No old file to remove" - pass; - -class TAG_Byte(TAG_Value): - tag = 1; - fmt = ">b"; - dataType = int - -class TAG_Short(TAG_Value): - tag = 2; - fmt = ">h"; - dataType = int - -class TAG_Int(TAG_Value): - tag = 3; - fmt = ">i"; - dataType = int - -class TAG_Long(TAG_Value): - tag = 4; - fmt = ">q"; - dataType = long - -class TAG_Float(TAG_Value): - tag = 5; - fmt = ">f"; - dataType = float - - -class TAG_Double(TAG_Value): - tag = 6; - fmt = ">d"; - dataType = float - - -class TAG_Byte_Array(TAG_Value): - """Like a string, but for binary data. four length bytes instead of - two. value is a numpy array, and you can change its elements""" - - tag = 7; - fmt = ">i%ds" - - def dataType(self, value): - return array(value, uint8) - - def __repr__(self): - return "<%s: length %d> ( %s )" % (self.__class__, len(self.value), self.name) - - - def pretty_string(self, indent=0): - if self.name: - return " " * indent + "%s( \"%s\" ): shape=%s dtype=%s %s" % ( - str(self.__class__.__name__), - self.name, - str(self.value.shape), - str(self.value.dtype), - self.value) - else: - return " " * indent + "%s: %s %s" % (str(self.__class__.__name__), str(self.value.shape), self.value) - - @classmethod - def load_from(cls, data, data_cursor): - data = data[data_cursor:] - (string_len,) = struct.unpack_from(">I", data); - value = fromstring(data[4:string_len + 4], 'uint8'); - self = cls(value) - return self, data_cursor + string_len + 4 - - def __init__(self, value=zeros(0, uint8), name=None): - if name: - self.name = name - self.value = value; - - - def write_value(self, buf): - #print self.value - valuestr = self.value.tostring() - buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr), valuestr)) - -class TAG_Int_Array(TAG_Byte_Array): - """An array of ints""" - tag = 11; - def dataType(self, value): - return array(value, '>u4') - - @classmethod - def load_from(cls, data, data_cursor): - data = data[data_cursor:] - (string_len,) = struct.unpack_from(">I", data); - value = fromstring(data[4:string_len * 4 + 4], '>u4') - self = cls(value) - return self, data_cursor + len(self.value) * 4 + 4; - - def __init__(self, value=zeros(0, ">u4"), name=None): - self.name = name - self.value = value; - - - def write_value(self, buf): - #print self.value - valuestr = self.value.tostring() - buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr) / 4, valuestr)) - -class TAG_Short_Array(TAG_Int_Array): - """An array of ints""" - tag = 12; - def dataType(self, value): - return array(value, '>u2') - - @classmethod - def load_from(cls, data, data_cursor): - data = data[data_cursor:] - (string_len,) = struct.unpack_from(">I", data); - value = fromstring(data[4:string_len * 2 + 4], '>u2') - self = cls(value) - return self, data_cursor + len(self.value) * 2 + 4; - - def __init__(self, value=zeros(0, ">u2"), name=None): - self.name = name - self.value = value; - - - def write_value(self, buf): - #print self.value - valuestr = self.value.tostring() - buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr) / 2, valuestr)) - -class TAG_String(TAG_Value): - """String in UTF-8 - The value parameter must be a 'unicode' or a UTF-8 encoded 'str' - """ - - tag = 8; - fmt = ">h%ds" - dataType = lambda self, s: isinstance(s, unicode) and s.encode('utf-8') or s - - @classmethod - def load_from(cls, data, data_cursor): - data = data[data_cursor:] - (string_len,) = struct.unpack_from(">H", data); - value = data[2:string_len + 2].tostring(); - self = cls(value) - return self, data_cursor + string_len + 2; - - def __init__(self, value="", name=None): - if name: - self.name = name - self.value = value - - def write_value(self, buf): - u8value = self._value - buf.write(struct.pack(self.fmt % (len(u8value),), len(u8value), u8value)) - - @property - def unicodeValue(self): - return self.value.decode('utf-8') - - - -class TAG_Compound(TAG_Value, collections.MutableMapping): - """A heterogenous list of named tags. Names must be unique within - the TAG_Compound. Add tags to the compound using the subscript - operator []. This will automatically name the tags.""" - - tag = 10; - - def dataType(self, val): - for i in val: - assert isinstance(i, TAG_Value) - assert i.name - return list(val) - - def __repr__(self): - return "%s( %s ): %s" % (str(self.__class__.__name__), self.name, self.value) - - def pretty_string(self, indent=0): - if self.name: - pretty = " " * indent + "%s( \"%s\" ): %d items\n" % (str(self.__class__.__name__), self.name, len(self.value)) - else: - pretty = " " * indent + "%s(): %d items\n" % (str(self.__class__.__name__), len(self.value)) - indent += 4 - for tag in self.value: - pretty += tag.pretty_string(indent) + "\n" - return pretty - - @classmethod - def load_from(cls, data, data_cursor): - self = cls() - while data_cursor < len(data): - tag_type = data[data_cursor]; - data_cursor += 1 - if(tag_type == 0): - break - - tag, data_cursor = load_named(data, data_cursor, tag_type) - - self._value.append(tag); - - return self, data_cursor - - def __init__(self, value=[], name=""): - - self.name = name; - if value.__class__ == ''.__class__: - self.name = value; - value = []; - self.value = value; - - - - def write_value(self, buf): - for i in self.value: - i.save(buf=buf) - buf.write("\x00") - - "collection functions" - def __getitem__(self, k): - #hits=filter(lambda x:x.name==k, self.value); - #if(len(hits)): return hits[0]; - for key in self.value: - if key.name == k: return key - raise KeyError("Key {0} not found in tag {1}".format(k, self)); - - def __iter__(self): return itertools.imap(lambda x:x.name, self.value); - def __contains__(self, k):return k in map(lambda x:x.name, self.value); - def __len__(self): return self.value.__len__() - - - def __setitem__(self, k, v): - """Automatically wraps lists and tuples in a TAG_List, and wraps strings - and unicodes in a TAG_String.""" - if isinstance(v, (list, tuple)): - v = TAG_List(v) - elif isinstance(v, basestring): - v = TAG_String(v) - - if not (v.__class__ in tag_classes.values()): raise TypeError("Invalid type %s for TAG_Compound" % (v.__class__)) - """remove any items already named "k". """ - olditems = filter(lambda x:x.name == k, self.value) - for i in olditems: self.value.remove(i) - self.value.append(v); - v.name = k; - - def __delitem__(self, k): self.value.__delitem__(self.value.index(self[k])); - - def add(self, v): - self[v.name] = v; - -class TAG_List(TAG_Value, collections.MutableSequence): - - """A homogenous list of unnamed data of a single TAG_* type. - Once created, the type can only be changed by emptying the list - and adding an element of the new type. If created with no arguments, - returns a list of TAG_Compound - - Empty lists in the wild have been seen with type TAG_Byte""" - - tag = 9; - - def dataType(self, val): - if val: - listType = val[0].__class__ - # FIXME: This is kinda weird; None as the empty tag name? - assert all(isinstance(x, listType) and x.name in ("", "None") for x in val) - return list(val) - - def __repr__(self): - return "%s( %s ): %s" % (self.__class__.__name__, self.name, self.value) - - - def pretty_string(self, indent=0): - if self.name: - pretty = " " * indent + "%s( \"%s\" ):\n" % (str(self.__class__.__name__), self.name) - else: - pretty = " " * indent + "%s():\n" % (str(self.__class__.__name__),) - - indent += 4 - for tag in self.value: - pretty += tag.pretty_string(indent) + "\n" - return pretty - - @classmethod - def load_from(cls, data, data_cursor): - self = cls() - self.list_type = data[data_cursor]; - - data_cursor += 1; - - list_length, data_cursor = TAG_Int.load_from(data, data_cursor) - list_length = list_length.value - - - for i in range(list_length): - - tag, data_cursor = tag_classes[self.list_type].load_from(data, data_cursor) - self.append(tag); - - return self, data_cursor - - def __init__(self, value=[], name=None, list_type=TAG_Compound): - #can be created from a list of tags in value, with an optional - #name, or created from raw tag data, or created with list_type - #taken from a TAG class or instance - - self.name = name - self.list_type = list_type.tag - - if(len(value)): - self.list_type = value[0].tag; - value = filter(lambda x:x.__class__ == value[0].__class__, value) - - self.value = value - - - """ collection methods """ - def __iter__(self): return iter(self.value) - def __contains__(self, k):return k in self.value; - def __getitem__(self, i): return self.value[i]; - def __len__(self): return len(self.value) - - def __setitem__(self, i, v): - if v.__class__ != tag_classes[self.list_type]: - raise TypeError("Invalid type %s for TAG_List(%s)" % (v.__class__, tag_classes[self.list_type])) - v.name = "" - self.value[i] = v; - - def __delitem__(self, i): - del self.value[i] - - def insert(self, i, v): - if not v.tag in tag_classes: raise TypeError("Not a tag type: %s" % (v,)) - if len(self) == 0: - self.list_type = v.tag - else: - if v.__class__ != tag_classes[self.list_type]: raise TypeError("Invalid type %s for TAG_List(%s)" % (v.__class__, tag_classes[self.list_type])) - - v.name = "" - self.value.insert(i, v); - - def write_value(self, buf): - buf.write(struct.pack(TAGfmt, self.list_type)) - TAG_Int(len(self)).write_value(buf) - for i in self.value: - i.write_value(buf) - - -tag_classes = { - 1 : TAG_Byte, - 2 : TAG_Short, - 3 : TAG_Int, - 4 : TAG_Long, - 5 : TAG_Float, - 6 : TAG_Double, - 7 : TAG_Byte_Array, - 8 : TAG_String, - 9 : TAG_List, - 10: TAG_Compound, - 11: TAG_Int_Array, - 12: TAG_Short_Array, - }; - -import zlib -def gunzip(data): - #strip off the header and use negative WBITS to tell zlib there's no header - return zlib.decompress(data[10:], -zlib.MAX_WBITS) - -def loadFile(filename): - with file(filename, "rb") as f: - inputdata = f.read() - data = inputdata - try: - data = gunzip(inputdata) - except IOError: - print "File %s not zipped" % filename - - return load(buf=fromstring(data, 'uint8')); - -def load_named(data, data_cursor, tag_type): - tag_name, data_cursor = TAG_String.load_from(data, data_cursor) - tag_name = tag_name.value - - tag, data_cursor = tag_classes[tag_type].load_from(data, data_cursor) - tag.name = tag_name - - return tag, data_cursor - -def load(filename="", buf=None): - """Unserialize data from an entire NBT file and return the - root TAG_Compound object. Argument can be a string containing a - filename or an array of integers containing TAG_Compound data. """ - - if filename and isinstance(filename, (str, unicode)): - return loadFile(filename) - if isinstance(buf, str): buf = fromstring(buf, uint8) - data = buf; - #if buf != None: data = buf - if not len(buf): - raise NBTFormatError, "Asked to load root tag of zero length" - - data_cursor = 0; - tag_type = data[data_cursor]; - if tag_type != 10: - raise NBTFormatError, 'Not an NBT file with a root TAG_Compound (found {0})'.format(tag_type); - data_cursor += 1; - - tag, data_cursor = load_named(data, data_cursor, tag_type) - - return tag; - - - -__all__ = [a.__name__ for a in tag_classes.itervalues()] + ["load", "loadFile", "gunzip"] - - +try: + import os + os.environ['DISTUTILS_USE_SDK'] = "1" + from pyximport import install; install() + from _nbt import * +except ImportError, e: + print "Import error loading _nbt extension", repr(e) + import traceback; traceback.print_exc() + from pynbt import * + \ No newline at end of file diff --git a/pynbt.py b/pynbt.py new file mode 100644 index 0000000..ea679a4 --- /dev/null +++ b/pynbt.py @@ -0,0 +1,538 @@ + +# vim:set sw=2 sts=2 ts=2: + +""" +Named Binary Tag library. Serializes and deserializes TAG_* objects +to and from binary data. Load a Minecraft level by calling nbt.load(). +Create your own TAG_* objects and set their values. +Save a TAG_* object to a file or StringIO object. + +Read the test functions at the end of the file to get started. + +This library requires Numpy. Get it here: +http://new.scipy.org/download.html + +Official NBT documentation is here: +http://www.minecraft.net/docs/NBT.txt + + +Copyright 2010 David Rio Vierra +""" +import collections +import itertools +import struct +import gzip +from cStringIO import StringIO; +import os; +from contextlib import closing +from numpy import array, zeros, uint8, fromstring +TAGfmt = ">b" + +class NBTFormatError(RuntimeError): pass + +class TAG_Value(object): + """Simple values. Subclasses override fmt to change the type and size. + Subclasses may set dataType instead of overriding setValue for automatic data type coercion""" + + fmt = ">b"; + tag = -1; #error! + + _value = None + def getValue(self): + return self._value + def setValue(self, newVal): + self._value = self.dataType(newVal) + value = property(getValue, setValue, None, "Change the TAG's value. Data types are checked and coerced if needed.") + + _name = None + def getName(self): + return self._name + def setName(self, newVal): + self._name = str(newVal) + def delName(self): + self._name = "" + name = property(getName, setName, delName, "Change the TAG's name. Coerced to a string.") + + @classmethod + def load_from(cls, data, data_cursor): + data = data[data_cursor:] + (value,) = struct.unpack_from(cls.fmt, data); + self = cls(value=value) + return self, data_cursor + struct.calcsize(self.fmt) + + def __init__(self, value=0, name=None): + self.name = name + self.value = value + + + + def __repr__(self): + return "%s( \"%s\" ): %s" % (str(self.__class__), self.name, repr(self.value)) + + def __str__(self): + return self.pretty_string() + + def pretty_string(self, indent=0): + if self.name: + return " " * indent + "%s( \"%s\" ): %s" % (str(self.__class__.__name__), self.name, self.value) + else: + return " " * indent + "%s: %s" % (str(self.__class__.__name__), self.value) + + + def write_tag(self, buf): + buf.write(struct.pack(TAGfmt, self.tag)) + def write_name(self, buf): + if(self.name != None): + TAG_String(self.name).write_value(buf) + def write_value(self, buf): + buf.write(struct.pack(self.fmt, self.value)) + + def save(self, filename="", buf=None): + if(filename): + self.saveGzipped(filename); + return; + "Save the tagged element to a file." + if self.name == None: self.name = "" #root tag must have name + self.write_tag(buf) + self.write_name(buf) + self.write_value(buf) + + def saveGzipped(self, filename, compresslevel=1): + sio = StringIO(); + #atomic write + try: os.rename(filename, filename + ".old"); + except Exception, e: + #print "Atomic Save: No existing file to rename" + pass + + with closing(gzip.GzipFile(fileobj=sio, mode="wb", compresslevel=compresslevel)) as outputGz: + self.save(buf=outputGz); + outputGz.flush(); + + #print len(sio.getvalue()); + try: + with open(filename, 'wb') as f: + f.write(sio.getvalue()); + except: + try: + os.rename(filename + ".old", filename,); + except Exception, e: + print e; + return + + try: os.remove(filename + ".old"); + except Exception, e: + #print "Atomic Save: No old file to remove" + pass; + +class TAG_Byte(TAG_Value): + tag = 1; + fmt = ">b"; + dataType = int + +class TAG_Short(TAG_Value): + tag = 2; + fmt = ">h"; + dataType = int + +class TAG_Int(TAG_Value): + tag = 3; + fmt = ">i"; + dataType = int + +class TAG_Long(TAG_Value): + tag = 4; + fmt = ">q"; + dataType = long + +class TAG_Float(TAG_Value): + tag = 5; + fmt = ">f"; + dataType = float + + +class TAG_Double(TAG_Value): + tag = 6; + fmt = ">d"; + dataType = float + + +class TAG_Byte_Array(TAG_Value): + """Like a string, but for binary data. four length bytes instead of + two. value is a numpy array, and you can change its elements""" + + tag = 7; + fmt = ">i%ds" + + def dataType(self, value): + return array(value, uint8) + + def __repr__(self): + return "<%s: length %d> ( %s )" % (self.__class__, len(self.value), self.name) + + + def pretty_string(self, indent=0): + if self.name: + return " " * indent + "%s( \"%s\" ): shape=%s dtype=%s %s" % ( + str(self.__class__.__name__), + self.name, + str(self.value.shape), + str(self.value.dtype), + self.value) + else: + return " " * indent + "%s: %s %s" % (str(self.__class__.__name__), str(self.value.shape), self.value) + + @classmethod + def load_from(cls, data, data_cursor): + data = data[data_cursor:] + (string_len,) = struct.unpack_from(">I", data); + value = fromstring(data[4:string_len + 4], 'uint8'); + self = cls(value) + return self, data_cursor + string_len + 4 + + def __init__(self, value=zeros(0, uint8), name=None): + if name: + self.name = name + self.value = value; + + + def write_value(self, buf): + #print self.value + valuestr = self.value.tostring() + buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr), valuestr)) + +class TAG_Int_Array(TAG_Byte_Array): + """An array of ints""" + tag = 11; + def dataType(self, value): + return array(value, '>u4') + + @classmethod + def load_from(cls, data, data_cursor): + data = data[data_cursor:] + (string_len,) = struct.unpack_from(">I", data); + value = fromstring(data[4:string_len * 4 + 4], '>u4') + self = cls(value) + return self, data_cursor + len(self.value) * 4 + 4; + + def __init__(self, value=zeros(0, ">u4"), name=None): + self.name = name + self.value = value; + + + def write_value(self, buf): + #print self.value + valuestr = self.value.tostring() + buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr) / 4, valuestr)) + +class TAG_Short_Array(TAG_Int_Array): + """An array of ints""" + tag = 12; + def dataType(self, value): + return array(value, '>u2') + + @classmethod + def load_from(cls, data, data_cursor): + data = data[data_cursor:] + (string_len,) = struct.unpack_from(">I", data); + value = fromstring(data[4:string_len * 2 + 4], '>u2') + self = cls(value) + return self, data_cursor + len(self.value) * 2 + 4; + + def __init__(self, value=zeros(0, ">u2"), name=None): + self.name = name + self.value = value; + + + def write_value(self, buf): + #print self.value + valuestr = self.value.tostring() + buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr) / 2, valuestr)) + +class TAG_String(TAG_Value): + """String in UTF-8 + The value parameter must be a 'unicode' or a UTF-8 encoded 'str' + """ + + tag = 8; + fmt = ">h%ds" + dataType = lambda self, s: isinstance(s, unicode) and s.encode('utf-8') or s + + @classmethod + def load_from(cls, data, data_cursor): + data = data[data_cursor:] + (string_len,) = struct.unpack_from(">H", data); + value = data[2:string_len + 2].tostring(); + self = cls(value) + return self, data_cursor + string_len + 2; + + def __init__(self, value="", name=None): + if name: + self.name = name + self.value = value + + def write_value(self, buf): + u8value = self._value + buf.write(struct.pack(self.fmt % (len(u8value),), len(u8value), u8value)) + + @property + def unicodeValue(self): + return self.value.decode('utf-8') + + + +class TAG_Compound(TAG_Value, collections.MutableMapping): + """A heterogenous list of named tags. Names must be unique within + the TAG_Compound. Add tags to the compound using the subscript + operator []. This will automatically name the tags.""" + + tag = 10; + + def dataType(self, val): + for i in val: + assert isinstance(i, TAG_Value) + assert i.name + return list(val) + + def __repr__(self): + return "%s( %s ): %s" % (str(self.__class__.__name__), self.name, self.value) + + def pretty_string(self, indent=0): + if self.name: + pretty = " " * indent + "%s( \"%s\" ): %d items\n" % (str(self.__class__.__name__), self.name, len(self.value)) + else: + pretty = " " * indent + "%s(): %d items\n" % (str(self.__class__.__name__), len(self.value)) + indent += 4 + for tag in self.value: + pretty += tag.pretty_string(indent) + "\n" + return pretty + + @classmethod + def load_from(cls, data, data_cursor): + self = cls() + while data_cursor < len(data): + tag_type = data[data_cursor]; + data_cursor += 1 + if(tag_type == 0): + break + + tag, data_cursor = load_named(data, data_cursor, tag_type) + + self._value.append(tag); + + return self, data_cursor + + def __init__(self, value=[], name=""): + + self.name = name; + if value.__class__ == ''.__class__: + self.name = value; + value = []; + self.value = value; + + + + def write_value(self, buf): + for i in self.value: + i.save(buf=buf) + buf.write("\x00") + + "collection functions" + def __getitem__(self, k): + #hits=filter(lambda x:x.name==k, self.value); + #if(len(hits)): return hits[0]; + for key in self.value: + if key.name == k: return key + raise KeyError("Key {0} not found in tag {1}".format(k, self)); + + def __iter__(self): return itertools.imap(lambda x:x.name, self.value); + def __contains__(self, k):return k in map(lambda x:x.name, self.value); + def __len__(self): return self.value.__len__() + + + def __setitem__(self, k, v): + """Automatically wraps lists and tuples in a TAG_List, and wraps strings + and unicodes in a TAG_String.""" + if isinstance(v, (list, tuple)): + v = TAG_List(v) + elif isinstance(v, basestring): + v = TAG_String(v) + + if not (v.__class__ in tag_classes.values()): raise TypeError("Invalid type %s for TAG_Compound" % (v.__class__)) + """remove any items already named "k". """ + olditems = filter(lambda x:x.name == k, self.value) + for i in olditems: self.value.remove(i) + self.value.append(v); + v.name = k; + + def __delitem__(self, k): self.value.__delitem__(self.value.index(self[k])); + + def add(self, v): + self[v.name] = v; + +class TAG_List(TAG_Value, collections.MutableSequence): + + """A homogenous list of unnamed data of a single TAG_* type. + Once created, the type can only be changed by emptying the list + and adding an element of the new type. If created with no arguments, + returns a list of TAG_Compound + + Empty lists in the wild have been seen with type TAG_Byte""" + + tag = 9; + + def dataType(self, val): + if val: + listType = val[0].__class__ + # FIXME: This is kinda weird; None as the empty tag name? + assert all(isinstance(x, listType) and x.name in ("", "None") for x in val) + return list(val) + + def __repr__(self): + return "%s( %s ): %s" % (self.__class__.__name__, self.name, self.value) + + + def pretty_string(self, indent=0): + if self.name: + pretty = " " * indent + "%s( \"%s\" ):\n" % (str(self.__class__.__name__), self.name) + else: + pretty = " " * indent + "%s():\n" % (str(self.__class__.__name__),) + + indent += 4 + for tag in self.value: + pretty += tag.pretty_string(indent) + "\n" + return pretty + + @classmethod + def load_from(cls, data, data_cursor): + self = cls() + self.list_type = data[data_cursor]; + + data_cursor += 1; + + list_length, data_cursor = TAG_Int.load_from(data, data_cursor) + list_length = list_length.value + + + for i in range(list_length): + + tag, data_cursor = tag_classes[self.list_type].load_from(data, data_cursor) + self.append(tag); + + return self, data_cursor + + def __init__(self, value=[], name=None, list_type=TAG_Compound): + #can be created from a list of tags in value, with an optional + #name, or created from raw tag data, or created with list_type + #taken from a TAG class or instance + + self.name = name + self.list_type = list_type.tag + + if(len(value)): + self.list_type = value[0].tag; + value = filter(lambda x:x.__class__ == value[0].__class__, value) + + self.value = value + + + """ collection methods """ + def __iter__(self): return iter(self.value) + def __contains__(self, k):return k in self.value; + def __getitem__(self, i): return self.value[i]; + def __len__(self): return len(self.value) + + def __setitem__(self, i, v): + if v.__class__ != tag_classes[self.list_type]: + raise TypeError("Invalid type %s for TAG_List(%s)" % (v.__class__, tag_classes[self.list_type])) + v.name = "" + self.value[i] = v; + + def __delitem__(self, i): + del self.value[i] + + def insert(self, i, v): + if not v.tag in tag_classes: raise TypeError("Not a tag type: %s" % (v,)) + if len(self) == 0: + self.list_type = v.tag + else: + if v.__class__ != tag_classes[self.list_type]: raise TypeError("Invalid type %s for TAG_List(%s)" % (v.__class__, tag_classes[self.list_type])) + + v.name = "" + self.value.insert(i, v); + + def write_value(self, buf): + buf.write(struct.pack(TAGfmt, self.list_type)) + TAG_Int(len(self)).write_value(buf) + for i in self.value: + i.write_value(buf) + + +tag_classes = { + 1 : TAG_Byte, + 2 : TAG_Short, + 3 : TAG_Int, + 4 : TAG_Long, + 5 : TAG_Float, + 6 : TAG_Double, + 7 : TAG_Byte_Array, + 8 : TAG_String, + 9 : TAG_List, + 10: TAG_Compound, + 11: TAG_Int_Array, + 12: TAG_Short_Array, + }; + +import zlib +def gunzip(data): + #strip off the header and use negative WBITS to tell zlib there's no header + return zlib.decompress(data[10:], -zlib.MAX_WBITS) + +def loadFile(filename): + with file(filename, "rb") as f: + inputdata = f.read() + data = inputdata + try: + data = gunzip(inputdata) + except IOError: + print "File %s not zipped" % filename + + return load(buf=fromstring(data, 'uint8')); + +def load_named(data, data_cursor, tag_type): + tag_name, data_cursor = TAG_String.load_from(data, data_cursor) + tag_name = tag_name.value + + tag, data_cursor = tag_classes[tag_type].load_from(data, data_cursor) + tag.name = tag_name + + return tag, data_cursor + +def load(filename="", buf=None): + """Unserialize data from an entire NBT file and return the + root TAG_Compound object. Argument can be a string containing a + filename or an array of integers containing TAG_Compound data. """ + + if filename and isinstance(filename, (str, unicode)): + return loadFile(filename) + if isinstance(buf, str): buf = fromstring(buf, uint8) + data = buf; + #if buf != None: data = buf + if not len(buf): + raise NBTFormatError, "Asked to load root tag of zero length" + + data_cursor = 0; + tag_type = data[data_cursor]; + if tag_type != 10: + raise NBTFormatError, 'Not an NBT file with a root TAG_Compound (found {0})'.format(tag_type); + data_cursor += 1; + + tag, data_cursor = load_named(data, data_cursor, tag_type) + + return tag; + + + +__all__ = [a.__name__ for a in tag_classes.itervalues()] + ["load", "loadFile", "gunzip"] + + diff --git a/schematic.py b/schematic.py index ed42812..62755f2 100644 --- a/schematic.py +++ b/schematic.py @@ -40,12 +40,8 @@ class MCSchematic (EntityLevel): if filename: self.filename = filename - if None is root_tag: - try: - root_tag = nbt.load(filename) - except IOError, e: - error(u"Failed to load file {0}".format (e)) - + if None is root_tag and os.path.exists(filename): + root_tag = nbt.load(filename) else: self.filename = None @@ -120,16 +116,16 @@ class MCSchematic (EntityLevel): try: - self.root_tag = nbt.load(buf=fromstring(data, dtype='uint8')); + self.root_tag = nbt.load(buf=data); except Exception, e: error(u"Malformed NBT data in schematic file: {0} ({1})".format(self.filename, e)) - raise ChunkMalformed, self.filename + raise ChunkMalformed, (e,self.filename), sys.exc_info()[2] try: self.shapeChunkData() except KeyError, e: error(u"Incorrect schematic format in file: {0} ({1})".format(self.filename, e)) - raise ChunkMalformed, self.filename + raise ChunkMalformed, (e,self.filename), sys.exc_info()[2] pass self.dataIsPacked = True; diff --git a/tests.py b/tests.py index dec3831..c2c811b 100644 --- a/tests.py +++ b/tests.py @@ -6,6 +6,7 @@ Created on Jul 23, 2011 #from mclevel import fromFile, loadWorldNumber, BoundingBox #from infiniteworld import MCInfdevOldLevel #from schematic import MCSchematic +#import errorreporting # annotate tracebacks with call arguments try: from pymclevel import * except ImportError: @@ -23,7 +24,7 @@ import time import numpy from numpy import * -#from pymclevel.infiniteworld import MCServerChunkGenerator +from infiniteworld import MCServerChunkGenerator log = logging.getLogger(__name__) warn, error, info, debug = log.warn, log.error, log.info, log.debug @@ -169,7 +170,7 @@ class TestNBT(unittest.TestCase): n = newlevel["Map"]["Spawn"][0].name if(n): print "Named list element failed: %s" % n; - + """ attempt to delete non-existent TAG_Compound elements this generates a KeyError like a python dict does. @@ -186,9 +187,11 @@ class TestNBT(unittest.TestCase): d = join("testfiles", "TileTicks_chunks") files = [join(d, f) for f in os.listdir(d)] startTime = time.time() - for f in files[:20]: - n = nbt.load(f) + for i in range(20): + for f in files[:40]: + n = nbt.load(f) print "Duration: ", time.time() - startTime + #print "NBT: ", n class TestIndevLevel(unittest.TestCase): def setUp(self): @@ -317,7 +320,16 @@ class TestAlphaLevel(unittest.TestCase): level.generateLights(); level.saveInPlace(); - + + def testRecompress(self): + cx,cz = -3, -1 + level = self.alphalevel.level + ch = level.getChunk(cx,cz) + ch.dirty = True + level.saveInPlace() + ch.Blocks + print ch.root_tag + def testPlayerSpawn(self): level = self.alphalevel.level