calibre-web/lib/sqlalchemy/testing/exclusions.py

335 lines
8.9 KiB
Python
Raw Normal View History

from __future__ import with_statement
import operator
from nose import SkipTest
from ..util import decorator
from . import config
from .. import util
import contextlib
class skip_if(object):
    """Decorator object that skips a test when a predicate is true.

    ``predicate`` is normalized through ``_as_predicate``; ``reason`` is an
    optional message used in place of the predicate's own string form when
    the test is skipped.
    """

    # Optional secondary ``skip_if`` installed by ``fails_on()``; when set,
    # tests that are not skipped run inside its ``fail_if`` block.
    _fails_on = None

    def __init__(self, predicate, reason=None):
        self.predicate = _as_predicate(predicate)
        self.reason = reason

    @property
    def enabled(self):
        """True when the predicate currently evaluates false."""
        return not self.predicate()

    @contextlib.contextmanager
    def fail_if(self, name='block'):
        """Context manager asserting the block fails iff the predicate holds.

        * block raises and the predicate is true: the exception is
          swallowed and a message is printed;
        * block raises and the predicate is false: the exception propagates;
        * block succeeds and the predicate is true: ``AssertionError``.
        """
        try:
            yield
        # ``as`` form is valid on Python 2.6+ and 3.x; the comma form is
        # Python-2-only syntax.
        except Exception as ex:
            if self.predicate():
                print ("%s failed as expected (%s): %s " % (
                    name, self.predicate, str(ex)))
            else:
                raise
        else:
            if self.predicate():
                raise AssertionError(
                    "Unexpected success for '%s' (%s)" %
                    (name, self.predicate))

    def __call__(self, fn):
        """Wrap ``fn`` so that calling it raises ``SkipTest`` when the
        predicate is true, and otherwise runs it (inside the ``fails_on``
        expectation, if one was configured).
        """
        @decorator
        def decorate(fn, *args, **kw):
            if self.predicate():
                if self.reason:
                    msg = "'%s' : %s" % (
                        fn.__name__,
                        self.reason
                    )
                else:
                    msg = "'%s': %s" % (
                        fn.__name__, self.predicate
                    )
                raise SkipTest(msg)
            else:
                if self._fails_on:
                    with self._fails_on.fail_if(name=fn.__name__):
                        return fn(*args, **kw)
                else:
                    return fn(*args, **kw)
        return decorate(fn)

    def fails_on(self, other, reason=None):
        """Register ``other`` as an expected-failure condition and return
        ``self`` so the call can be chained.
        """
        self._fails_on = skip_if(other, reason)
        return self
class fails_if(skip_if):
    """Variant of ``skip_if`` that always runs the test, wrapped in
    ``fail_if`` rather than skipping it.
    """

    def __call__(self, fn):
        @decorator
        def expect_failure(fn, *args, **kw):
            test_name = fn.__name__
            with self.fail_if(name=test_name):
                return fn(*args, **kw)
        return expect_failure(fn)
def only_if(predicate, reason=None):
    """Return a ``skip_if`` that skips when ``predicate`` is false."""
    negated = NotPredicate(_as_predicate(predicate))
    return skip_if(negated, reason)
def succeeds_if(predicate, reason=None):
    """Return a ``fails_if`` built on the negation of ``predicate``."""
    negated = NotPredicate(_as_predicate(predicate))
    return fails_if(negated, reason)
class Predicate(object):
    """Base class for predicate objects; also hosts the coercion helper."""

    @classmethod
    def as_predicate(cls, predicate):
        """Coerce ``predicate`` into a ``Predicate`` instance.

        Accepted forms:

        * a ``skip_if``: its wrapped predicate is returned;
        * a ``Predicate``: returned unchanged;
        * a list: each element is coerced and combined in an ``OrPredicate``;
        * a tuple: expanded positionally into ``SpecPredicate``;
        * a string: ``SpecPredicate(predicate, None, None)``;
        * any other callable: wrapped in ``LambdaPredicate``.

        Raises ``AssertionError`` for anything else.
        """
        if isinstance(predicate, skip_if):
            return predicate.predicate
        if isinstance(predicate, Predicate):
            return predicate
        if isinstance(predicate, list):
            return OrPredicate([cls.as_predicate(pred) for pred in predicate])
        if isinstance(predicate, tuple):
            return SpecPredicate(*predicate)
        if isinstance(predicate, basestring):
            return SpecPredicate(predicate, None, None)
        if util.callable(predicate):
            return LambdaPredicate(predicate)
        # Raised explicitly rather than via ``assert`` so the check is not
        # stripped under ``python -O`` (which would return None silently).
        raise AssertionError("unknown predicate type: %s" % (predicate,))
class BooleanPredicate(Predicate):
    """Predicate that always evaluates to a fixed value."""

    def __init__(self, value, description=None):
        self.value = value
        if description:
            self.description = description
        else:
            self.description = "boolean %s" % value

    def __call__(self):
        return self.value

    def _as_string(self, negate=False):
        # Negation is expressed by prefixing the description.
        prefix = "not " if negate else ""
        return prefix + self.description

    def __str__(self):
        return self._as_string()
class SpecPredicate(Predicate):
    """Predicate matching an engine by dialect name, optional driver and
    optional server-version comparison.

    ``db`` is either ``"dialect"`` or ``"dialect+driver"``.  ``op`` is a key
    of ``_ops`` or a callable taking ``(version, spec)``; ``spec`` is the
    value the server version is compared against.
    """

    def __init__(self, db, op=None, spec=None, description=None):
        self.db = db
        self.op = op
        self.spec = spec
        self.description = description

    # Comparison operators, each invoked as ``oper(version, spec)``.
    # NOTE(review): ``operator.contains(version, spec)`` evaluates
    # ``spec in version``; confirm that is the intended direction for 'in'.
    _ops = {
        '<': operator.lt,
        '>': operator.gt,
        '==': operator.eq,
        '!=': operator.ne,
        '<=': operator.le,
        '>=': operator.ge,
        'in': operator.contains,
        'between': lambda val, pair: val >= pair[0] and val <= pair[1],
    }

    def __call__(self, engine=None):
        """Return True if ``engine`` (default ``config.db``) matches."""
        if engine is None:
            engine = config.db

        if "+" in self.db:
            dialect, driver = self.db.split('+')
        else:
            dialect, driver = self.db, None

        if dialect and engine.name != dialect:
            return False
        if driver is not None and engine.driver != driver:
            return False

        if self.op is None:
            return True

        assert driver is None, "DBAPI version specs not supported yet"

        version = _server_version(engine)
        # Explicit branch instead of ``cond and a or b``: the latter falls
        # through to the table lookup when a callable ``op`` is falsy.
        if hasattr(self.op, '__call__'):
            oper = self.op
        else:
            oper = self._ops[self.op]
        return oper(version, self.spec)

    def _as_string(self, negate=False):
        # An explicit description is returned as-is, even when negated.
        if self.description is not None:
            return self.description
        elif self.op is None:
            if negate:
                return "not %s" % self.db
            else:
                return "%s" % self.db
        else:
            if negate:
                return "not %s %s %s" % (
                    self.db,
                    self.op,
                    self.spec
                )
            else:
                return "%s %s %s" % (
                    self.db,
                    self.op,
                    self.spec
                )

    def __str__(self):
        return self._as_string()
class LambdaPredicate(Predicate):
    """Predicate that delegates to an arbitrary callable.

    The callable is invoked with the stored ``args`` / ``kw``.  The
    description falls back to the callable's docstring, then to
    ``"custom function"``.
    """

    def __init__(self, lambda_, description=None, args=None, kw=None):
        self.lambda_ = lambda_
        self.args = args or ()
        self.kw = kw or {}
        self.description = (
            description or lambda_.__doc__ or "custom function")

    def __call__(self):
        return self.lambda_(*self.args, **self.kw)

    def _as_string(self, negate=False):
        prefix = "not " if negate else ""
        return prefix + self.description

    def __str__(self):
        return self._as_string()
class NotPredicate(Predicate):
    """Predicate that inverts the result of another predicate."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __call__(self, *arg, **kw):
        return not self.predicate(*arg, **kw)

    def _as_string(self, negate=False):
        # Provided so a NotPredicate can itself be nested inside an
        # OrPredicate or another NotPredicate, both of which call
        # ``_as_string`` on their members.  Negating a negation asks the
        # wrapped predicate for its plain form.
        return self.predicate._as_string(not negate)

    def __str__(self):
        return self._as_string()
class OrPredicate(Predicate):
    """Predicate that is true when any member predicate is true.

    ``description`` may be a ``%``-format template containing
    ``%(spec)s``; it is filled with the member recorded in ``_str``.
    """

    # Member predicate that matched during a call; None until one matches.
    _str = None

    def __init__(self, predicates, description=None):
        self.predicates = predicates
        self.description = description

    def __call__(self, *arg, **kw):
        for candidate in self.predicates:
            if candidate(*arg, **kw):
                # Remember which member matched, for string rendering.
                self._str = candidate
                return True
        return False

    def _eval_str(self, negate=False):
        matched = self._str
        if matched is not None:
            return matched._as_string(negate=negate)
        # Nothing matched yet: render every member.  A negated "or"
        # reads as an "and" of the negated members.
        joiner = " and " if negate else " or "
        return joiner.join(
            member._as_string(negate=negate)
            for member in self.predicates)

    def _negation_str(self):
        if self.description is None:
            return self._eval_str(negate=True)
        return "Not " + (self.description % {"spec": self._str})

    def _as_string(self, negate=False):
        if negate:
            return self._negation_str()
        if self.description is None:
            return self._eval_str()
        return self.description % {"spec": self._str}

    def __str__(self):
        return self._as_string()
# Module-level shorthand for the Predicate.as_predicate coercion helper.
_as_predicate = Predicate.as_predicate
def _is_excluded(db, op, spec):
    """Build a ``SpecPredicate`` from the arguments and evaluate it
    immediately (against the default engine).
    """
    check = SpecPredicate(db, op, spec)
    return check()
def _server_version(engine):
"""Return a server_version_info tuple."""
# force metadata to be retrieved
conn = engine.connect()
version = getattr(engine.dialect, 'server_version_info', ())
conn.close()
return version
def db_spec(*dbs):
    """Return an ``OrPredicate`` over each of ``dbs`` coerced to a predicate.

    The members are materialized into a list: ``OrPredicate`` iterates its
    members on every evaluation and when rendering itself as a string, so a
    one-shot generator would be exhausted after the first use.
    """
    return OrPredicate(
        [Predicate.as_predicate(db) for db in dbs]
    )
def open():
    """Return a ``skip_if`` whose predicate is always false."""
    never = BooleanPredicate(False, "mark as execute")
    return skip_if(never)
def closed():
    """Return a ``skip_if`` whose predicate is always true."""
    always = BooleanPredicate(True, "marked as skip")
    return skip_if(always)
@decorator
def future(fn, *args, **kw):
    # Wraps ``fn`` and the call arguments in a LambdaPredicate and returns
    # a ``fails_if`` built on it.
    # NOTE(review): the extra positional arguments land in
    # LambdaPredicate's ``description``/``args``/``kw`` slots -- confirm
    # this is intended.
    pending = LambdaPredicate(fn, *args, **kw)
    return fails_if(pending, "Future feature")
def fails_on(db, reason=None):
    """Return a ``fails_if`` keyed on the database spec ``db``."""
    target = SpecPredicate(db)
    return fails_if(target, reason)
def fails_on_everything_except(*dbs):
    """Return a ``succeeds_if`` over the union of the given database specs."""
    supported = OrPredicate([SpecPredicate(db) for db in dbs])
    return succeeds_if(supported)
def skip(db, reason=None):
    """Return a ``skip_if`` keyed on the database spec ``db``."""
    target = SpecPredicate(db)
    return skip_if(target, reason)
def only_on(dbs, reason=None):
    """Return a ``skip_if`` that skips unless one of ``dbs`` matches.

    ``dbs`` may be a single spec or a collection (normalized with
    ``util.to_list``).  ``reason`` is forwarded so it appears in the skip
    message instead of being silently discarded.
    """
    return only_if(
        OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)]),
        reason
    )
def exclude(db, op, spec, reason=None):
    """Return a ``skip_if`` for a database spec with a version comparison."""
    versioned = SpecPredicate(db, op, spec)
    return skip_if(versioned, reason)
def against(*queries):
    """Coerce each query to a predicate, OR them together, and return the
    result of evaluating that combination immediately.
    """
    members = [Predicate.as_predicate(query) for query in queries]
    combined = OrPredicate(members)
    return combined()