Added _nbt.pyx, a Cython implentation of nbt that uses extension classes and pointer arithmetic to speed up loading.

Renamed nbt.py to pynbt.py, added nbt.py as a thin loader for either _nbt.pyx or pynbt.py.
This commit is contained in:
David Vierra 2011-11-25 15:56:11 -10:00
parent c48adf501a
commit 4135f603c5
7 changed files with 1211 additions and 571 deletions

632
_nbt.pyx Normal file
View File

@ -0,0 +1,632 @@
# cython: profile=True
# vim:set sw=2 sts=2 ts=2:
"""
Cython implementation
Named Binary Tag library. Serializes and deserializes TAG_* objects
to and from binary data. Load a Minecraft level by calling nbt.load().
Create your own TAG_* objects and set their values.
Save a TAG_* object to a file or StringIO object.
Read the test functions at the end of the file to get started.
This library requires Numpy. Get it here:
http://new.scipy.org/download.html
Official NBT documentation is here:
http://www.minecraft.net/docs/NBT.txt
Copyright 2010 David Rio Vierra
"""
import collections
import itertools
import struct
import gzip
from cStringIO import StringIO
from cpython cimport PyTypeObject, PyObject_TypeCheck, PyUnicode_DecodeUTF8, PyList_Append
cdef extern from "cStringIO.h":
struct PycStringIO_CAPI:
int cwrite (object o, char *buf, Py_ssize_t len)
PyTypeObject * OutputType
cdef extern from "cobject.h":
void * PyCObject_Import(char *module_name, char *cobject_name)
cdef PycStringIO_CAPI *PycStringIO = <PycStringIO_CAPI *>PyCObject_Import("cStringIO", "cStringIO_CAPI")
cdef PyTypeObject * StringO = PycStringIO.OutputType
cdef cwrite(obj, char * buf, size_t len):
#print "cwrite %s %s %d" % (map(ord, buf[:min(4, len)]), buf[:min(4, len)].decode('ascii', 'replace'), len)
return PycStringIO.cwrite(obj, buf, len)
import sys
import os;
from os.path import exists
from contextlib import closing
from numpy import array, zeros, uint8, fromstring, ndarray, frombuffer
cimport numpy as np
cdef char TAG_END = 0
cdef char TAG_BYTE = 1
cdef char TAG_SHORT = 2
cdef char TAG_INT = 3
cdef char TAG_LONG = 4
cdef char TAG_FLOAT = 5
cdef char TAG_DOUBLE = 6
cdef char TAG_BYTE_ARRAY = 7
cdef char TAG_STRING = 8
cdef char TAG_LIST = 9
cdef char TAG_COMPOUND = 10
cdef char TAG_INT_ARRAY = 11
cdef char TAG_SHORT_ARRAY = 12
class NBTFormatError (ValueError):
pass
cdef class TAG_Value:
cdef unicode _name
cdef public char tagID
def __str__(self):
return self.tostr()
cdef tostr(self):
return str(self.__class__) + ": " + str(self.value)
property name:
def __get__(self):
return self._name
def __set__(self, val):
if isinstance(val, str): val = PyUnicode_DecodeUTF8(val, len(val), "strict")
self._name = val
def __reduce__(self):
return (self.__class__, (self.value, self._name))
cdef class TAG_Number(TAG_Value):
pass
cdef class TAG_Array(TAG_Value):
pass
cdef class TAG_Byte(TAG_Number):
cdef public char value
cdef save_value(self, buf):
save_byte(self.value, buf)
def __init__(self, char value=0, name = u""):
self.value = value
self.name = name
self.tagID = TAG_BYTE
cdef class TAG_Short(TAG_Number):
cdef public short value
cdef save_value(self, buf):
save_short(self.value, buf)
def __init__(self, short value=0, name = u""):
self.value = value
self.name = name
self.tagID = TAG_SHORT
cdef class TAG_Int(TAG_Number):
cdef public int value
cdef save_value(self, buf):
save_int(self.value, buf)
def __init__(self, int value=0, name = u""):
self.value = value
self.name = name
self.tagID = TAG_INT
cdef class TAG_Long(TAG_Number):
cdef public long long value
cdef save_value(self, buf):
save_long(self.value, buf)
def __init__(self, long long value=0, name = u""):
self.value = value
self.name = name
self.tagID = TAG_LONG
cdef class TAG_Float(TAG_Number):
cdef public float value
cdef save_value(self, buf):
save_float(self.value, buf)
def __init__(self, float value=0., name = u""):
self.value = value
self.name = name
self.tagID = TAG_FLOAT
cdef class TAG_Double(TAG_Number):
cdef public double value
cdef save_value(self, buf):
save_double(self.value, buf)
def __init__(self, double value=0., name = u""):
self.value = value
self.name = name
self.tagID = TAG_DOUBLE
cdef class TAG_Byte_Array(TAG_Array):
cdef public object value
def __init__(self, value = zeros((0,), 'uint8'), name = u""):
self.value = value
self.name = name
self.tagID = TAG_BYTE_ARRAY
cdef save_value(self, buf):
save_byte_array(self.value, buf)
cdef class TAG_String(TAG_Value):
cdef unicode _value
def __init__(self, value = u"", name = u""):
if isinstance(value, str): value = PyUnicode_DecodeUTF8(value, len(value), "strict")
self.value = value
self.name = name
self.tagID = TAG_STRING
property value:
def __get__(self):
return self._value
def __set__(self, value):
if isinstance(value, str): value = PyUnicode_DecodeUTF8(value, len(value), "strict")
self._value = value
cdef save_value(self, buf):
save_string(self.value.encode('utf-8'), buf)
cdef class _TAG_List(TAG_Value):
cdef public list value
def __init__(self, value = None, name = u""):
self.value = list(value or [])
self.name = name
self.tagID = TAG_LIST
"""collection methods"""
def __getitem__(self, key):
return self.value[key]
def __setitem__(self, key, val):
self.value[key] = val
def __iter__(self):
return iter(self.value)
def __len__(self): return len(self.value)
def insert(self, idx, key):
self.value.insert(idx, key)
property list_type:
def __get__(self):
if len(self.value): return self.value[0].tagID
return TAG_BYTE
cdef save_value(self, buf):
save_tag_id(self.list_type, buf)
save_int(len(self.value), buf)
items = self.value
for subtag in items:
if subtag.tagID != self.list_type:
raise NBTFormatError, "Asked to save TAG_List with different types! Found %s and %s" % (subtag.tagID, self.list_type)
save_tag_value(subtag, buf)
class TAG_List(_TAG_List, collections.MutableSequence):
pass
cdef class _TAG_Compound(TAG_Value):
cdef public dict value
def __init__(self, value = None, name = u""):
self.value = value or {}
self.name = name
self.tagID = TAG_COMPOUND
"""collection methods"""
def __getitem__(self, key):
return self.value[key]
def __setitem__(self, key, val):
assert isinstance(val, TAG_Value)
val.name = key
self.value[key] = val
def __delitem__(self, key):
del self.value[key]
def __iter__(self): return iter(self.value)
def __contains__(self, k):return k in self.value
def __len__(self): return len(self.value)
def __str__(self):
return str(self.__class__) + ": " + str(self.value)
__repr__ = __str__
def add(self, tag):
assert tag.name
self[tag.name] = tag
cdef save_value(self, buf):
i = self.iteritems()
for name, subtag in i:
#print "save_tag_name", name, subtag.tagID, "Named", subtag.name,
save_tag_id(subtag.tagID, buf)
#print "id",
save_tag_name(subtag, buf)
#print "name",
save_tag_value(subtag, buf)
#print "value", name
save_tag_id(TAG_END, buf)
class TAG_Compound(_TAG_Compound, collections.MutableMapping):
def __init__(self, value = None, name = u""):
_TAG_Compound.__init__(self, value, name)
def save(self, filename = "", buf = None):
save_root_tag(self, filename, buf)
cdef class TAG_Int_Array(TAG_Array):
cdef char _tagID(self): return TAG_INT_ARRAY
cdef public object value
cdef save_value(self, buf):
pass
cdef class TAG_Short_Array(TAG_Array):
cdef char _tagID(self): return TAG_SHORT_ARRAY
cdef public object value
cdef save_value(self, buf):
pass
#cdef int needswap = (sys.byteorder == "little")
cdef swab(void * vbuf, int nbytes):
cdef unsigned char * buf = <unsigned char *>(vbuf)
#print "Swapping ", nbytes, "bytes"
#for i in range(nbytes): print buf[i],
#print "to",
#if not needswap: return
cdef int i
for i in range((nbytes+1)/2):
buf[i], buf[nbytes-i-1] = buf[nbytes-i-1], buf[i]
#for i in range(nbytes): print buf[i],
import zlib
def gunzip(data):
#strip off the header and use negative WBITS to tell zlib there's no header
return zlib.decompress(data[10:], -zlib.MAX_WBITS)
def try_gunzip(data):
try:
data = gunzip(data)
except Exception, e:
pass
return data
def load(buf=None, filename=None):
try:
if isinstance(buf, basestring) and exists(buf):
filename = buf
except TypeError:
pass
if filename and exists(filename):
data = file(filename, "rb").read()
data = try_gunzip(data)
return load_buffer(data)
return load_buffer(try_gunzip(buf))
cdef class load_ctx:
cdef unsigned long offset
cdef char * buffer
cdef unsigned long size
cdef int require(self, int s) except -1:
#print "Asked for ", s
if s > self.size - self.offset:
raise NBTFormatError, "NBT Stream too short. Asked for %d, only had %d" % (s, (self.size - self.offset))
return 0
should_dump = False
cdef load_buffer(bytes buf):
cdef load_ctx ctx = load_ctx()
ctx.offset = 1
ctx.buffer = buf
ctx.size = len(buf)
if len(buf) < 1:
raise NBTFormatError, "NBT Stream too short!"
if should_dump: print dump(buf)
assert ctx.buffer[0] == TAG_COMPOUND, "Data is not a TAG_Compound (found %d)" % ctx.buffer[0]
name = load_string(ctx)
#print "Root name", name
tag = load_compound(ctx)
tag.name = name
return tag
cdef load_byte(load_ctx ctx):
ctx.require(1)
cdef char * ptr = <char *>(ctx.buffer + ctx.offset)
ctx.offset += 1
cdef TAG_Byte tag = TAG_Byte.__new__(TAG_Byte)
tag.value = ptr[0]
tag.tagID = TAG_BYTE
tag._name = u""
return tag
cdef load_short(load_ctx ctx):
ctx.require(2)
cdef short * ptr = <short *>(ctx.buffer + ctx.offset)
swab(ptr, 2)
ctx.offset += 2
cdef TAG_Short tag = TAG_Short.__new__(TAG_Short)
tag.value = ptr[0]
tag.tagID = TAG_SHORT
tag._name = u""
return tag
cdef load_int(load_ctx ctx):
ctx.require(4)
cdef int * ptr = <int *>(ctx.buffer + ctx.offset)
swab(ptr, 4)
ctx.offset += 4
cdef TAG_Int tag = TAG_Int.__new__(TAG_Int)
tag.value = (ptr[0])
tag.tagID = TAG_INT
tag._name = u""
return tag
cdef load_long(load_ctx ctx):
ctx.require(8)
cdef long long * ptr = <long long *>(ctx.buffer + ctx.offset)
swab(ptr, 8)
ctx.offset += 8
cdef TAG_Long tag = TAG_Long.__new__(TAG_Long)
tag.value = ptr[0]
tag.tagID = TAG_LONG
tag._name = u""
return tag
cdef load_float(load_ctx ctx):
ctx.require(4)
cdef float * ptr = <float *>(ctx.buffer + ctx.offset)
swab(ptr, 4)
ctx.offset += 4
cdef TAG_Float tag = TAG_Float.__new__(TAG_Float)
tag.value = ptr[0]
tag.tagID = TAG_FLOAT
tag._name = u""
return tag
cdef load_double(load_ctx ctx):
ctx.require(8)
cdef double * ptr = <double *>(ctx.buffer + ctx.offset)
swab(ptr, 8)
ctx.offset += 8
cdef TAG_Double tag = TAG_Double.__new__(TAG_Double)
tag.value = ptr[0]
tag.tagID = TAG_DOUBLE
tag._name = u""
return tag
cdef load_bytearray(load_ctx ctx):
ctx.require(4)
cdef unsigned int * ptr = <unsigned int *>(ctx.buffer + ctx.offset)
swab(ptr, 4)
cdef unsigned int length = ptr[0]
ctx.offset += 4
cdef char * arr = ctx.buffer + ctx.offset
#print "Bytearray", length, ctx.size - ctx.offset
ctx.require(length)
ctx.offset += length
return TAG_Byte_Array(fromstring(arr[:length], dtype='uint8', count=length))
### --- load_compound ---
cdef load_compound(load_ctx ctx):
#print "load_compound buf=%d off=%d" % (ctx.buffer[0], ctx.offset)
cdef char tagID
cdef _TAG_Compound root_tag = TAG_Compound()
assert root_tag is not None
while True:
ctx.require(1)
tagID = ctx.buffer[ctx.offset]
ctx.offset += 1
if tagID == TAG_END:
#print "TAG_END at ", ctx.offset
break;
else:
name = load_string(ctx)
tag = load_tag(tagID, ctx)
#tag.name = name
#print "tagID=%d name=%s at %d" % (tagID, tag.name, ctx.offset)
root_tag[name] = tag
return root_tag
cdef load_list(load_ctx ctx):
ctx.require(5)
cdef char tagID = ctx.buffer[ctx.offset]
ctx.offset += 1
cdef int * ptr = <int *>(ctx.buffer + ctx.offset)
swab(ptr, 4)
ctx.offset += 4
length = ptr[0]
cdef _TAG_List tag = TAG_List()
cdef list val = tag.value
cdef int i
for i in range(length):
PyList_Append(val, load_tag(tagID, ctx))
return tag
cdef unicode load_string(load_ctx ctx):
ctx.require(2)
cdef unsigned short * ptr = <unsigned short *>(ctx.buffer+ctx.offset)
swab(ptr, 2)
ctx.offset += 2
cdef unsigned short length = ptr[0]
#print "String: ", ctx.offset, length
cdef unicode u = PyUnicode_DecodeUTF8(ctx.buffer + ctx.offset, length, "strict")
ctx.offset += length
return u
cdef load_tag(char tagID, load_ctx ctx):
if tagID == TAG_BYTE:
return load_byte(ctx)
if tagID == TAG_SHORT:
return load_short(ctx)
if tagID == TAG_INT:
return load_int(ctx)
if tagID == TAG_LONG:
return load_long(ctx)
if tagID == TAG_FLOAT:
return load_float(ctx)
if tagID == TAG_DOUBLE:
return load_double(ctx)
if tagID == TAG_BYTE_ARRAY:
return load_bytearray(ctx)
if tagID == TAG_STRING:
u = load_string(ctx)
return TAG_String(u)
if tagID == TAG_LIST:
return load_list(ctx)
if tagID == TAG_COMPOUND:
return load_compound(ctx)
if tagID == TAG_INT_ARRAY:
pass
if tagID == TAG_SHORT_ARRAY:
pass
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
def dump(src, length=8):
N=0; result=''
while src:
s,src = src[:length],src[length:]
hexa = ' '.join(["%02X"%ord(x) for x in s])
s = s.translate(FILTER)
result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
N+=length
return result
cdef save_root_tag(tag, filename = "", buf = None):
sio = StringIO()
save_tag(tag, sio)
data = sio.getvalue()
if buf is None:
f = file(filename, "wb")
gzio = StringIO()
gz = gzip.GzipFile(fileobj=gzio, mode='wb', compresslevel=2)
gz.write(data)
gz.close()
f.write(gzio.getvalue())
else:
buf.write(data)
cdef save_tag(TAG_Value tag, object buf):
save_tag_id(tag.tagID, buf)
save_tag_name(tag, buf)
save_tag_value(tag, buf)
cdef save_tag_id(char tagID, object buf):
cwrite(buf, &tagID, 1)
cdef save_tag_name(TAG_Value tag, object buf):
name = tag.name.encode('utf-8')
save_string(name, buf)
cdef save_string(bytes value, object buf):
cdef unsigned short length = len(value)
cdef char * s = value
swab(&length, 2)
cwrite(buf, <char *>&length, 2)
cwrite(buf, s, len(value))
cdef save_byte_array(object value, object buf):
value = value.tostring()
cdef char * s = value
cdef unsigned int length = len(value)
swab(&length, 4)
cwrite(buf, <char *>&length, 4)
cwrite(buf, s, len(value))
cdef save_byte(char value, object buf):
cwrite(buf, <char *>&value, 1)
cdef save_short(short value, object buf):
swab(&value, 2)
cwrite(buf, <char *>&value, 2)
cdef save_int(int value, object buf):
swab(&value, 4)
cwrite(buf, <char *>&value, 4)
cdef save_long(long long value, object buf):
# print "Long long value: ", value, sizeof(value)
swab(&value, 8)
cdef char * p = <char *>&value
cwrite(buf, p, 8)
#cwrite(buf, p+4, 4)
#cwrite(buf, "\0\0\0\0\0\0\0\0", 8)
cdef save_float(float value, object buf):
swab(&value, 4)
cwrite(buf, <char *>&value, 4)
cdef save_double(double value, object buf):
swab(&value, 8)
cwrite(buf, <char *>&value, 8)
cdef save_tag_value(TAG_Value tag, object buf):
cdef char tagID = tag.tagID
if tagID == TAG_BYTE:
(<TAG_Byte>tag).save_value(buf)
if tagID == TAG_SHORT:
(<TAG_Short>tag).save_value(buf)
if tagID == TAG_INT:
(<TAG_Int>tag).save_value(buf)
if tagID == TAG_LONG:
(<TAG_Long>tag).save_value(buf)
if tagID == TAG_FLOAT:
(<TAG_Float>tag).save_value(buf)
if tagID == TAG_DOUBLE:
(<TAG_Double>tag).save_value(buf)
if tagID == TAG_BYTE_ARRAY:
(<TAG_Byte_Array>tag).save_value(buf)
if tagID == TAG_STRING:
(<TAG_String>tag).save_value(buf)
if tagID == TAG_LIST:
(<_TAG_List>tag).save_value(buf)
if tagID == TAG_COMPOUND:
(<_TAG_Compound>tag).save_value(buf)
if tagID == TAG_INT_ARRAY:
pass
if tagID == TAG_SHORT_ARRAY:
pass

View File

@ -62,7 +62,10 @@ class MCJavaLevel(MCLevel):
def __init__(self, filename, data):
self.filename = filename;
if isinstance(data, basestring):
data = fromstring(data, dtype='uint8')
self.filedata = data;
#try to take x,z,y from the filename
r = re.findall("\d+", os.path.basename(filename))
if r and len(r) >= 3:
@ -77,7 +80,7 @@ class MCJavaLevel(MCLevel):
blockCount = h * l * w
if blockCount > data.shape[0]:
raise ValueError, "Level file does not contain enough blocks! Try putting the size into the filename, e.g. server_level_{w}_{l}_{h}.dat".format(w=w, l=l, h=h);
raise ValueError, "Level file does not contain enough blocks! (size {s}) Try putting the size into the filename, e.g. server_level_{w}_{l}_{h}.dat".format(w=w, l=l, h=h, s=data.shape);
blockOffset = data.shape[0] - blockCount

View File

@ -255,8 +255,8 @@ def fromFile(filename, loadInfinite=True):
compressed = False;
unzippedData = rawdata
data = fromstring(unzippedData, dtype='uint8')
#data =
data = unzippedData
if MCJavaLevel._isDataLevel(data):
info(u"Detected compressed Java-style level")
lev = MCJavaLevel(filename, data);
@ -265,15 +265,17 @@ def fromFile(filename, loadInfinite=True):
try:
root_tag = nbt.load(buf=data);
except Exception, e:
info(u"Error during NBT load: {0!r}".format(e))
info(traceback.format_exc())
info(u"Fallback: Detected compressed flat block array, yzx ordered ")
try:
lev = MCJavaLevel(filename, data);
lev.compressed = compressed;
return lev;
except Exception, e2:
raise LoadingError, ("Multiple errors encountered", e, e2)
raise LoadingError, ("Multiple errors encountered", e, e2), sys.exc_info()[2]
else:
if(MCIndevLevel._isTagLevel(root_tag)):
@ -288,8 +290,6 @@ def fromFile(filename, loadInfinite=True):
return INVEditChest(root_tag=root_tag, filename=filename);
#it must be a plain array of blocks. see if MCJavaLevel handles it.
raise IOError, "Cannot detect file type."
@ -301,16 +301,3 @@ def loadWorldNumber(i):
#deprecated
filename = u"{0}{1}{2}{3}{1}".format(saveFileDir, os.sep, u"World", i)
return fromFile(filename)
#
#from formats import *
#from box import BoundingBox
#
####xxxxx CHECK RESULTS
#
##import cProfile
#if __name__=="__main__":
# #cProfile.run('testmain()');
# logging.basicConfig(format=u'%(levelname)s:%(message)s')
# logging.getLogger().level = logging.INFO
#
# testmain();

548
nbt.py
View File

@ -1,538 +1,10 @@
# vim:set sw=2 sts=2 ts=2:
"""
Named Binary Tag library. Serializes and deserializes TAG_* objects
to and from binary data. Load a Minecraft level by calling nbt.load().
Create your own TAG_* objects and set their values.
Save a TAG_* object to a file or StringIO object.
Read the test functions at the end of the file to get started.
This library requires Numpy. Get it here:
http://new.scipy.org/download.html
Official NBT documentation is here:
http://www.minecraft.net/docs/NBT.txt
Copyright 2010 David Rio Vierra
"""
import collections
import itertools
import struct
import gzip
from cStringIO import StringIO;
import os;
from contextlib import closing
from numpy import array, zeros, uint8, fromstring
TAGfmt = ">b"
class NBTFormatError(RuntimeError): pass
class TAG_Value(object):
"""Simple values. Subclasses override fmt to change the type and size.
Subclasses may set dataType instead of overriding setValue for automatic data type coercion"""
fmt = ">b";
tag = -1; #error!
_value = None
def getValue(self):
return self._value
def setValue(self, newVal):
self._value = self.dataType(newVal)
value = property(getValue, setValue, None, "Change the TAG's value. Data types are checked and coerced if needed.")
_name = None
def getName(self):
return self._name
def setName(self, newVal):
self._name = str(newVal)
def delName(self):
self._name = ""
name = property(getName, setName, delName, "Change the TAG's name. Coerced to a string.")
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(value,) = struct.unpack_from(cls.fmt, data);
self = cls(value=value)
return self, data_cursor + struct.calcsize(self.fmt)
def __init__(self, value=0, name=None):
self.name = name
self.value = value
def __repr__(self):
return "%s( \"%s\" ): %s" % (str(self.__class__), self.name, repr(self.value))
def __str__(self):
return self.pretty_string()
def pretty_string(self, indent=0):
if self.name:
return " " * indent + "%s( \"%s\" ): %s" % (str(self.__class__.__name__), self.name, self.value)
else:
return " " * indent + "%s: %s" % (str(self.__class__.__name__), self.value)
def write_tag(self, buf):
buf.write(struct.pack(TAGfmt, self.tag))
def write_name(self, buf):
if(self.name != None):
TAG_String(self.name).write_value(buf)
def write_value(self, buf):
buf.write(struct.pack(self.fmt, self.value))
def save(self, filename="", buf=None):
if(filename):
self.saveGzipped(filename);
return;
"Save the tagged element to a file."
if self.name == None: self.name = "" #root tag must have name
self.write_tag(buf)
self.write_name(buf)
self.write_value(buf)
def saveGzipped(self, filename, compresslevel=1):
sio = StringIO();
#atomic write
try: os.rename(filename, filename + ".old");
except Exception, e:
#print "Atomic Save: No existing file to rename"
pass
with closing(gzip.GzipFile(fileobj=sio, mode="wb", compresslevel=compresslevel)) as outputGz:
self.save(buf=outputGz);
outputGz.flush();
#print len(sio.getvalue());
try:
with open(filename, 'wb') as f:
f.write(sio.getvalue());
except:
try:
os.rename(filename + ".old", filename,);
except Exception, e:
print e;
return
try: os.remove(filename + ".old");
except Exception, e:
#print "Atomic Save: No old file to remove"
pass;
class TAG_Byte(TAG_Value):
tag = 1;
fmt = ">b";
dataType = int
class TAG_Short(TAG_Value):
tag = 2;
fmt = ">h";
dataType = int
class TAG_Int(TAG_Value):
tag = 3;
fmt = ">i";
dataType = int
class TAG_Long(TAG_Value):
tag = 4;
fmt = ">q";
dataType = long
class TAG_Float(TAG_Value):
tag = 5;
fmt = ">f";
dataType = float
class TAG_Double(TAG_Value):
tag = 6;
fmt = ">d";
dataType = float
class TAG_Byte_Array(TAG_Value):
"""Like a string, but for binary data. four length bytes instead of
two. value is a numpy array, and you can change its elements"""
tag = 7;
fmt = ">i%ds"
def dataType(self, value):
return array(value, uint8)
def __repr__(self):
return "<%s: length %d> ( %s )" % (self.__class__, len(self.value), self.name)
def pretty_string(self, indent=0):
if self.name:
return " " * indent + "%s( \"%s\" ): shape=%s dtype=%s %s" % (
str(self.__class__.__name__),
self.name,
str(self.value.shape),
str(self.value.dtype),
self.value)
else:
return " " * indent + "%s: %s %s" % (str(self.__class__.__name__), str(self.value.shape), self.value)
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(string_len,) = struct.unpack_from(">I", data);
value = fromstring(data[4:string_len + 4], 'uint8');
self = cls(value)
return self, data_cursor + string_len + 4
def __init__(self, value=zeros(0, uint8), name=None):
if name:
self.name = name
self.value = value;
def write_value(self, buf):
#print self.value
valuestr = self.value.tostring()
buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr), valuestr))
class TAG_Int_Array(TAG_Byte_Array):
"""An array of ints"""
tag = 11;
def dataType(self, value):
return array(value, '>u4')
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(string_len,) = struct.unpack_from(">I", data);
value = fromstring(data[4:string_len * 4 + 4], '>u4')
self = cls(value)
return self, data_cursor + len(self.value) * 4 + 4;
def __init__(self, value=zeros(0, ">u4"), name=None):
self.name = name
self.value = value;
def write_value(self, buf):
#print self.value
valuestr = self.value.tostring()
buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr) / 4, valuestr))
class TAG_Short_Array(TAG_Int_Array):
"""An array of ints"""
tag = 12;
def dataType(self, value):
return array(value, '>u2')
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(string_len,) = struct.unpack_from(">I", data);
value = fromstring(data[4:string_len * 2 + 4], '>u2')
self = cls(value)
return self, data_cursor + len(self.value) * 2 + 4;
def __init__(self, value=zeros(0, ">u2"), name=None):
self.name = name
self.value = value;
def write_value(self, buf):
#print self.value
valuestr = self.value.tostring()
buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr) / 2, valuestr))
class TAG_String(TAG_Value):
"""String in UTF-8
The value parameter must be a 'unicode' or a UTF-8 encoded 'str'
"""
tag = 8;
fmt = ">h%ds"
dataType = lambda self, s: isinstance(s, unicode) and s.encode('utf-8') or s
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(string_len,) = struct.unpack_from(">H", data);
value = data[2:string_len + 2].tostring();
self = cls(value)
return self, data_cursor + string_len + 2;
def __init__(self, value="", name=None):
if name:
self.name = name
self.value = value
def write_value(self, buf):
u8value = self._value
buf.write(struct.pack(self.fmt % (len(u8value),), len(u8value), u8value))
@property
def unicodeValue(self):
return self.value.decode('utf-8')
class TAG_Compound(TAG_Value, collections.MutableMapping):
"""A heterogenous list of named tags. Names must be unique within
the TAG_Compound. Add tags to the compound using the subscript
operator []. This will automatically name the tags."""
tag = 10;
def dataType(self, val):
for i in val:
assert isinstance(i, TAG_Value)
assert i.name
return list(val)
def __repr__(self):
return "%s( %s ): %s" % (str(self.__class__.__name__), self.name, self.value)
def pretty_string(self, indent=0):
if self.name:
pretty = " " * indent + "%s( \"%s\" ): %d items\n" % (str(self.__class__.__name__), self.name, len(self.value))
else:
pretty = " " * indent + "%s(): %d items\n" % (str(self.__class__.__name__), len(self.value))
indent += 4
for tag in self.value:
pretty += tag.pretty_string(indent) + "\n"
return pretty
@classmethod
def load_from(cls, data, data_cursor):
self = cls()
while data_cursor < len(data):
tag_type = data[data_cursor];
data_cursor += 1
if(tag_type == 0):
break
tag, data_cursor = load_named(data, data_cursor, tag_type)
self._value.append(tag);
return self, data_cursor
def __init__(self, value=[], name=""):
self.name = name;
if value.__class__ == ''.__class__:
self.name = value;
value = [];
self.value = value;
def write_value(self, buf):
for i in self.value:
i.save(buf=buf)
buf.write("\x00")
"collection functions"
def __getitem__(self, k):
#hits=filter(lambda x:x.name==k, self.value);
#if(len(hits)): return hits[0];
for key in self.value:
if key.name == k: return key
raise KeyError("Key {0} not found in tag {1}".format(k, self));
def __iter__(self): return itertools.imap(lambda x:x.name, self.value);
def __contains__(self, k):return k in map(lambda x:x.name, self.value);
def __len__(self): return self.value.__len__()
def __setitem__(self, k, v):
"""Automatically wraps lists and tuples in a TAG_List, and wraps strings
and unicodes in a TAG_String."""
if isinstance(v, (list, tuple)):
v = TAG_List(v)
elif isinstance(v, basestring):
v = TAG_String(v)
if not (v.__class__ in tag_classes.values()): raise TypeError("Invalid type %s for TAG_Compound" % (v.__class__))
"""remove any items already named "k". """
olditems = filter(lambda x:x.name == k, self.value)
for i in olditems: self.value.remove(i)
self.value.append(v);
v.name = k;
def __delitem__(self, k): self.value.__delitem__(self.value.index(self[k]));
def add(self, v):
self[v.name] = v;
class TAG_List(TAG_Value, collections.MutableSequence):
"""A homogenous list of unnamed data of a single TAG_* type.
Once created, the type can only be changed by emptying the list
and adding an element of the new type. If created with no arguments,
returns a list of TAG_Compound
Empty lists in the wild have been seen with type TAG_Byte"""
tag = 9;
def dataType(self, val):
if val:
listType = val[0].__class__
# FIXME: This is kinda weird; None as the empty tag name?
assert all(isinstance(x, listType) and x.name in ("", "None") for x in val)
return list(val)
def __repr__(self):
return "%s( %s ): %s" % (self.__class__.__name__, self.name, self.value)
def pretty_string(self, indent=0):
if self.name:
pretty = " " * indent + "%s( \"%s\" ):\n" % (str(self.__class__.__name__), self.name)
else:
pretty = " " * indent + "%s():\n" % (str(self.__class__.__name__),)
indent += 4
for tag in self.value:
pretty += tag.pretty_string(indent) + "\n"
return pretty
@classmethod
def load_from(cls, data, data_cursor):
self = cls()
self.list_type = data[data_cursor];
data_cursor += 1;
list_length, data_cursor = TAG_Int.load_from(data, data_cursor)
list_length = list_length.value
for i in range(list_length):
tag, data_cursor = tag_classes[self.list_type].load_from(data, data_cursor)
self.append(tag);
return self, data_cursor
def __init__(self, value=[], name=None, list_type=TAG_Compound):
#can be created from a list of tags in value, with an optional
#name, or created from raw tag data, or created with list_type
#taken from a TAG class or instance
self.name = name
self.list_type = list_type.tag
if(len(value)):
self.list_type = value[0].tag;
value = filter(lambda x:x.__class__ == value[0].__class__, value)
self.value = value
""" collection methods """
def __iter__(self): return iter(self.value)
def __contains__(self, k):return k in self.value;
def __getitem__(self, i): return self.value[i];
def __len__(self): return len(self.value)
def __setitem__(self, i, v):
if v.__class__ != tag_classes[self.list_type]:
raise TypeError("Invalid type %s for TAG_List(%s)" % (v.__class__, tag_classes[self.list_type]))
v.name = ""
self.value[i] = v;
def __delitem__(self, i):
del self.value[i]
def insert(self, i, v):
if not v.tag in tag_classes: raise TypeError("Not a tag type: %s" % (v,))
if len(self) == 0:
self.list_type = v.tag
else:
if v.__class__ != tag_classes[self.list_type]: raise TypeError("Invalid type %s for TAG_List(%s)" % (v.__class__, tag_classes[self.list_type]))
v.name = ""
self.value.insert(i, v);
def write_value(self, buf):
buf.write(struct.pack(TAGfmt, self.list_type))
TAG_Int(len(self)).write_value(buf)
for i in self.value:
i.write_value(buf)
tag_classes = {
1 : TAG_Byte,
2 : TAG_Short,
3 : TAG_Int,
4 : TAG_Long,
5 : TAG_Float,
6 : TAG_Double,
7 : TAG_Byte_Array,
8 : TAG_String,
9 : TAG_List,
10: TAG_Compound,
11: TAG_Int_Array,
12: TAG_Short_Array,
};
import zlib
def gunzip(data):
#strip off the header and use negative WBITS to tell zlib there's no header
return zlib.decompress(data[10:], -zlib.MAX_WBITS)
def loadFile(filename):
with file(filename, "rb") as f:
inputdata = f.read()
data = inputdata
try:
data = gunzip(inputdata)
except IOError:
print "File %s not zipped" % filename
return load(buf=fromstring(data, 'uint8'));
def load_named(data, data_cursor, tag_type):
tag_name, data_cursor = TAG_String.load_from(data, data_cursor)
tag_name = tag_name.value
tag, data_cursor = tag_classes[tag_type].load_from(data, data_cursor)
tag.name = tag_name
return tag, data_cursor
def load(filename="", buf=None):
"""Unserialize data from an entire NBT file and return the
root TAG_Compound object. Argument can be a string containing a
filename or an array of integers containing TAG_Compound data. """
if filename and isinstance(filename, (str, unicode)):
return loadFile(filename)
if isinstance(buf, str): buf = fromstring(buf, uint8)
data = buf;
#if buf != None: data = buf
if not len(buf):
raise NBTFormatError, "Asked to load root tag of zero length"
data_cursor = 0;
tag_type = data[data_cursor];
if tag_type != 10:
raise NBTFormatError, 'Not an NBT file with a root TAG_Compound (found {0})'.format(tag_type);
data_cursor += 1;
tag, data_cursor = load_named(data, data_cursor, tag_type)
return tag;
__all__ = [a.__name__ for a in tag_classes.itervalues()] + ["load", "loadFile", "gunzip"]
try:
import os
os.environ['DISTUTILS_USE_SDK'] = "1"
from pyximport import install; install()
from _nbt import *
except ImportError, e:
print "Import error loading _nbt extension", repr(e)
import traceback; traceback.print_exc()
from pynbt import *

538
pynbt.py Normal file
View File

@ -0,0 +1,538 @@
# vim:set sw=2 sts=2 ts=2:
"""
Named Binary Tag library. Serializes and deserializes TAG_* objects
to and from binary data. Load a Minecraft level by calling nbt.load().
Create your own TAG_* objects and set their values.
Save a TAG_* object to a file or StringIO object.
Read the test functions at the end of the file to get started.
This library requires Numpy. Get it here:
http://new.scipy.org/download.html
Official NBT documentation is here:
http://www.minecraft.net/docs/NBT.txt
Copyright 2010 David Rio Vierra
"""
import collections
import itertools
import struct
import gzip
from cStringIO import StringIO;
import os;
from contextlib import closing
from numpy import array, zeros, uint8, fromstring
TAGfmt = ">b"
class NBTFormatError(RuntimeError): pass
class TAG_Value(object):
"""Simple values. Subclasses override fmt to change the type and size.
Subclasses may set dataType instead of overriding setValue for automatic data type coercion"""
fmt = ">b";
tag = -1; #error!
_value = None
def getValue(self):
return self._value
def setValue(self, newVal):
self._value = self.dataType(newVal)
value = property(getValue, setValue, None, "Change the TAG's value. Data types are checked and coerced if needed.")
_name = None
def getName(self):
return self._name
def setName(self, newVal):
self._name = str(newVal)
def delName(self):
self._name = ""
name = property(getName, setName, delName, "Change the TAG's name. Coerced to a string.")
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(value,) = struct.unpack_from(cls.fmt, data);
self = cls(value=value)
return self, data_cursor + struct.calcsize(self.fmt)
def __init__(self, value=0, name=None):
self.name = name
self.value = value
def __repr__(self):
return "%s( \"%s\" ): %s" % (str(self.__class__), self.name, repr(self.value))
def __str__(self):
return self.pretty_string()
def pretty_string(self, indent=0):
if self.name:
return " " * indent + "%s( \"%s\" ): %s" % (str(self.__class__.__name__), self.name, self.value)
else:
return " " * indent + "%s: %s" % (str(self.__class__.__name__), self.value)
def write_tag(self, buf):
buf.write(struct.pack(TAGfmt, self.tag))
def write_name(self, buf):
if(self.name != None):
TAG_String(self.name).write_value(buf)
def write_value(self, buf):
buf.write(struct.pack(self.fmt, self.value))
def save(self, filename="", buf=None):
if(filename):
self.saveGzipped(filename);
return;
"Save the tagged element to a file."
if self.name == None: self.name = "" #root tag must have name
self.write_tag(buf)
self.write_name(buf)
self.write_value(buf)
def saveGzipped(self, filename, compresslevel=1):
sio = StringIO();
#atomic write
try: os.rename(filename, filename + ".old");
except Exception, e:
#print "Atomic Save: No existing file to rename"
pass
with closing(gzip.GzipFile(fileobj=sio, mode="wb", compresslevel=compresslevel)) as outputGz:
self.save(buf=outputGz);
outputGz.flush();
#print len(sio.getvalue());
try:
with open(filename, 'wb') as f:
f.write(sio.getvalue());
except:
try:
os.rename(filename + ".old", filename,);
except Exception, e:
print e;
return
try: os.remove(filename + ".old");
except Exception, e:
#print "Atomic Save: No old file to remove"
pass;
class TAG_Byte(TAG_Value):
tag = 1;
fmt = ">b";
dataType = int
class TAG_Short(TAG_Value):
tag = 2;
fmt = ">h";
dataType = int
class TAG_Int(TAG_Value):
tag = 3;
fmt = ">i";
dataType = int
class TAG_Long(TAG_Value):
tag = 4;
fmt = ">q";
dataType = long
class TAG_Float(TAG_Value):
tag = 5;
fmt = ">f";
dataType = float
class TAG_Double(TAG_Value):
tag = 6;
fmt = ">d";
dataType = float
class TAG_Byte_Array(TAG_Value):
"""Like a string, but for binary data. four length bytes instead of
two. value is a numpy array, and you can change its elements"""
tag = 7;
fmt = ">i%ds"
def dataType(self, value):
return array(value, uint8)
def __repr__(self):
return "<%s: length %d> ( %s )" % (self.__class__, len(self.value), self.name)
def pretty_string(self, indent=0):
if self.name:
return " " * indent + "%s( \"%s\" ): shape=%s dtype=%s %s" % (
str(self.__class__.__name__),
self.name,
str(self.value.shape),
str(self.value.dtype),
self.value)
else:
return " " * indent + "%s: %s %s" % (str(self.__class__.__name__), str(self.value.shape), self.value)
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(string_len,) = struct.unpack_from(">I", data);
value = fromstring(data[4:string_len + 4], 'uint8');
self = cls(value)
return self, data_cursor + string_len + 4
def __init__(self, value=zeros(0, uint8), name=None):
if name:
self.name = name
self.value = value;
def write_value(self, buf):
#print self.value
valuestr = self.value.tostring()
buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr), valuestr))
class TAG_Int_Array(TAG_Byte_Array):
"""An array of ints"""
tag = 11;
def dataType(self, value):
return array(value, '>u4')
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(string_len,) = struct.unpack_from(">I", data);
value = fromstring(data[4:string_len * 4 + 4], '>u4')
self = cls(value)
return self, data_cursor + len(self.value) * 4 + 4;
def __init__(self, value=zeros(0, ">u4"), name=None):
self.name = name
self.value = value;
def write_value(self, buf):
#print self.value
valuestr = self.value.tostring()
buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr) / 4, valuestr))
class TAG_Short_Array(TAG_Int_Array):
"""An array of ints"""
tag = 12;
def dataType(self, value):
return array(value, '>u2')
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(string_len,) = struct.unpack_from(">I", data);
value = fromstring(data[4:string_len * 2 + 4], '>u2')
self = cls(value)
return self, data_cursor + len(self.value) * 2 + 4;
def __init__(self, value=zeros(0, ">u2"), name=None):
self.name = name
self.value = value;
def write_value(self, buf):
#print self.value
valuestr = self.value.tostring()
buf.write(struct.pack(self.fmt % (len(valuestr),), len(valuestr) / 2, valuestr))
class TAG_String(TAG_Value):
"""String in UTF-8
The value parameter must be a 'unicode' or a UTF-8 encoded 'str'
"""
tag = 8;
fmt = ">h%ds"
dataType = lambda self, s: isinstance(s, unicode) and s.encode('utf-8') or s
@classmethod
def load_from(cls, data, data_cursor):
data = data[data_cursor:]
(string_len,) = struct.unpack_from(">H", data);
value = data[2:string_len + 2].tostring();
self = cls(value)
return self, data_cursor + string_len + 2;
def __init__(self, value="", name=None):
if name:
self.name = name
self.value = value
def write_value(self, buf):
u8value = self._value
buf.write(struct.pack(self.fmt % (len(u8value),), len(u8value), u8value))
@property
def unicodeValue(self):
return self.value.decode('utf-8')
class TAG_Compound(TAG_Value, collections.MutableMapping):
"""A heterogenous list of named tags. Names must be unique within
the TAG_Compound. Add tags to the compound using the subscript
operator []. This will automatically name the tags."""
tag = 10;
def dataType(self, val):
for i in val:
assert isinstance(i, TAG_Value)
assert i.name
return list(val)
def __repr__(self):
return "%s( %s ): %s" % (str(self.__class__.__name__), self.name, self.value)
def pretty_string(self, indent=0):
if self.name:
pretty = " " * indent + "%s( \"%s\" ): %d items\n" % (str(self.__class__.__name__), self.name, len(self.value))
else:
pretty = " " * indent + "%s(): %d items\n" % (str(self.__class__.__name__), len(self.value))
indent += 4
for tag in self.value:
pretty += tag.pretty_string(indent) + "\n"
return pretty
@classmethod
def load_from(cls, data, data_cursor):
self = cls()
while data_cursor < len(data):
tag_type = data[data_cursor];
data_cursor += 1
if(tag_type == 0):
break
tag, data_cursor = load_named(data, data_cursor, tag_type)
self._value.append(tag);
return self, data_cursor
def __init__(self, value=[], name=""):
self.name = name;
if value.__class__ == ''.__class__:
self.name = value;
value = [];
self.value = value;
def write_value(self, buf):
for i in self.value:
i.save(buf=buf)
buf.write("\x00")
"collection functions"
def __getitem__(self, k):
#hits=filter(lambda x:x.name==k, self.value);
#if(len(hits)): return hits[0];
for key in self.value:
if key.name == k: return key
raise KeyError("Key {0} not found in tag {1}".format(k, self));
def __iter__(self): return itertools.imap(lambda x:x.name, self.value);
def __contains__(self, k):return k in map(lambda x:x.name, self.value);
def __len__(self): return self.value.__len__()
def __setitem__(self, k, v):
"""Automatically wraps lists and tuples in a TAG_List, and wraps strings
and unicodes in a TAG_String."""
if isinstance(v, (list, tuple)):
v = TAG_List(v)
elif isinstance(v, basestring):
v = TAG_String(v)
if not (v.__class__ in tag_classes.values()): raise TypeError("Invalid type %s for TAG_Compound" % (v.__class__))
"""remove any items already named "k". """
olditems = filter(lambda x:x.name == k, self.value)
for i in olditems: self.value.remove(i)
self.value.append(v);
v.name = k;
def __delitem__(self, k): self.value.__delitem__(self.value.index(self[k]));
def add(self, v):
self[v.name] = v;
class TAG_List(TAG_Value, collections.MutableSequence):
"""A homogenous list of unnamed data of a single TAG_* type.
Once created, the type can only be changed by emptying the list
and adding an element of the new type. If created with no arguments,
returns a list of TAG_Compound
Empty lists in the wild have been seen with type TAG_Byte"""
tag = 9;
def dataType(self, val):
if val:
listType = val[0].__class__
# FIXME: This is kinda weird; None as the empty tag name?
assert all(isinstance(x, listType) and x.name in ("", "None") for x in val)
return list(val)
def __repr__(self):
return "%s( %s ): %s" % (self.__class__.__name__, self.name, self.value)
def pretty_string(self, indent=0):
if self.name:
pretty = " " * indent + "%s( \"%s\" ):\n" % (str(self.__class__.__name__), self.name)
else:
pretty = " " * indent + "%s():\n" % (str(self.__class__.__name__),)
indent += 4
for tag in self.value:
pretty += tag.pretty_string(indent) + "\n"
return pretty
@classmethod
def load_from(cls, data, data_cursor):
self = cls()
self.list_type = data[data_cursor];
data_cursor += 1;
list_length, data_cursor = TAG_Int.load_from(data, data_cursor)
list_length = list_length.value
for i in range(list_length):
tag, data_cursor = tag_classes[self.list_type].load_from(data, data_cursor)
self.append(tag);
return self, data_cursor
def __init__(self, value=[], name=None, list_type=TAG_Compound):
#can be created from a list of tags in value, with an optional
#name, or created from raw tag data, or created with list_type
#taken from a TAG class or instance
self.name = name
self.list_type = list_type.tag
if(len(value)):
self.list_type = value[0].tag;
value = filter(lambda x:x.__class__ == value[0].__class__, value)
self.value = value
""" collection methods """
def __iter__(self): return iter(self.value)
def __contains__(self, k):return k in self.value;
def __getitem__(self, i): return self.value[i];
def __len__(self): return len(self.value)
def __setitem__(self, i, v):
if v.__class__ != tag_classes[self.list_type]:
raise TypeError("Invalid type %s for TAG_List(%s)" % (v.__class__, tag_classes[self.list_type]))
v.name = ""
self.value[i] = v;
def __delitem__(self, i):
del self.value[i]
def insert(self, i, v):
if not v.tag in tag_classes: raise TypeError("Not a tag type: %s" % (v,))
if len(self) == 0:
self.list_type = v.tag
else:
if v.__class__ != tag_classes[self.list_type]: raise TypeError("Invalid type %s for TAG_List(%s)" % (v.__class__, tag_classes[self.list_type]))
v.name = ""
self.value.insert(i, v);
def write_value(self, buf):
buf.write(struct.pack(TAGfmt, self.list_type))
TAG_Int(len(self)).write_value(buf)
for i in self.value:
i.write_value(buf)
tag_classes = {
1 : TAG_Byte,
2 : TAG_Short,
3 : TAG_Int,
4 : TAG_Long,
5 : TAG_Float,
6 : TAG_Double,
7 : TAG_Byte_Array,
8 : TAG_String,
9 : TAG_List,
10: TAG_Compound,
11: TAG_Int_Array,
12: TAG_Short_Array,
};
import zlib
def gunzip(data):
#strip off the header and use negative WBITS to tell zlib there's no header
return zlib.decompress(data[10:], -zlib.MAX_WBITS)
def loadFile(filename):
with file(filename, "rb") as f:
inputdata = f.read()
data = inputdata
try:
data = gunzip(inputdata)
except IOError:
print "File %s not zipped" % filename
return load(buf=fromstring(data, 'uint8'));
def load_named(data, data_cursor, tag_type):
tag_name, data_cursor = TAG_String.load_from(data, data_cursor)
tag_name = tag_name.value
tag, data_cursor = tag_classes[tag_type].load_from(data, data_cursor)
tag.name = tag_name
return tag, data_cursor
def load(filename="", buf=None):
"""Unserialize data from an entire NBT file and return the
root TAG_Compound object. Argument can be a string containing a
filename or an array of integers containing TAG_Compound data. """
if filename and isinstance(filename, (str, unicode)):
return loadFile(filename)
if isinstance(buf, str): buf = fromstring(buf, uint8)
data = buf;
#if buf != None: data = buf
if not len(buf):
raise NBTFormatError, "Asked to load root tag of zero length"
data_cursor = 0;
tag_type = data[data_cursor];
if tag_type != 10:
raise NBTFormatError, 'Not an NBT file with a root TAG_Compound (found {0})'.format(tag_type);
data_cursor += 1;
tag, data_cursor = load_named(data, data_cursor, tag_type)
return tag;
__all__ = [a.__name__ for a in tag_classes.itervalues()] + ["load", "loadFile", "gunzip"]

View File

@ -40,12 +40,8 @@ class MCSchematic (EntityLevel):
if filename:
self.filename = filename
if None is root_tag:
try:
root_tag = nbt.load(filename)
except IOError, e:
error(u"Failed to load file {0}".format (e))
if None is root_tag and os.path.exists(filename):
root_tag = nbt.load(filename)
else:
self.filename = None
@ -120,16 +116,16 @@ class MCSchematic (EntityLevel):
try:
self.root_tag = nbt.load(buf=fromstring(data, dtype='uint8'));
self.root_tag = nbt.load(buf=data);
except Exception, e:
error(u"Malformed NBT data in schematic file: {0} ({1})".format(self.filename, e))
raise ChunkMalformed, self.filename
raise ChunkMalformed, (e,self.filename), sys.exc_info()[2]
try:
self.shapeChunkData()
except KeyError, e:
error(u"Incorrect schematic format in file: {0} ({1})".format(self.filename, e))
raise ChunkMalformed, self.filename
raise ChunkMalformed, (e,self.filename), sys.exc_info()[2]
pass
self.dataIsPacked = True;

View File

@ -6,6 +6,7 @@ Created on Jul 23, 2011
#from mclevel import fromFile, loadWorldNumber, BoundingBox
#from infiniteworld import MCInfdevOldLevel
#from schematic import MCSchematic
#import errorreporting # annotate tracebacks with call arguments
try:
from pymclevel import *
except ImportError:
@ -23,7 +24,7 @@ import time
import numpy
from numpy import *
#from pymclevel.infiniteworld import MCServerChunkGenerator
from infiniteworld import MCServerChunkGenerator
log = logging.getLogger(__name__)
warn, error, info, debug = log.warn, log.error, log.info, log.debug
@ -169,7 +170,7 @@ class TestNBT(unittest.TestCase):
n = newlevel["Map"]["Spawn"][0].name
if(n): print "Named list element failed: %s" % n;
"""
attempt to delete non-existent TAG_Compound elements
this generates a KeyError like a python dict does.
@ -186,9 +187,11 @@ class TestNBT(unittest.TestCase):
d = join("testfiles", "TileTicks_chunks")
files = [join(d, f) for f in os.listdir(d)]
startTime = time.time()
for f in files[:20]:
n = nbt.load(f)
for i in range(20):
for f in files[:40]:
n = nbt.load(f)
print "Duration: ", time.time() - startTime
#print "NBT: ", n
class TestIndevLevel(unittest.TestCase):
def setUp(self):
@ -317,7 +320,16 @@ class TestAlphaLevel(unittest.TestCase):
level.generateLights();
level.saveInPlace();
def testRecompress(self):
cx,cz = -3, -1
level = self.alphalevel.level
ch = level.getChunk(cx,cz)
ch.dirty = True
level.saveInPlace()
ch.Blocks
print ch.root_tag
def testPlayerSpawn(self):
level = self.alphalevel.level