refactor: remove obsolete model framework

This commit is contained in:
Sefa Eyeoglu 2022-04-05 15:07:12 +02:00
parent 6777305c00
commit 00cbf2073b
No known key found for this signature in database
GPG Key ID: C10411294912A422
11 changed files with 1 additions and 1569 deletions

View File

@ -2,7 +2,7 @@ FROM python:3.10.2-bullseye
ARG UID=1337
ARG GID=1337
RUN pip install cachecontrol iso8601 requests lockfile jsonobject six pydantic \
RUN pip install cachecontrol requests lockfile pydantic \
&& apt-get update && apt-get install -y rsync cron
# add our cronjob

View File

@ -1,7 +1,6 @@
import os
import re
import sys
from datetime import timezone
from distutils.version import LooseVersion
from meta.common import ensure_component_dir, polymc_path, upstream_path, static_path

View File

@ -1,17 +0,0 @@
# TODO: maybe move to pydantic in the future?
from __future__ import absolute_import
from .base import JsonObjectMeta
from .containers import JsonArray
from .properties import *
from .base_properties import *
from .api import JsonObject
__all__ = [
'IntegerProperty', 'FloatProperty', 'DecimalProperty',
'StringProperty', 'BooleanProperty',
'DateProperty', 'DateTimeProperty', 'TimeProperty',
'ObjectProperty', 'ListProperty', 'DictProperty', 'SetProperty',
'JsonObject', 'JsonArray', 'AbstractDateProperty', 'JsonProperty',
'DefaultProperty'
]

View File

@ -1,53 +0,0 @@
from __future__ import absolute_import
from .base import JsonObjectBase, _LimitedDictInterfaceMixin
import six
import decimal
import datetime
from . import properties
import re
re_date = re.compile(r'^(\d{4})\D?(0[1-9]|1[0-2])\D?([12]\d|0[1-9]|3[01])$')
re_time = re.compile(
r'^([01]\d|2[0-3])\D?([0-5]\d)\D?([0-5]\d)?\D?(\d{3,6})?$')
re_datetime = re.compile(
r'^(\d{4})\D?(0[1-9]|1[0-2])\D?([12]\d|0[1-9]|3[01])'
r'(\D?([01]\d|2[0-3])\D?([0-5]\d)\D?([0-5]\d)?\D?(\d{3,6})?'
r'([zZ]|([\+-])([01]\d|2[0-3])\D?([0-5]\d)?)?)?$'
)
re_decimal = re.compile('^(\d+)\.(\d+)$')
if six.PY3:
unicode = str
long = int
class JsonObject(JsonObjectBase, _LimitedDictInterfaceMixin):
def __getstate__(self):
return self.to_json()
def __setstate__(self, dct):
self.__init__(dct)
class Meta(object):
properties = {
decimal.Decimal: properties.DecimalProperty,
datetime.datetime: properties.DateTimeProperty,
datetime.date: properties.DateProperty,
datetime.time: properties.TimeProperty,
str: properties.StringProperty,
unicode: properties.StringProperty,
bool: properties.BooleanProperty,
int: properties.IntegerProperty,
long: properties.IntegerProperty,
float: properties.FloatProperty,
list: properties.ListProperty,
dict: properties.DictProperty,
set: properties.SetProperty,
}
string_conversions = (
(re_date, datetime.date),
(re_time, datetime.time),
(re_datetime, datetime.datetime),
(re_decimal, decimal.Decimal),
)

View File

@ -1,394 +0,0 @@
from __future__ import absolute_import
from collections import namedtuple, OrderedDict
import copy
import six
import inspect
from .exceptions import (
DeleteNotAllowed,
WrappingAttributeError,
)
from .base_properties import JsonProperty, DefaultProperty
from .utils import check_type
JsonObjectClassSettings = namedtuple('JsonObjectClassSettings', ['type_config'])
CLASS_SETTINGS_ATTR = '_$_class_settings'
def get_settings(cls):
return getattr(cls, CLASS_SETTINGS_ATTR,
JsonObjectClassSettings(type_config=TypeConfig()))
def set_settings(cls, settings):
setattr(cls, CLASS_SETTINGS_ATTR, settings)
class TypeConfig(object):
"""
This class allows the user to configure dynamic
type handlers and string conversions for their JsonObject.
properties is a map from python types to JsonProperty subclasses
string_conversions is a list or tuple of (regex, python type)-tuples
This class is used to store the configuration but is not part of the API.
To configure:
class Foo(JsonObject):
# property definitions go here
# ...
class Meta(object):
update_properties = {
datetime.datetime: MySpecialDateTimeProperty
}
# this is already set by default
# but you can override with your own modifications
string_conversions = ((date_re, datetime.date),
(datetime_re, datetime.datetime),
(time_re, datetime.time),
(decimal_re, decimal.Decimal))
If you now do
foo = Foo()
foo.timestamp = datetime.datetime(1988, 7, 7, 11, 8, 0)
timestamp will be governed by a MySpecialDateTimeProperty
instead of the default.
"""
def __init__(self, properties=None, string_conversions=None):
self._properties = properties if properties is not None else {}
self._string_conversions = (
OrderedDict(string_conversions) if string_conversions is not None
else OrderedDict()
)
# cache this
self.string_conversions = self._get_string_conversions()
self.properties = self._properties
def replace(self, properties=None, string_conversions=None):
return TypeConfig(
properties=(properties if properties is not None
else self._properties),
string_conversions=(string_conversions if string_conversions is not None
else self._string_conversions)
)
def updated(self, properties=None, string_conversions=None):
"""
update properties and string_conversions with the paramenters
keeping all non-mentioned items the same as before
returns a new TypeConfig with these changes
(does not modify original)
"""
_properties = self._properties.copy()
_string_conversions = self.string_conversions[:]
if properties:
_properties.update(properties)
if string_conversions:
_string_conversions.extend(string_conversions)
return TypeConfig(
properties=_properties,
string_conversions=_string_conversions,
)
def _get_string_conversions(self):
result = []
for pattern, conversion in self._string_conversions.items():
conversion = (
conversion if conversion not in self._properties
else self._properties[conversion](type_config=self).to_python
)
result.append((pattern, conversion))
return result
META_ATTRS = ('properties', 'string_conversions', 'update_properties')
class JsonObjectMeta(type):
class Meta(object):
pass
def __new__(mcs, name, bases, dct):
cls = super(JsonObjectMeta, mcs).__new__(mcs, name, bases, dct)
cls.__configure(**{key: value
for key, value in cls.Meta.__dict__.items()
if key in META_ATTRS})
cls_settings = get_settings(cls)
properties = {}
properties_by_name = {}
for key, value in dct.items():
if isinstance(value, JsonProperty):
properties[key] = value
elif key.startswith('_'):
continue
elif type(value) in cls_settings.type_config.properties:
property_ = cls_settings.type_config.properties[type(value)](default=value)
properties[key] = dct[key] = property_
setattr(cls, key, property_)
for key, property_ in properties.items():
property_.init_property(default_name=key,
type_config=cls_settings.type_config)
assert property_.name is not None, property_
assert property_.name not in properties_by_name, \
'You can only have one property named {0}'.format(
property_.name)
properties_by_name[property_.name] = property_
for base in bases:
if getattr(base, '_properties_by_attr', None):
for key, value in base._properties_by_attr.items():
if key not in properties:
properties[key] = value
properties_by_name[value.name] = value
cls._properties_by_attr = properties
cls._properties_by_key = properties_by_name
return cls
def __configure(cls, properties=None, string_conversions=None,
update_properties=None):
super_settings = get_settings(super(cls, cls))
assert not properties or not update_properties, \
"{} {}".format(properties, update_properties)
type_config = super_settings.type_config
if update_properties is not None:
type_config = type_config.updated(properties=update_properties)
elif properties is not None:
type_config = type_config.replace(properties=properties)
if string_conversions is not None:
type_config = type_config.replace(
string_conversions=string_conversions)
set_settings(cls, super_settings._replace(type_config=type_config))
return cls
class _JsonObjectPrivateInstanceVariables(object):
def __init__(self, dynamic_properties=None):
self.dynamic_properties = dynamic_properties or {}
@six.add_metaclass(JsonObjectMeta)
class JsonObjectBase(object):
_allow_dynamic_properties = False
_validate_required_lazily = False
_properties_by_attr = None
_properties_by_key = None
_string_conversions = ()
def __init__(self, _obj=None, **kwargs):
setattr(self, '_$', _JsonObjectPrivateInstanceVariables())
self._obj = check_type(_obj, dict,
'JsonObject must wrap a dict or None')
self._wrapped = {}
for key, value in self._obj.items():
try:
self.set_raw_value(key, value)
except AttributeError:
raise WrappingAttributeError(
"can't set attribute corresponding to {key!r} "
"on a {cls} while wrapping {data!r}".format(
cls=self.__class__,
key=key,
data=_obj,
)
)
for attr, value in kwargs.items():
try:
setattr(self, attr, value)
except AttributeError:
raise WrappingAttributeError(
"can't set attribute {key!r} "
"on a {cls} while wrapping {data!r}".format(
cls=self.__class__,
key=attr,
data=_obj,
)
)
for key, value in self._properties_by_key.items():
if key not in self._obj:
try:
d = value.default()
except TypeError:
d = value.default(self)
self[key] = d
def set_raw_value(self, key, value):
wrapped = self.__wrap(key, value)
if key in self._properties_by_key:
self[key] = wrapped
else:
setattr(self, key, wrapped)
@classmethod
def properties(cls):
return cls._properties_by_attr.copy()
@property
def __dynamic_properties(self):
return getattr(self, '_$').dynamic_properties
@classmethod
def wrap(cls, obj):
self = cls(obj)
return self
def validate(self, required=True):
for key, value in self._wrapped.items():
self.__get_property(key).validate(value, required=required)
def to_json(self):
self.validate()
return copy.deepcopy(self._obj)
def __get_property(self, key):
try:
return self._properties_by_key[key]
except KeyError:
return DefaultProperty(type_config=get_settings(self).type_config)
def __wrap(self, key, value):
property_ = self.__get_property(key)
if value is None:
return None
return property_.wrap(value)
def __unwrap(self, key, value):
property_ = self.__get_property(key)
try:
property_.validate(
value,
required=not self._validate_required_lazily,
recursive=False,
)
except TypeError:
property_.validate(
value,
required=not self._validate_required_lazily,
)
if value is None:
return None, None
return property_.unwrap(value)
def __setitem__(self, key, value):
wrapped, unwrapped = self.__unwrap(key, value)
self._wrapped[key] = wrapped
if self.__get_property(key).exclude(unwrapped):
self._obj.pop(key, None)
else:
self._obj[key] = unwrapped
if key not in self._properties_by_key:
assert key not in self._properties_by_attr
self.__dynamic_properties[key] = wrapped
super(JsonObjectBase, self).__setattr__(key, wrapped)
def __is_dynamic_property(self, name):
return (
name not in self._properties_by_attr and
not name.startswith('_') and
not inspect.isdatadescriptor(getattr(self.__class__, name, None))
)
def __setattr__(self, name, value):
if self.__is_dynamic_property(name):
if self._allow_dynamic_properties:
self[name] = value
else:
raise AttributeError(
"{0!r} is not defined in schema "
"(not a valid property)".format(name)
)
else:
super(JsonObjectBase, self).__setattr__(name, value)
def __delitem__(self, key):
if key in self._properties_by_key:
raise DeleteNotAllowed(key)
else:
if not self.__is_dynamic_property(key):
raise KeyError(key)
del self._obj[key]
del self._wrapped[key]
del self.__dynamic_properties[key]
super(JsonObjectBase, self).__delattr__(key)
def __delattr__(self, name):
if name in self._properties_by_attr:
raise DeleteNotAllowed(name)
elif self.__is_dynamic_property(name):
del self[name]
else:
super(JsonObjectBase, self).__delattr__(name)
def __repr__(self):
name = self.__class__.__name__
predefined_properties = self._properties_by_attr.keys()
predefined_property_keys = set(self._properties_by_attr[p].name
for p in predefined_properties)
dynamic_properties = (set(self._wrapped.keys())
- predefined_property_keys)
properties = sorted(predefined_properties) + sorted(dynamic_properties)
return u'{name}({keyword_args})'.format(
name=name,
keyword_args=', '.join('{key}={value!r}'.format(
key=key,
value=getattr(self, key)
) for key in properties),
)
class _LimitedDictInterfaceMixin(object):
"""
mindlessly farms selected dict methods out to an internal dict
really only a separate class from JsonObject
to keep this mindlessness separate from the methods
that need to be more carefully understood
"""
_wrapped = None
def keys(self):
return self._wrapped.keys()
def items(self):
return self._wrapped.items()
def iteritems(self):
return self._wrapped.iteritems()
def __contains__(self, item):
return item in self._wrapped
def __getitem__(self, item):
return self._wrapped[item]
def __iter__(self):
return iter(self._wrapped)
def __len__(self):
return len(self._wrapped)
def get_dynamic_properties(obj):
return getattr(obj, '_$').dynamic_properties.copy()

View File

@ -1,320 +0,0 @@
from __future__ import absolute_import
import six
import inspect
from .exceptions import BadValueError
function_name = None
if six.PY3:
def function_name(f):
return f.__name__
else:
def function_name(f):
return f.func_name
class JsonProperty(object):
default = None
type_config = None
def __init__(self, default=Ellipsis, name=None, choices=None,
required=False, exclude_if_none=False, validators=None,
verbose_name=None, type_config=None):
validators = validators or ()
self.name = name
if default is Ellipsis:
default = self.default
if callable(default):
self.default = default
else:
self.default = lambda: default
self.choices = choices
self.choice_keys = []
if choices:
for choice in choices:
if isinstance(choice, tuple):
choice, _ = choice
self.choice_keys.append(choice)
self.required = required
self.exclude_if_none = exclude_if_none
self._validators = validators
self.verbose_name = verbose_name
if type_config:
self.type_config = type_config
def init_property(self, default_name, type_config):
self.name = self.name or default_name
self.type_config = self.type_config or type_config
def wrap(self, obj):
raise NotImplementedError()
def unwrap(self, obj):
"""
must return tuple of (wrapped, unwrapped)
If obj is already a fully wrapped object,
it must be returned as the first element.
For an example where the first element is relevant see ListProperty
"""
raise NotImplementedError()
def to_json(self, value):
_, unwrapped = self.unwrap(value)
return unwrapped
def to_python(self, value):
return self.wrap(value)
def __get__(self, instance, owner):
if instance:
assert self.name in instance
return instance[self.name]
else:
return self
def __set__(self, instance, value):
instance[self.name] = value
def __call__(self, method):
"""
use a property as a decorator to set its default value
class Document(JsonObject):
@StringProperty()
def doc_type(self):
return self.__class__.__name__
"""
assert self.default() is None
self.default = method
self.name = self.name or function_name(method)
return self
def exclude(self, value):
return self.exclude_if_none and value == None
def empty(self, value):
return value is None
def validate(self, value, required=True, recursive=True):
if (self.choice_keys and value not in self.choice_keys
and value is not None):
raise BadValueError(
'{0!r} not in choices: {1!r}'.format(value, self.choice_keys)
)
if not self.empty(value):
self._custom_validate(value)
elif required and self.required:
raise BadValueError(
'Property {0} is required.'.format(self.name)
)
if recursive and hasattr(value, 'validate'):
value.validate(required=required)
def _custom_validate(self, value):
if self._validators:
if hasattr(self._validators, '__iter__'):
for validator in self._validators:
validator(value)
else:
self._validators(value)
class JsonContainerProperty(JsonProperty):
_type = default = None
container_class = None
def __init__(self, item_type=None, **kwargs):
self._item_type_deferred = item_type
super(JsonContainerProperty, self).__init__(**kwargs)
def init_property(self, **kwargs):
super(JsonContainerProperty, self).init_property(**kwargs)
if not inspect.isfunction(self._item_type_deferred):
# trigger validation
self.item_type
def set_item_type(self, item_type):
from meta.jsonobject.base import JsonObjectMeta
if hasattr(item_type, '_type'):
item_type = item_type._type
if isinstance(item_type, tuple):
# this is for the case where item_type = (int, long)
item_type = item_type[0]
allowed_types = set(self.type_config.properties.keys())
if isinstance(item_type, JsonObjectMeta) \
or not item_type or item_type in allowed_types:
self._item_type = item_type
else:
raise ValueError("item_type {0!r} not in {1!r}".format(
item_type,
allowed_types,
))
@property
def item_type(self):
if hasattr(self, '_item_type_deferred'):
if inspect.isfunction(self._item_type_deferred):
self.set_item_type(self._item_type_deferred())
else:
self.set_item_type(self._item_type_deferred)
del self._item_type_deferred
return self._item_type
def empty(self, value):
return not value
def wrap(self, obj):
wrapper = self.type_to_property(self.item_type) if self.item_type else None
return self.container_class(obj, wrapper=wrapper,
type_config=self.type_config)
def type_to_property(self, item_type):
map_types_properties = self.type_config.properties
from .properties import ObjectProperty
from .base import JsonObjectBase
if issubclass(item_type, JsonObjectBase):
return ObjectProperty(item_type, type_config=self.type_config)
elif item_type in map_types_properties:
return map_types_properties[item_type](type_config=self.type_config)
else:
for key, value in map_types_properties.items():
if issubclass(item_type, key):
return value(type_config=self.type_config)
raise TypeError('Type {0} not recognized'.format(item_type))
def unwrap(self, obj):
if not isinstance(obj, self._type):
raise BadValueError(
'{0!r} is not an instance of {1!r}'.format(
obj, self._type.__name__)
)
if isinstance(obj, self.container_class):
return obj, obj._obj
else:
wrapped = self.wrap(self._type())
self._update(wrapped, obj)
return self.unwrap(wrapped)
def _update(self, container, extension):
raise NotImplementedError()
class DefaultProperty(JsonProperty):
def wrap(self, obj):
assert self.type_config.string_conversions is not None
value = self.value_to_python(obj)
property_ = self.value_to_property(value)
if property_:
return property_.wrap(obj)
def unwrap(self, obj):
property_ = self.value_to_property(obj)
if property_:
return property_.unwrap(obj)
else:
return obj, None
def value_to_property(self, value):
map_types_properties = self.type_config.properties
if value is None:
return None
elif type(value) in map_types_properties:
return map_types_properties[type(value)](
type_config=self.type_config)
else:
for value_type, prop_class in map_types_properties.items():
if isinstance(value, value_type):
return prop_class(type_config=self.type_config)
else:
raise BadValueError(
'value {0!r} not in allowed types: {1!r}'.format(
value, map_types_properties.keys())
)
def value_to_python(self, value):
"""
convert encoded string values to the proper python type
ex:
>>> DefaultProperty().value_to_python('2013-10-09T10:05:51Z')
datetime.datetime(2013, 10, 9, 10, 5, 51)
other values will be passed through unmodified
Note: containers' items are NOT recursively converted
"""
if isinstance(value, six.string_types):
convert = None
for pattern, _convert in self.type_config.string_conversions:
if pattern.match(value):
convert = _convert
break
if convert is not None:
try:
# sometimes regex fail so return value
value = convert(value)
except Exception:
pass
return value
class AssertTypeProperty(JsonProperty):
_type = None
def assert_type(self, obj):
if not isinstance(obj, self._type):
raise BadValueError(
'{0!r} not of type {1!r}'.format(obj, self._type)
)
def selective_coerce(self, obj):
return obj
def wrap(self, obj):
obj = self.selective_coerce(obj)
self.assert_type(obj)
return obj
def unwrap(self, obj):
obj = self.selective_coerce(obj)
self.assert_type(obj)
return obj, obj
class AbstractDateProperty(JsonProperty):
_type = None
def __init__(self, exact=False, *args, **kwargs):
super(AbstractDateProperty, self).__init__(*args, **kwargs)
self.exact = exact
def wrap(self, obj):
try:
if not isinstance(obj, six.string_types):
raise ValueError()
return self._wrap(obj)
except ValueError:
raise BadValueError('{0!r} is not a {1}-formatted string'.format(
obj,
self._type.__name__,
))
def unwrap(self, obj):
if not isinstance(obj, self._type):
raise BadValueError('{0!r} is not a {1} object'.format(
obj,
self._type.__name__,
))
return self._unwrap(obj)
def _wrap(self, obj):
raise NotImplementedError()
def _unwrap(self, obj):
raise NotImplementedError()

View File

@ -1,252 +0,0 @@
from __future__ import absolute_import
from .base_properties import DefaultProperty
from .utils import check_type, SimpleDict
import copy
class JsonArray(list):
def __init__(self, _obj=None, wrapper=None, type_config=None):
super(JsonArray, self).__init__()
self._obj = check_type(_obj, list,
'JsonArray must wrap a list or None')
assert type_config is not None
self._type_config = type_config
self._wrapper = (
wrapper or
DefaultProperty(type_config=self._type_config)
)
for item in self._obj:
super(JsonArray, self).append(self._wrapper.wrap(item))
def validate(self, required=True):
for obj in self:
self._wrapper.validate(obj, required=required)
def to_json(self):
self.validate()
return copy.deepcopy(self._obj)
def append(self, wrapped):
wrapped, unwrapped = self._wrapper.unwrap(wrapped)
self._obj.append(unwrapped)
super(JsonArray, self).append(wrapped)
def __delitem__(self, i):
super(JsonArray, self).__delitem__(i)
del self._obj[i]
def __setitem__(self, i, wrapped):
wrapped, unwrapped = self._wrapper.unwrap(wrapped)
self._obj[i] = unwrapped
super(JsonArray, self).__setitem__(i, wrapped)
def extend(self, wrapped_list):
if wrapped_list:
wrapped_list, unwrapped_list = zip(
*map(self._wrapper.unwrap, wrapped_list)
)
else:
unwrapped_list = []
self._obj.extend(unwrapped_list)
super(JsonArray, self).extend(wrapped_list)
def insert(self, index, wrapped):
wrapped, unwrapped = self._wrapper.unwrap(wrapped)
self._obj.insert(index, unwrapped)
super(JsonArray, self).insert(index, wrapped)
def remove(self, value):
i = self.index(value)
super(JsonArray, self).remove(value)
self._obj.pop(i)
def pop(self, index=-1):
self._obj.pop(index)
return super(JsonArray, self).pop(index)
def sort(self, cmp=None, key=None, reverse=False):
zipped = zip(self, self._obj)
if key:
new_key = lambda pair: key(pair[0])
zipped.sort(key=new_key, reverse=reverse)
elif cmp:
new_cmp = lambda pair1, pair2: cmp(pair1[0], pair2[0])
zipped.sort(cmp=new_cmp, reverse=reverse)
else:
zipped.sort(reverse=reverse)
wrapped_list, unwrapped_list = zip(*zipped)
while self:
self.pop()
super(JsonArray, self).extend(wrapped_list)
self._obj.extend(unwrapped_list)
def reverse(self):
self._obj.reverse()
super(JsonArray, self).reverse()
def __fix_slice(self, i, j):
length = len(self)
if j < 0:
j += length
if i < 0:
i += length
if i > length:
i = length
if j > length:
j = length
return i, j
def __setslice__(self, i, j, sequence):
i, j = self.__fix_slice(i, j)
for _ in range(j - i):
self.pop(i)
for k, wrapped in enumerate(sequence):
self.insert(i + k, wrapped)
def __delslice__(self, i, j):
i, j = self.__fix_slice(i, j)
for _ in range(j - i):
self.pop(i)
class JsonDict(SimpleDict):
def __init__(self, _obj=None, wrapper=None, type_config=None):
super(JsonDict, self).__init__()
self._obj = check_type(_obj, dict, 'JsonDict must wrap a dict or None')
assert type_config is not None
self._type_config = type_config
self._wrapper = (
wrapper or
DefaultProperty(type_config=self._type_config)
)
for key, value in self._obj.items():
self[key] = self.__wrap(key, value)
def validate(self, required=True):
for obj in self.values():
self._wrapper.validate(obj, required=required)
def __wrap(self, key, unwrapped):
return self._wrapper.wrap(unwrapped)
def __unwrap(self, key, wrapped):
return self._wrapper.unwrap(wrapped)
def __setitem__(self, key, value):
if isinstance(key, int):
key = str(key)
wrapped, unwrapped = self.__unwrap(key, value)
self._obj[key] = unwrapped
super(JsonDict, self).__setitem__(key, wrapped)
def __delitem__(self, key):
del self._obj[key]
super(JsonDict, self).__delitem__(key)
def __getitem__(self, key):
if isinstance(key, int):
key = str(key)
return super(JsonDict, self).__getitem__(key)
class JsonSet(set):
def __init__(self, _obj=None, wrapper=None, type_config=None):
super(JsonSet, self).__init__()
if isinstance(_obj, set):
_obj = list(_obj)
self._obj = check_type(_obj, list, 'JsonSet must wrap a list or None')
assert type_config is not None
self._type_config = type_config
self._wrapper = (
wrapper or
DefaultProperty(type_config=self._type_config)
)
for item in self._obj:
super(JsonSet, self).add(self._wrapper.wrap(item))
def validate(self, required=True):
for obj in self:
self._wrapper.validate(obj, required=required)
def add(self, wrapped):
wrapped, unwrapped = self._wrapper.unwrap(wrapped)
if wrapped not in self:
self._obj.append(unwrapped)
super(JsonSet, self).add(wrapped)
def remove(self, wrapped):
wrapped, unwrapped = self._wrapper.unwrap(wrapped)
if wrapped in self:
self._obj.remove(unwrapped)
super(JsonSet, self).remove(wrapped)
else:
raise KeyError(wrapped)
def discard(self, wrapped):
try:
self.remove(wrapped)
except KeyError:
pass
def pop(self):
# get first item
for wrapped in self:
break
else:
raise KeyError()
wrapped_, unwrapped = self._wrapper.unwrap(wrapped)
assert wrapped is wrapped_
self.remove(unwrapped)
return wrapped
def clear(self):
while self:
self.pop()
def __ior__(self, other):
for wrapped in other:
self.add(wrapped)
return self
def update(self, *args):
for wrapped_list in args:
self |= set(wrapped_list)
union_update = update
def __iand__(self, other):
for wrapped in list(self):
if wrapped not in other:
self.remove(wrapped)
return self
def intersection_update(self, *args):
for wrapped_list in args:
self &= set(wrapped_list)
def __isub__(self, other):
for wrapped in list(self):
if wrapped in other:
self.remove(wrapped)
return self
def difference_update(self, *args):
for wrapped_list in args:
self -= set(wrapped_list)
def __ixor__(self, other):
removed = set()
for wrapped in list(self):
if wrapped in other:
self.remove(wrapped)
removed.add(wrapped)
self.update(other - removed)
return self
def symmetric_difference_update(self, *args):
for wrapped_list in args:
self ^= set(wrapped_list)

View File

@ -1,10 +0,0 @@
class DeleteNotAllowed(Exception):
pass
class BadValueError(Exception):
"""raised when a value can't be validated or is required"""
class WrappingAttributeError(AttributeError):
pass

View File

@ -1,155 +0,0 @@
# DateTimeProperty, DateProperty, and TimeProperty
# include code copied from couchdbkit
from __future__ import absolute_import
import sys
import datetime
import time
import decimal
from .base_properties import (
AbstractDateProperty,
AssertTypeProperty,
JsonContainerProperty,
JsonProperty,
DefaultProperty,
)
from .containers import JsonArray, JsonDict, JsonSet
if sys.version > '3':
unicode = str
long = int
class StringProperty(AssertTypeProperty):
_type = (unicode, str)
def selective_coerce(self, obj):
if isinstance(obj, str):
obj = unicode(obj)
return obj
class BooleanProperty(AssertTypeProperty):
_type = bool
class IntegerProperty(AssertTypeProperty):
_type = (int, long)
class FloatProperty(AssertTypeProperty):
_type = float
def selective_coerce(self, obj):
if isinstance(obj, (int, long)):
obj = float(obj)
return obj
class DecimalProperty(JsonProperty):
def wrap(self, obj):
return decimal.Decimal(obj)
def unwrap(self, obj):
if isinstance(obj, (int, long)):
obj = decimal.Decimal(obj)
elif isinstance(obj, float):
# python 2.6 doesn't allow a float to Decimal
obj = decimal.Decimal(unicode(obj))
assert isinstance(obj, decimal.Decimal)
return obj, unicode(obj)
class DateProperty(AbstractDateProperty):
_type = datetime.date
def _wrap(self, value):
fmt = '%Y-%m-%d'
try:
return datetime.date(*time.strptime(value, fmt)[:3])
except ValueError as e:
raise ValueError('Invalid ISO date {0!r} [{1}]'.format(value, e))
def _unwrap(self, value):
return value, value.isoformat()
class DateTimeProperty(AbstractDateProperty):
_type = datetime.datetime
def _wrap(self, value):
if not self.exact:
value = value.split('.', 1)[0] # strip out microseconds
value = value[0:19] # remove timezone
fmt = '%Y-%m-%dT%H:%M:%S'
else:
fmt = '%Y-%m-%dT%H:%M:%S.%fZ'
try:
return datetime.datetime.strptime(value, fmt)
except ValueError as e:
raise ValueError(
'Invalid ISO date/time {0!r} [{1}]'.format(value, e))
def _unwrap(self, value):
if not self.exact:
value = value.replace(microsecond=0)
padding = ''
else:
padding = '' if value.microsecond else '.000000'
return value, value.isoformat() + padding + 'Z'
class TimeProperty(AbstractDateProperty):
_type = datetime.time
def _wrap(self, value):
if not self.exact:
value = value.split('.', 1)[0] # strip out microseconds
fmt = '%H:%M:%S'
else:
fmt = '%H:%M:%S.%f'
try:
return datetime.time(*time.strptime(value, fmt)[3:6])
except ValueError as e:
raise ValueError('Invalid ISO time {0!r} [{1}]'.format(value, e))
def _unwrap(self, value):
if not self.exact:
value = value.replace(microsecond=0)
return value, value.isoformat()
class ObjectProperty(JsonContainerProperty):
default = lambda self: self.item_type()
def wrap(self, obj, string_conversions=None):
return self.item_type.wrap(obj)
def unwrap(self, obj):
assert isinstance(obj, self.item_type), \
'{0} is not an instance of {1}'.format(obj, self.item_type)
return obj, obj._obj
class ListProperty(JsonContainerProperty):
_type = default = list
container_class = JsonArray
def _update(self, container, extension):
container.extend(extension)
class DictProperty(JsonContainerProperty):
_type = default = dict
container_class = JsonDict
def _update(self, container, extension):
container.update(extension)
class SetProperty(JsonContainerProperty):
_type = default = set
container_class = JsonSet
def _update(self, container, extension):
container.update(extension)

View File

@ -1,57 +0,0 @@
from __future__ import absolute_import
from .exceptions import BadValueError
def check_type(obj, item_type, message):
if obj is None:
return item_type()
elif not isinstance(obj, item_type):
raise BadValueError('{}. Found object of type: {}'.format(message, type(obj)))
else:
return obj
class SimpleDict(dict):
"""
Re-implements destructive methods of dict
to use only setitem and getitem and delitem
"""
def update(self, E=None, **F):
for dct in (E, F):
if dct:
for key, value in dct.items():
self[key] = value
def clear(self):
for key in list(self.keys()):
del self[key]
def pop(self, key, *args):
if len(args) > 1:
raise TypeError('pop expected at most 2 arguments, got 3')
try:
val = self[key]
del self[key]
return val
except KeyError:
try:
return args[0]
except IndexError:
raise KeyError(key)
def popitem(self):
try:
arbitrary_key = list(self.keys())[0]
except IndexError:
raise KeyError('popitem(): dictionary is empty')
val = self[arbitrary_key]
del self[arbitrary_key]
return (arbitrary_key, val)
def setdefault(self, key, default=None):
try:
return self[key]
except KeyError:
self[key] = default
return default

View File

@ -1,309 +0,0 @@
import datetime
import json
import os
import iso8601
from .jsonobject import *
from .common import polymc_path
PMC_DIR = polymc_path()
class ISOTimestampProperty(AbstractDateProperty):
_type = datetime.datetime
def _wrap(self, value):
try:
return iso8601.parse_date(value)
except ValueError as e:
raise ValueError(
'Invalid ISO date/time {0!r} [{1}]'.format(value, e))
def _unwrap(self, value):
return value, value.isoformat()
class GradleSpecifier:
'''
A gradle specifier - a maven coordinate. Like one of these:
"org.lwjgl.lwjgl:lwjgl:2.9.0"
"net.java.jinput:jinput:2.0.5"
"net.minecraft:launchwrapper:1.5"
'''
def __init__(self, name):
atSplit = name.split('@')
components = atSplit[0].split(':')
self.group = components[0]
self.artifact = components[1]
self.version = components[2]
self.extension = 'jar'
if len(atSplit) == 2:
self.extension = atSplit[1]
if len(components) == 4:
self.classifier = components[3]
else:
self.classifier = None
def toString(self):
extensionStr = ''
if self.extension != 'jar':
extensionStr = "@%s" % self.extension
if self.classifier:
return "%s:%s:%s:%s%s" % (self.group, self.artifact, self.version, self.classifier, extensionStr)
else:
return "%s:%s:%s%s" % (self.group, self.artifact, self.version, extensionStr)
def getFilename(self):
if self.classifier:
return "%s-%s-%s.%s" % (self.artifact, self.version, self.classifier, self.extension)
else:
return "%s-%s.%s" % (self.artifact, self.version, self.extension)
def getBase(self):
return "%s/%s/%s/" % (self.group.replace('.', '/'), self.artifact, self.version)
def getPath(self):
return self.getBase() + self.getFilename()
def __repr__(self):
return "GradleSpecifier('" + self.toString() + "')"
def isLwjgl(self):
return self.group in ("org.lwjgl", "org.lwjgl.lwjgl", "net.java.jinput", "net.java.jutils")
def isLog4j(self):
return self.group == "org.apache.logging.log4j"
def __lt__(self, other):
return self.toString() < other.toString()
def __eq__(self, other):
return self.group == other.group and self.artifact == other.artifact and self.version == other.version and self.classifier == other.classifier
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return self.toString().__hash__()
class GradleSpecifierProperty(JsonProperty):
def wrap(self, value):
return GradleSpecifier(value)
def unwrap(self, value):
return value, value.toString()
class MojangArtifactBase(JsonObject):
sha1 = StringProperty(exclude_if_none=True, default=None)
size = IntegerProperty(exclude_if_none=True, default=None)
url = StringProperty()
class MojangArtifact(MojangArtifactBase):
path = StringProperty(exclude_if_none=True, default=None)
class MojangAssets(MojangArtifactBase):
id = StringProperty()
totalSize = IntegerProperty()
class MojangLibraryDownloads(JsonObject):
artifact = ObjectProperty(MojangArtifact, exclude_if_none=True, default=None)
classifiers = DictProperty(MojangArtifact, exclude_if_none=True, default=None)
class MojangLibraryExtractRules(JsonObject):
exclude = ListProperty(StringProperty)
'''
"rules": [
{
"action": "allow"
},
{
"action": "disallow",
"os": {
"name": "osx"
}
}
]
'''
class OSRule(JsonObject):
name = StringProperty(choices=["osx", "linux", "windows"], required=True)
version = StringProperty(exclude_if_none=True, default=None)
class MojangRule(JsonObject):
action = StringProperty(choices=["allow", "disallow"], required=True)
os = ObjectProperty(OSRule, exclude_if_none=True, default=None)
class MojangLibrary(JsonObject):
extract = ObjectProperty(MojangLibraryExtractRules, exclude_if_none=True, default=None)
name = GradleSpecifierProperty(required=True)
downloads = ObjectProperty(MojangLibraryDownloads, exclude_if_none=True, default=None)
natives = DictProperty(StringProperty, exclude_if_none=True, default=None)
rules = ListProperty(MojangRule, exclude_if_none=True, default=None)
class MojangLoggingArtifact(MojangArtifactBase):
id = StringProperty()
class MojangLogging(JsonObject):
file = ObjectProperty(MojangLoggingArtifact, required=True)
argument = StringProperty(required=True)
type = StringProperty(required=True, choices=["log4j2-xml"])
class MojangArguments(JsonObject):
game = ListProperty(exclude_if_none=True, default=None)
jvm = ListProperty(exclude_if_none=True, default=None)
class JavaVersion(JsonObject):
component = StringProperty(default="jre-legacy")
majorVersion = IntegerProperty(default=8)
class UnknownVersionException(Exception):
"""Exception raised for unknown Mojang version file format versions.
Attributes:
message -- explanation of the error
"""
def __init__(self, message):
self.message = message
def validateSupportedMojangVersion(version):
supportedVersion = 21
if version > supportedVersion:
raise UnknownVersionException(
"Unsupported Mojang format version: %d. Max supported is: %d" % (version, supportedVersion))
class MojangVersionFile(JsonObject):
arguments = ObjectProperty(MojangArguments, exclude_if_none=True, default=None)
assetIndex = ObjectProperty(MojangAssets, exclude_if_none=True, default=None)
assets = StringProperty(exclude_if_none=True, default=None)
downloads = DictProperty(MojangArtifactBase, exclude_if_none=True, default=None)
id = StringProperty(exclude_if_none=True, default=None)
libraries = ListProperty(MojangLibrary, exclude_if_none=True, default=None)
mainClass = StringProperty(exclude_if_none=True, default=None)
processArguments = StringProperty(exclude_if_none=True, default=None)
minecraftArguments = StringProperty(exclude_if_none=True, default=None)
minimumLauncherVersion = IntegerProperty(exclude_if_none=True, default=None,
validators=validateSupportedMojangVersion)
releaseTime = ISOTimestampProperty(exclude_if_none=True, default=None)
time = ISOTimestampProperty(exclude_if_none=True, default=None)
type = StringProperty(exclude_if_none=True, default=None)
inheritsFrom = StringProperty(exclude_if_none=True, default=None)
logging = DictProperty(MojangLogging, exclude_if_none=True, default=None)
complianceLevel = IntegerProperty(exclude_if_none=True, default=None)
javaVersion = ObjectProperty(JavaVersion, exclude_if_none=True, default=None)
CurrentPolyMCFormatVersion = 1
def validateSupportedPolyMCVersion(version):
if version > CurrentPolyMCFormatVersion:
raise UnknownVersionException(
"Unsupported PolyMC format version: %d. Max supported is: %d" % (version, CurrentPolyMCFormatVersion))
class PolyMCLibrary(MojangLibrary):
url = StringProperty(exclude_if_none=True, default=None)
mmcHint = StringProperty(name="MMC-hint", exclude_if_none=True, default=None) # this is supposed to be MMC-hint!
class VersionedJsonObject(JsonObject):
formatVersion = IntegerProperty(default=CurrentPolyMCFormatVersion, validators=validateSupportedPolyMCVersion)
class DependencyEntry(JsonObject):
uid = StringProperty(required=True)
equals = StringProperty(exclude_if_none=True, default=None)
suggests = StringProperty(exclude_if_none=True, default=None)
class PolyMCVersionFile(VersionedJsonObject):
name = StringProperty(required=True)
version = StringProperty(required=True)
uid = StringProperty(required=True)
requires = ListProperty(DependencyEntry, exclude_if_none=True, default=None)
conflicts = ListProperty(DependencyEntry, exclude_if_none=True, default=None)
volatile = BooleanProperty(exclude_if_none=True, default=None)
assetIndex = ObjectProperty(MojangAssets, exclude_if_none=True, default=None)
libraries = ListProperty(PolyMCLibrary, exclude_if_none=True, default=None)
mavenFiles = ListProperty(PolyMCLibrary, exclude_if_none=True, default=None)
mainJar = ObjectProperty(PolyMCLibrary, exclude_if_none=True, default=None)
jarMods = ListProperty(PolyMCLibrary, exclude_if_none=True, default=None)
mainClass = StringProperty(exclude_if_none=True, default=None)
appletClass = StringProperty(exclude_if_none=True, default=None)
minecraftArguments = StringProperty(exclude_if_none=True, default=None)
releaseTime = ISOTimestampProperty(exclude_if_none=True, default=None)
type = StringProperty(exclude_if_none=True, default=None)
addTraits = ListProperty(StringProperty, name="+traits", exclude_if_none=True, default=None)
addTweakers = ListProperty(StringProperty, name="+tweakers", exclude_if_none=True, default=None)
order = IntegerProperty(exclude_if_none=True, default=None)
class PolyMCSharedPackageData(VersionedJsonObject):
name = StringProperty(required=True)
uid = StringProperty(required=True)
recommended = ListProperty(StringProperty, exclude_if_none=True, default=None)
authors = ListProperty(StringProperty, exclude_if_none=True, default=None)
description = StringProperty(exclude_if_none=True, default=None)
projectUrl = StringProperty(exclude_if_none=True, default=None)
def write(self):
try:
with open(PMC_DIR + "/%s/package.json" % self.uid, 'w') as file:
json.dump(self.to_json(), file, sort_keys=True, indent=4)
except EnvironmentError as e:
print("Error while trying to save shared packaged data for %s:" % self.uid, e)
def readSharedPackageData(uid):
with open(PMC_DIR + "/%s/package.json" % uid, 'r') as file:
return PolyMCSharedPackageData(json.load(file))
class PolyMCVersionIndexEntry(JsonObject):
version = StringProperty()
type = StringProperty(exclude_if_none=True, default=None)
releaseTime = ISOTimestampProperty()
requires = ListProperty(DependencyEntry, exclude_if_none=True, default=None)
conflicts = ListProperty(DependencyEntry, exclude_if_none=True, default=None)
recommended = BooleanProperty(exclude_if_none=True, default=None)
volatile = BooleanProperty(exclude_if_none=True, default=None)
sha256 = StringProperty()
class PolyMCVersionIndex(VersionedJsonObject):
name = StringProperty()
uid = StringProperty()
versions = ListProperty(PolyMCVersionIndexEntry)
class PolyMCPackageIndexEntry(JsonObject):
name = StringProperty()
uid = StringProperty()
sha256 = StringProperty()
class PolyMCPackageIndex(VersionedJsonObject):
packages = ListProperty(PolyMCPackageIndexEntry)