# orm/properties.py
# Copyright (C) 2005-2013 the SQLAlchemy authors and contributors <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""MapperProperty implementations.

This is a private module which defines the behavior of individual ORM-
mapped attributes.

"""

from .. import sql, util, log, exc as sa_exc, inspect
from ..sql import operators, expression
from . import (
    attributes, mapper,
    strategies, configure_mappers, relationships,
    dependency
    )
from .util import CascadeOptions, \
        _orm_annotate, _orm_deannotate, _orm_full_deannotate

from .interfaces import MANYTOMANY, MANYTOONE, ONETOMANY,\
        PropComparator, StrategizedProperty

mapperlib = util.importlater("sqlalchemy.orm", "mapperlib")
NoneType = type(None)

from descriptor_props import CompositeProperty, SynonymProperty, \
            ComparableProperty, ConcreteInheritedProperty

__all__ = ['ColumnProperty', 'CompositeProperty', 'SynonymProperty',
           'ComparableProperty', 'RelationshipProperty', 'RelationProperty']


class ColumnProperty(StrategizedProperty):
    """Describes an object attribute that corresponds to a table column.

    Public constructor is the :func:`.orm.column_property` function.

    """

    def __init__(self, *columns, **kwargs):
        r"""Construct a ColumnProperty.

        Note the public constructor is the :func:`.orm.column_property`
        function.

        :param \*columns: The list of `columns` describes a single
          object property. If there are multiple tables joined
          together for the mapper, this list represents the equivalent
          column as it appears across each table.

        :param group:

        :param deferred: when true (and the property is instrumented),
          the deferred column loader strategy is selected.

        :param comparator_factory: callable used to build the comparator
          for the class-level attribute; defaults to the ``Comparator``
          class of this property's class.

        :param descriptor:

        :param expire_on_flush:

        :param extension:

        :param active_history:

        :param doc: documentation string for the attribute.  When not
          given, the columns are searched from last to first and the
          first non-``None`` ``doc`` found is used.

        :param info: Optional data dictionary which will be populated into the
         :attr:`.info` attribute of this object.

        :raises TypeError: if any unrecognized keyword argument remains.

        """
        self._orig_columns = [expression._labeled(col) for col in columns]
        self.columns = [expression._labeled(_orm_full_deannotate(col))
                            for col in columns]

        # (attribute name, keyword name, default), consumed in this order
        simple_options = (
            ('group', 'group', None),
            ('deferred', 'deferred', False),
            ('instrument', '_instrument', True),
            ('comparator_factory', 'comparator_factory',
                                    self.__class__.Comparator),
            ('descriptor', 'descriptor', None),
            ('extension', 'extension', None),
            ('active_history', 'active_history', False),
            ('expire_on_flush', 'expire_on_flush', True),
        )
        for attrname, kwname, default in simple_options:
            setattr(self, attrname, kwargs.pop(kwname, default))

        # "info" is only assigned when the caller actually supplied it
        if 'info' in kwargs:
            self.info = kwargs.pop('info')

        if 'doc' in kwargs:
            self.doc = kwargs.pop('doc')
        else:
            # fall back to the documentation of the columns, preferring
            # the columns that appear later in the list
            found = None
            for col in reversed(self.columns):
                found = getattr(col, 'doc', None)
                if found is not None:
                    break
            self.doc = found

        if kwargs:
            leftover = ', '.join(sorted(kwargs))
            raise TypeError(
                "%s received unexpected keyword argument(s): %s" % (
                    self.__class__.__name__, leftover))

        util.set_creation_order(self)
        if self.instrument:
            if self.deferred:
                self.strategy_class = strategies.DeferredColumnLoader
            else:
                self.strategy_class = strategies.ColumnLoader
        else:
            self.strategy_class = strategies.UninstrumentedColumnLoader

    @property
    def expression(self):
        """Return the primary column or expression for this ColumnProperty,
        i.e. the first entry of ``self.columns``.

        """
        primary = self.columns[0]
        return primary

    def instrument_class(self, mapper):
        """Register the class-level descriptor for this property on the
        class mapped by ``mapper``; does nothing when the property is
        not instrumented.

        """
        if self.instrument:
            descriptor_options = dict(
                comparator=self.comparator_factory(self, mapper),
                parententity=mapper,
                doc=self.doc)
            attributes.register_descriptor(
                mapper.class_, self.key, **descriptor_options)

    def do_init(self):
        """Run the base initialization, then warn if this attribute
        combines more than one column and all of them are primary key
        columns of the parent mapper.

        """
        super(ColumnProperty, self).do_init()
        if len(self.columns) <= 1:
            return
        if not set(self.parent.primary_key).issuperset(self.columns):
            return

        message = (
                "On mapper %s, primary key column '%s' is being combined "
                "with distinct primary key column '%s' in attribute '%s'.  "
                "Use explicit properties to give each column its own mapped "
                "attribute name.")
        util.warn(message % (self.parent, self.columns[1],
                             self.columns[0], self.key))

    def copy(self):
        """Return a new ColumnProperty over the same columns, carrying
        over the ``deferred``, ``group`` and ``active_history`` settings.

        """
        carried = dict(
            deferred=self.deferred,
            group=self.group,
            active_history=self.active_history)
        return ColumnProperty(*self.columns, **carried)

    def _getcommitted(self, state, dict_, column,
                    passive=attributes.PASSIVE_OFF):
        """Return the committed value of this attribute for ``state``."""
        impl = state.get_impl(self.key)
        return impl.get_committed_value(state, dict_, passive=passive)

    def merge(self, session, source_state, source_dict, dest_state,
                                dest_dict, load, _recursive):
        """Copy this attribute's value from the source to the destination.

        When the source has no value loaded, the destination attribute
        is expired instead, provided the destination has an identity and
        no value of its own.

        """
        if not self.instrument:
            return

        key = self.key
        if key in source_dict:
            value = source_dict[key]
            if load:
                # go through the attribute implementation
                dest_state.get_impl(key).set(
                                dest_state, dest_dict, value, None)
            else:
                # no load: write straight into the destination dict
                dest_dict[key] = value
        elif dest_state.has_identity and key not in dest_dict:
            dest_state._expire_attributes(dest_dict, [key])

    class Comparator(PropComparator):
        """Produce boolean, comparison, and other operators for
        :class:`.ColumnProperty` attributes.

        See the documentation for :class:`.PropComparator` for a brief
        overview.

        See also:

        :class:`.PropComparator`

        :class:`.ColumnOperators`

        :ref:`types_operators`

        :attr:`.TypeEngine.comparator_factory`

        """
        @util.memoized_instancemethod
        def __clause_element__(self):
            adapter = self.adapter
            column = self.prop.columns[0]
            if adapter:
                return adapter(column)
            # no adapter: tag the column with the owning mapper
            annotations = {
                "parententity": self._parentmapper,
                "parentmapper": self._parentmapper}
            return column._annotate(annotations)

        @util.memoized_property
        def info(self):
            element = self.__clause_element__()
            try:
                found = element.info
            except AttributeError:
                # the clause element has no .info; use the property's
                found = self.prop.info
            return found

        def __getattr__(self, key):
            """proxy attribute access down to the mapped column.

            this allows user-defined comparison methods to be accessed.
            """
            element = self.__clause_element__()
            return getattr(element, key)

        def operate(self, op, *other, **kwargs):
            element = self.__clause_element__()
            return op(element, *other, **kwargs)

        def reverse_operate(self, op, other, **kwargs):
            element = self.__clause_element__()
            bound = element._bind_param(op, other)
            return op(bound, element, **kwargs)

    # TODO: legacy..do we need this ? (0.5)
    ColumnComparator = Comparator

    def __str__(self):
        owner = str(self.parent.class_.__name__)
        return owner + "." + self.key

log.class_logger(ColumnProperty)


class RelationshipProperty(StrategizedProperty):
    """Describes an object property that holds a single item or list
    of items that correspond to a related database table.

    Public constructor is the :func:`.orm.relationship` function.

    See also:

    :ref:`relationship_config_toplevel`

    """

    strategy_wildcard_key = 'relationship:*'

    _dependency_processor = None

    def __init__(self, argument,
        secondary=None, primaryjoin=None,
        secondaryjoin=None,
        foreign_keys=None,
        uselist=None,
        order_by=False,
        backref=None,
        back_populates=None,
        post_update=False,
        cascade=False, extension=None,
        viewonly=False, lazy=True,
        collection_class=None, passive_deletes=False,
        passive_updates=True, remote_side=None,
        enable_typechecks=True, join_depth=None,
        comparator_factory=None,
        single_parent=False, innerjoin=False,
        distinct_target_key=False,
        doc=None,
        active_history=False,
        cascade_backrefs=True,
        load_on_pending=False,
        strategy_class=None, _local_remote_pairs=None,
        query_class=None,
            info=None):
        """Construct a RelationshipProperty.

        Note the public constructor is the :func:`.orm.relationship`
        function.

        Most arguments are stored unchanged on the attribute of the
        same name.  The exceptions visible in this constructor:

        * ``foreign_keys`` is stored as ``_user_defined_foreign_keys``.
        * ``_local_remote_pairs`` is stored as ``local_remote_pairs``.
        * ``cascade``, when left at ``False``, is assigned the string
          ``"save-update, merge"``.
        * ``comparator_factory`` defaults to
          :class:`.RelationshipProperty.Comparator`.
        * ``info`` is only assigned when it is not ``None``.
        * ``backref`` is stored as ``None`` when ``back_populates``
          is given.

        :raises ArgumentError: if both ``backref`` and
          ``back_populates`` are given.

        """

        self.uselist = uselist
        self.argument = argument
        self.secondary = secondary
        self.primaryjoin = primaryjoin
        self.secondaryjoin = secondaryjoin
        self.post_update = post_update
        # direction is not known at construction time
        self.direction = None
        self.viewonly = viewonly
        self.lazy = lazy
        self.single_parent = single_parent
        self._user_defined_foreign_keys = foreign_keys
        self.collection_class = collection_class
        self.passive_deletes = passive_deletes
        self.cascade_backrefs = cascade_backrefs
        self.passive_updates = passive_updates
        self.remote_side = remote_side
        self.enable_typechecks = enable_typechecks
        self.query_class = query_class
        self.innerjoin = innerjoin
        self.distinct_target_key = distinct_target_key
        self.doc = doc
        self.active_history = active_history
        self.join_depth = join_depth
        self.local_remote_pairs = _local_remote_pairs
        self.extension = extension
        self.load_on_pending = load_on_pending
        self.comparator_factory = comparator_factory or \
                                    RelationshipProperty.Comparator
        # a comparator with no parent mapper is built right away
        self.comparator = self.comparator_factory(self, None)
        util.set_creation_order(self)

        if info is not None:
            self.info = info

        # loader strategy: an explicit strategy_class wins, then the
        # special 'dynamic' lazy setting, otherwise it is looked up
        # from the value of ``lazy``.
        if strategy_class:
            self.strategy_class = strategy_class
        elif self.lazy == 'dynamic':
            from sqlalchemy.orm import dynamic
            self.strategy_class = dynamic.DynaLoader
        else:
            self.strategy_class = strategies.factory(self.lazy)

        self._reverse_property = set()

        # NOTE(review): merge() reads ``self._cascade``, so ``cascade``
        # is presumably a property defined outside this view - confirm.
        self.cascade = cascade if cascade is not False \
                            else "save-update, merge"

        self.order_by = order_by

        self.back_populates = back_populates

        # backref and back_populates cannot be combined
        if self.back_populates:
            if backref:
                raise sa_exc.ArgumentError(
                            "backref and back_populates keyword arguments "
                            "are mutually exclusive")
            self.backref = None
        else:
            self.backref = backref

    def instrument_class(self, mapper):
        """Register the class-level descriptor for this relationship on
        the class mapped by ``mapper``.

        """
        descriptor_options = dict(
            comparator=self.comparator_factory(self, mapper),
            parententity=mapper,
            doc=self.doc)
        attributes.register_descriptor(
            mapper.class_, self.key, **descriptor_options)

    class Comparator(PropComparator):
        """Produce boolean, comparison, and other operators for
        :class:`.RelationshipProperty` attributes.

        See the documentation for :class:`.PropComparator` for a brief overview
        of ORM level operator definition.

        See also:

        :class:`.PropComparator`

        :class:`.ColumnProperty.Comparator`

        :class:`.ColumnOperators`

        :ref:`types_operators`

        :attr:`.TypeEngine.comparator_factory`

        """

        _of_type = None

        def __init__(self, prop, parentmapper, of_type=None, adapter=None):
            """Construction of :class:`.RelationshipProperty.Comparator`
            is internal to the ORM's attribute mechanics.

            ``of_type`` is only stored when it is truthy; otherwise the
            class-level ``_of_type`` default of ``None`` stays in effect.

            """
            self.prop, self._parentmapper, self.adapter = \
                                    prop, parentmapper, adapter
            if of_type:
                self._of_type = of_type

        def adapted(self, adapter):
            """Return a copy of this PropComparator which will use the
            given adaption function on the local side of generated
            expressions.

            The copy is of the same class as this comparator and keeps
            its property, parent mapper and ``of_type`` setting.

            """
            comparator_cls = self.__class__
            prop = self.property
            current_of_type = getattr(self, '_of_type', None)
            return comparator_cls(prop, self._parentmapper,
                                  current_of_type, adapter)

        @util.memoized_property
        def mapper(self):
            """The target :class:`.Mapper` referred to by this
            :class:`.RelationshipProperty.Comparator`.

            This is the "target" or "remote" side of the
            :func:`.relationship`.

            """
            relationship_prop = self.property
            return relationship_prop.mapper

        @util.memoized_property
        def _parententity(self):
            """The ``parent`` of the relationship property."""
            relationship_prop = self.property
            return relationship_prop.parent

        def _source_selectable(self):
            """Return the parent's with-polymorphic selectable, passed
            through the adapter when one is set.

            """
            selectable = self.property.parent._with_polymorphic_selectable
            return self.adapter(selectable) if self.adapter else selectable

        def __clause_element__(self):
            """Return the join condition of the relationship: the
            primaryjoin, ANDed with the secondaryjoin when there is one.

            """
            adapt_from = self._source_selectable()
            of_type = inspect(self._of_type).mapper \
                            if self._of_type else None

            # only the two join conditions are needed from the result
            primaryjoin, secondaryjoin, _source, _dest, \
            _secondary, _target_adapter = self.property._create_joins(
                            source_selectable=adapt_from,
                            source_polymorphic=True,
                            of_type=of_type)
            if secondaryjoin is None:
                return primaryjoin
            return primaryjoin & secondaryjoin

        def of_type(self, cls):
            """Produce a construct that represents a particular 'subtype' of
            attribute for the parent class.

            Currently this is usable in conjunction with :meth:`.Query.join`
            and :meth:`.Query.outerjoin`.

            The result is a new :class:`.RelationshipProperty.Comparator`
            sharing this comparator's property, parent mapper and adapter.

            """
            comparator_cls = RelationshipProperty.Comparator
            return comparator_cls(self.property, self._parentmapper,
                                  cls, adapter=self.adapter)

        def in_(self, other):
            """Produce an IN clause - this is not implemented
            for :func:`~.orm.relationship`-based attributes at this time.

            :raises NotImplementedError: always.

            """
            message = ('in_() not yet supported for '
                    'relationships.  For a simple many-to-one, use '
                    'in_() against the set of foreign key values.')
            raise NotImplementedError(message)

        __hash__ = None

        def __eq__(self, other):
            """Implement the ``==`` operator.

            In a many-to-one context, such as::

              MyClass.some_prop == <some object>

            this will typically produce a
            clause such as::

              mytable.related_id == <some id>

            Where ``<some id>`` is the primary key of the given
            object.

            The ``==`` operator provides partial functionality for non-
            many-to-one comparisons:

            * Comparisons against collections are not supported.
              Use :meth:`~.RelationshipProperty.Comparator.contains`.
            * Compared to a scalar one-to-many, will produce a
              clause that compares the target columns in the parent to
              the given target.
            * Compared to a scalar many-to-many, an alias
              of the association table will be rendered as
              well, forming a natural join that is part of the
              main body of the query. This will not work for
              queries that go beyond simple AND conjunctions of
              comparisons, such as those which use OR. Use
              explicit joins, outerjoins, or
              :meth:`~.RelationshipProperty.Comparator.has` for
              more comprehensive non-many-to-one scalar
              membership tests.
            * Comparisons against ``None`` given in a one-to-many
              or many-to-many context produce a NOT EXISTS clause.

            :raises InvalidRequestError: when a collection-based
              relationship is compared to anything other than ``None``.

            """
            prop = self.property

            if not isinstance(other, (NoneType, expression.Null)):
                if prop.uselist:
                    raise sa_exc.InvalidRequestError(
                        "Can't compare a collection to an object or "
                        "collection; use contains() to test for "
                        "membership.")
                return _orm_annotate(prop._optimized_compare(
                        other, adapt_source=self.adapter))

            # comparison to None / NULL
            if prop.direction in [ONETOMANY, MANYTOMANY]:
                return ~self._criterion_exists()
            return _orm_annotate(prop._optimized_compare(
                    None, adapt_source=self.adapter))

        def _criterion_exists(self, criterion=None, **kwargs):
            """Build the EXISTS construct used by ``any()``, ``has()``
            and the ``None`` / negated comparisons of this comparator.

            :param criterion: optional expression ANDed with the join
              condition inside the EXISTS.
            :param \**kwargs: each ``name=value`` pair is turned into
              ``<target class>.<name> == value`` and ANDed into the
              criterion.
            :return: the result of ``sql.exists()`` over the combined
              condition, selecting from the ``dest`` returned by
              ``_create_joins()``.

            """
            if getattr(self, '_of_type', None):
                # an of_type() target was given; use its mapper and
                # selectable as the destination of the join
                info = inspect(self._of_type)
                target_mapper, to_selectable, is_aliased_class = \
                    info.mapper, info.selectable, info.is_aliased_class
                if self.property._is_self_referential and not is_aliased_class:
                    to_selectable = to_selectable.alias()

                # fold the target mapper's single-table criterion, if
                # any, into the criterion
                single_crit = target_mapper._single_table_criterion
                if single_crit is not None:
                    if criterion is not None:
                        criterion = single_crit & criterion
                    else:
                        criterion = single_crit
            else:
                is_aliased_class = False
                to_selectable = None

            if self.adapter:
                source_selectable = self._source_selectable()
            else:
                source_selectable = None

            pj, sj, source, dest, secondary, target_adapter = \
                self.property._create_joins(dest_polymorphic=True,
                        dest_selectable=to_selectable,
                        source_selectable=source_selectable)

            # keyword arguments become equality comparisons against
            # attributes of the target class
            for k in kwargs:
                crit = getattr(self.property.mapper.class_, k) == kwargs[k]
                if criterion is None:
                    criterion = crit
                else:
                    criterion = criterion & crit

            # annotate the *local* side of the join condition, in the case
            # of pj + sj this is the full primaryjoin, in the case of just
            # pj it's the local side of the primaryjoin.
            if sj is not None:
                j = _orm_annotate(pj) & sj
            else:
                j = _orm_annotate(pj, exclude=self.property.remote_side)

            if criterion is not None and target_adapter and not is_aliased_class:
                # limit this adapter to annotated only?
                criterion = target_adapter.traverse(criterion)

            # only have the "joined left side" of what we
            # return be subject to Query adaption.  The right
            # side of it is used for an exists() subquery and
            # should not correlate or otherwise reach out
            # to anything in the enclosing query.
            if criterion is not None:
                criterion = criterion._annotate(
                    {'no_replacement_traverse': True})

            # NOTE(review): criterion may still be None here; this
            # relies on "&" tolerating a None operand - confirm.
            crit = j & criterion

            ex = sql.exists([1], crit, from_obj=dest).correlate_except(dest)
            if secondary is not None:
                ex = ex.correlate_except(secondary)
            return ex

        def any(self, criterion=None, **kwargs):
            """Produce an expression that tests a collection against
            particular criterion, using EXISTS.

            An expression like::

                session.query(MyClass).filter(
                    MyClass.somereference.any(SomeRelated.x==2)
                )


            Will produce a query like::

                SELECT * FROM my_table WHERE
                EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
                AND related.x=2)

            Because :meth:`~.RelationshipProperty.Comparator.any` uses
            a correlated subquery, its performance is not nearly as
            good when compared against large target tables as that of
            using a join.

            :meth:`~.RelationshipProperty.Comparator.any` is particularly
            useful for testing for empty collections::

                session.query(MyClass).filter(
                    ~MyClass.somereference.any()
                )

            will produce::

                SELECT * FROM my_table WHERE
                NOT EXISTS (SELECT 1 FROM related WHERE
                related.my_id=my_table.id)

            :meth:`~.RelationshipProperty.Comparator.any` is only
            valid for collections, i.e. a :func:`.relationship`
            that has ``uselist=True``.  For scalar references,
            use :meth:`~.RelationshipProperty.Comparator.has`.

            :raises InvalidRequestError: when used on a scalar
              relationship.

            """
            if self.property.uselist:
                return self._criterion_exists(criterion, **kwargs)

            raise sa_exc.InvalidRequestError(
                        "'any()' not implemented for scalar "
                        "attributes. Use has()."
                    )

        def has(self, criterion=None, **kwargs):
            """Produce an expression that tests a scalar reference against
            particular criterion, using EXISTS.

            An expression like::

                session.query(MyClass).filter(
                    MyClass.somereference.has(SomeRelated.x==2)
                )


            Will produce a query like::

                SELECT * FROM my_table WHERE
                EXISTS (SELECT 1 FROM related WHERE
                related.id==my_table.related_id AND related.x=2)

            Because :meth:`~.RelationshipProperty.Comparator.has` uses
            a correlated subquery, its performance is not nearly as
            good when compared against large target tables as that of
            using a join.

            :meth:`~.RelationshipProperty.Comparator.has` is only
            valid for scalar references, i.e. a :func:`.relationship`
            that has ``uselist=False``.  For collection references,
            use :meth:`~.RelationshipProperty.Comparator.any`.

            :raises InvalidRequestError: when used on a collection-based
              relationship.

            """
            if not self.property.uselist:
                return self._criterion_exists(criterion, **kwargs)

            raise sa_exc.InvalidRequestError(
                        "'has()' not implemented for collections.  "
                        "Use any().")

        def contains(self, other, **kwargs):
            """Return a simple expression that tests a collection for
            containment of a particular item.

            :meth:`~.RelationshipProperty.Comparator.contains` is
            only valid for a collection, i.e. a
            :func:`~.orm.relationship` that implements
            one-to-many or many-to-many with ``uselist=True``.

            When used in a simple one-to-many context, an
            expression like::

                MyClass.contains(other)

            Produces a clause like::

                mytable.id == <some id>

            Where ``<some id>`` is the value of the foreign key
            attribute on ``other`` which refers to the primary
            key of its parent object. From this it follows that
            :meth:`~.RelationshipProperty.Comparator.contains` is
            very useful when used with simple one-to-many
            operations.

            For many-to-many operations, the behavior of
            :meth:`~.RelationshipProperty.Comparator.contains`
            has more caveats. The association table will be
            rendered in the statement, producing an "implicit"
            join, that is, includes multiple tables in the FROM
            clause which are equated in the WHERE clause::

                query(MyClass).filter(MyClass.contains(other))

            Produces a query like::

                SELECT * FROM my_table, my_association_table AS
                my_association_table_1 WHERE
                my_table.id = my_association_table_1.parent_id
                AND my_association_table_1.child_id = <some id>

            Where ``<some id>`` would be the primary key of
            ``other``. From the above, it is clear that
            :meth:`~.RelationshipProperty.Comparator.contains`
            will **not** work with many-to-many collections when
            used in queries that move beyond simple AND
            conjunctions, such as multiple
            :meth:`~.RelationshipProperty.Comparator.contains`
            expressions joined by OR. In such cases subqueries or
            explicit "outer joins" will need to be used instead.
            See :meth:`~.RelationshipProperty.Comparator.any` for
            a less-performant alternative using EXISTS, or refer
            to :meth:`.Query.outerjoin` as well as :ref:`ormtutorial_joins`
            for more details on constructing outer joins.

            :raises InvalidRequestError: when used on a scalar
              relationship.

            """
            prop = self.property
            if not prop.uselist:
                raise sa_exc.InvalidRequestError(
                            "'contains' not implemented for scalar "
                            "attributes.  Use ==")

            clause = prop._optimized_compare(other,
                    adapt_source=self.adapter)
            if prop.secondaryjoin is None:
                return clause

            # with a secondaryjoin present, attach an explicit negated
            # form to the clause
            clause.negation_clause = \
                self.__negated_contains_or_equals(other)
            return clause

        def __negated_contains_or_equals(self, other):
            """Return the negated form of an equality / containment
            test against ``other``.

            For a many-to-one relationship whose property has
            ``_use_get`` set, each local column is compared directly:
            it must differ from the committed value of the matching
            remote column on ``other``, or be NULL.  In every other case
            the result is a negated EXISTS matching the primary key of
            ``other``.

            """
            prop = self.property

            if prop.direction == MANYTOONE:
                state = attributes.instance_state(other)

                def committed_value_param(name, state, remote_col):
                    instance = state.obj()  # strong ref

                    def fetch():
                        return prop.mapper.\
                            _get_committed_attr_by_column(
                                            instance, remote_col)
                    return sql.bindparam(name, unique=True, callable_=fetch)

                def adapt(col):
                    return self.adapter(col) if self.adapter else col

                if prop._use_get:
                    comparisons = []
                    for local_col, remote_col in prop.local_remote_pairs:
                        # "== None" builds an SQL IS NULL expression and
                        # must not be replaced with "is None"
                        comparisons.append(sql.or_(
                            adapt(local_col) != committed_value_param(
                                    adapt(local_col), state, remote_col),
                            adapt(local_col) == None))
                    return sql.and_(*comparisons)

            target_mapper = prop.mapper
            pk_pairs = zip(target_mapper.primary_key,
                           target_mapper.primary_key_from_instance(other))
            criterion = sql.and_(*[col == value
                                   for (col, value) in pk_pairs])
            return ~self._criterion_exists(criterion)

        def __ne__(self, other):
            """Implement the ``!=`` operator.

            In a many-to-one context, such as::

              MyClass.some_prop != <some object>

            This will typically produce a clause such as::

              mytable.related_id != <some id>

            Where ``<some id>`` is the primary key of the
            given object.

            The ``!=`` operator provides partial functionality for non-
            many-to-one comparisons:

            * Comparisons against collections are not supported.
              Use
              :meth:`~.RelationshipProperty.Comparator.contains`
              in conjunction with :func:`~.expression.not_`.
            * Compared to a scalar one-to-many, will produce a
              clause that compares the target columns in the parent to
              the given target.
            * Compared to a scalar many-to-many, an alias
              of the association table will be rendered as
              well, forming a natural join that is part of the
              main body of the query. This will not work for
              queries that go beyond simple AND conjunctions of
              comparisons, such as those which use OR. Use
              explicit joins, outerjoins, or
              :meth:`~.RelationshipProperty.Comparator.has` in
              conjunction with :func:`~.expression.not_` for
              more comprehensive non-many-to-one scalar
              membership tests.
            * Comparisons against ``None`` given in a one-to-many
              or many-to-many context produce an EXISTS clause.

            :raises InvalidRequestError: when a collection-based
              relationship is compared to anything other than ``None``.

            """
            prop = self.property

            if isinstance(other, (NoneType, expression.Null)):
                if prop.direction == MANYTOONE:
                    # "!= None" builds an SQL IS NOT NULL expression
                    not_null = [fk != None
                                for fk in prop._calculated_foreign_keys]
                    return sql.or_(*not_null)
                return self._criterion_exists()

            if prop.uselist:
                raise sa_exc.InvalidRequestError(
                        "Can't compare a collection to an object or "
                        "collection; use contains() to test for "
                        "membership.")

            return self.__negated_contains_or_equals(other)

        @util.memoized_property
        def property(self):
            """Return the relationship property, first running
            ``configure_mappers()`` if new mappers are pending.

            """
            mappers_pending = mapperlib.module._new_mappers
            if mappers_pending:
                configure_mappers()
            return self.prop

    def compare(self, op, value,
                            value_is_parent=False,
                            alias_secondary=True):
        """Produce a comparison of this relationship to ``value``
        using the operator ``op``.

        Equality against ``None`` on a collection yields a negated
        EXISTS over the primaryjoin; every other equality goes through
        ``_optimized_compare()``.  Any other operator is applied to
        this property's comparator.

        """
        if op == operators.eq:
            if value is None and self.uselist:
                return ~sql.exists([1], self.primaryjoin)
            return self._optimized_compare(value,
                            value_is_parent=value_is_parent,
                            alias_secondary=alias_secondary)

        return op(self.comparator, value)

    def _optimized_compare(self, value, value_is_parent=False,
                                    adapt_source=None,
                                    alias_secondary=True):
        """Return the lazy loader's clause for comparing this
        relationship to ``value``.

        A non-``None`` value is converted to its instance state first.

        """
        state = attributes.instance_state(value) \
                        if value is not None else None
        lazy_loader = self._get_strategy(strategies.LazyLoader)
        return lazy_loader.lazy_clause(
                state,
                reverse_direction=not value_is_parent,
                alias_secondary=alias_secondary,
                adapt_source=adapt_source)

    def __str__(self):
        """Return ``<parent class name>.<key>``."""
        owner = str(self.parent.class_.__name__)
        return owner + "." + self.key

    def merge(self,
                    session,
                    source_state,
                    source_dict,
                    dest_state,
                    dest_dict,
                    load, _recursive):
        """Merge the related object(s) of the source into the
        destination.

        Nothing happens when "merge" is not among this relationship's
        cascade settings, when the source has no value loaded for this
        attribute, or (with ``load``) when the source state has already
        been recorded in ``_recursive`` for one of the reverse
        properties.

        """
        if load and any((source_state, rev) in _recursive
                        for rev in self._reverse_property):
            return

        if "merge" not in self._cascade or self.key not in source_dict:
            return

        def merge_related(instance):
            # record the visit before recursing into the session
            related_state = attributes.instance_state(instance)
            related_dict = attributes.instance_dict(instance)
            _recursive[(related_state, self)] = True
            return session._merge(related_state, related_dict,
                    load=load, _recursive=_recursive)

        if self.uselist:
            source_impl = source_state.get_impl(self.key)
            instances = source_impl.get(source_state, source_dict)
            if hasattr(instances, '_sa_adapter'):
                # convert collections to adapters to get a true iterator
                instances = instances._sa_adapter

            if load:
                # for a full merge, pre-load the destination collection,
                # so that individual _merge of each item pulls from identity
                # map for those already present.
                # also assumes CollectionAttributeImpl behavior of loading
                # "old" list in any case
                dest_state.get_impl(self.key).get(dest_state, dest_dict)

            merged_items = []
            for item in instances:
                merged = merge_related(item)
                if merged is not None:
                    merged_items.append(merged)

            if load:
                dest_state.get_impl(self.key)._set_iterable(dest_state,
                        dest_dict, merged_items)
            else:
                collection = attributes.init_state_collection(dest_state,
                        dest_dict, self.key)
                for merged in merged_items:
                    collection.append_without_event(merged)
        else:
            current = source_dict[self.key]
            merged = merge_related(current) \
                            if current is not None else None

            if load:
                dest_state.get_impl(self.key).set(dest_state,
                        dest_dict, merged, None)
            else:
                dest_dict[self.key] = merged

    def _value_as_iterable(self, state, dict_, key,
                                    passive=attributes.PASSIVE_OFF):
        """Return the current value of attribute ``key`` as a list of
        ``(state, obj)`` tuples.

        The list is empty when the value is None or
        PASSIVE_NO_RESULT.  When the attribute implementation offers
        ``get_collection()``, one tuple is produced per collection
        member; otherwise a single tuple for the scalar value.

        """
        attr_impl = state.manager[key].impl
        value = attr_impl.get(state, dict_, passive=passive)

        if value is None or value is attributes.PASSIVE_NO_RESULT:
            return []

        if not hasattr(attr_impl, 'get_collection'):
            return [(attributes.instance_state(value), value)]

        members = attr_impl.get_collection(state, dict_, value,
                                           passive=passive)
        return [
            (attributes.instance_state(member), member)
            for member in members
        ]

    def cascade_iterator(self, type_, state, dict_,
                         visited_states, halt_on=None):
        """Iterate related objects along this relationship for the
        cascade named ``type_``.

        Yields ``(object, mapper, state, dict)`` tuples.  States
        already present in ``visited_states`` are skipped, and each
        yielded state is added to that set.  ``halt_on``, when given,
        is called with each candidate state; a true result skips it.

        Raises AssertionError if a related object's mapper is not
        compatible with this relationship's target mapper.

        """
        # only actively lazy load on the 'delete' cascade
        if type_ == 'delete' and not self.passive_deletes:
            passive = attributes.PASSIVE_OFF
        else:
            passive = attributes.PASSIVE_NO_INITIALIZE

        if type_ == 'save-update':
            candidates = state.manager[self.key].impl.\
                        get_all_pending(state, dict_)
        else:
            candidates = self._value_as_iterable(state, dict_, self.key,
                            passive=passive)

        # on refresh-expire without delete-orphan, objects that have
        # no identity key are left alone
        skip_pending = type_ == 'refresh-expire' and \
            'delete-orphan' not in self._cascade

        for related_state, related in candidates:
            if related_state in visited_states:
                continue

            if related is None:
                # would like to emit a warning here, but
                # would not be consistent with collection.append(None)
                # current behavior of silently skipping.
                # see [ticket:2229]
                continue

            related_dict = attributes.instance_dict(related)

            if halt_on and halt_on(related_state):
                continue

            if skip_pending and not related_state.key:
                continue

            related_mapper = related_state.manager.mapper

            if not related_mapper.isa(self.mapper.class_manager.mapper):
                details = (self.key, self.parent.class_, related.__class__)
                raise AssertionError("Attribute '%s' on class '%s' "
                                    "doesn't handle objects "
                                    "of type '%s'" % details)

            visited_states.add(related_state)

            yield related, related_mapper, related_state, related_dict

    def _add_reverse_property(self, key):
        """Register the property named ``key`` on the target mapper as
        the reverse side of this relationship, and vice versa.

        Raises :class:`.ArgumentError` if the other property's mapper
        shares no common parent with this property's parent mapper, or
        if both sides are one-to-many or both are many-to-one.

        """
        reverse = self.mapper.get_property(key, _configure_mappers=False)

        # the two-way link is recorded before validation takes place
        self._reverse_property.add(reverse)
        reverse._reverse_property.add(self)

        if not reverse.mapper.common_parent(self.parent):
            raise sa_exc.ArgumentError('reverse_property %r on '
                    'relationship %s references relationship %s, which '
                    'does not reference mapper %s'
                    % (key, self, reverse, self.parent))

        direction = self.direction
        if direction in (ONETOMANY, MANYTOONE) \
                and direction == reverse.direction:
            raise sa_exc.ArgumentError('%s and back-reference %s are '
                    'both of the same direction %r.  Did you mean to '
                    'set remote_side on the many-to-one side ?'
                    % (reverse, self, direction))

    @util.memoized_property
    def mapper(self):
        """Return the targeted :class:`.Mapper` for this
        :class:`.RelationshipProperty`.

        This is a lazy-initializing static attribute.

        ``self.argument`` may be a class, a :class:`.Mapper`, or a
        callable producing either of those.  Classes are resolved via
        ``class_mapper()`` without triggering mapper configuration.

        Raises :class:`.ArgumentError` if ``self.argument`` is none of
        the accepted forms.

        """
        argument = self.argument

        if isinstance(argument, type):
            mapper_ = mapper.class_mapper(argument, configure=False)
        elif isinstance(argument, mapper.Mapper):
            mapper_ = argument
        elif util.callable(argument):

            # accept a callable to suit various deferred-
            # configurational schemes.  The callable may hand back
            # a Mapper directly, which is used as-is just like a
            # Mapper passed without the callable wrapper; anything
            # else is resolved through class_mapper() as before.

            resolved = argument()
            if isinstance(resolved, mapper.Mapper):
                mapper_ = resolved
            else:
                mapper_ = mapper.class_mapper(resolved,
                        configure=False)
        else:
            raise sa_exc.ArgumentError("relationship '%s' expects "
                    "a class or a mapper argument (received: %s)"
                    % (self.key, type(argument)))
        # internal sanity check on the resolved target
        assert isinstance(mapper_, mapper.Mapper), mapper_
        return mapper_

    @util.memoized_property
    @util.deprecated("0.7", "Use .target")
    def table(self):
        """Return the selectable linked to this
        :class:`.RelationshipProperty` object's target
        :class:`.Mapper`."""
        # deprecated spelling; hands back the same object as ``.target``
        return self.target

    def do_init(self):
        """Run the initialization steps for this relationship, then
        the superclass ``do_init()``.

        The steps are order-dependent; see the inline notes.

        """
        self._check_conflicts()
        # resolves callable arguments and assigns self.target, which
        # the following steps read
        self._process_dependent_arguments()
        # assigns self._join_condition and self.direction, among others
        self._setup_join_conditions()
        # reads self.direction, so must follow the join condition setup
        self._check_cascade_settings(self._cascade)
        # reads self.direction to derive a default for self.uselist
        self._post_init()
        # reads self._join_condition when building the backref
        self._generate_backref()
        super(RelationshipProperty, self).do_init()

    def _process_dependent_arguments(self):
        """Normalize the configuration arguments given to this
        relationship.

        Callable arguments are invoked and replaced by their result,
        ORM annotations are stripped from the join conditions, and
        ``order_by``, the user-defined foreign keys and
        ``remote_side`` are coerced to column collections.  Finally
        ``self.target`` is set to the target mapper's mapped table.

        """
        # accept callables for other attributes which may require
        # deferred initialization.  This technique is used
        # by declarative "string configs" and some recipes.
        deferred_names = (
            'order_by', 'primaryjoin', 'secondaryjoin',
            'secondary', '_user_defined_foreign_keys', 'remote_side',
        )
        for name in deferred_names:
            candidate = getattr(self, name)
            if util.callable(candidate):
                setattr(self, name, candidate())

        # remove "annotations" which are present if mapped class
        # descriptors are used to create the join expression.
        for name in ('primaryjoin', 'secondaryjoin'):
            join_expr = getattr(self, name)
            if join_expr is None:
                continue
            columns_only = expression._only_column_elements(join_expr, name)
            setattr(self, name, _orm_deannotate(columns_only))

        # ensure expressions in self.order_by, foreign_keys,
        # remote_side are all columns, not strings.
        order_by = self.order_by
        if order_by is not None and order_by is not False:
            self.order_by = [
                expression._only_column_elements(item, "order_by")
                for item in util.to_list(order_by)
            ]

        foreign_keys = util.to_column_set(self._user_defined_foreign_keys)
        self._user_defined_foreign_keys = util.column_set(
            expression._only_column_elements(col, "foreign_keys")
            for col in foreign_keys
        )

        remote_cols = util.to_column_set(self.remote_side)
        self.remote_side = util.column_set(
            expression._only_column_elements(col, "remote_side")
            for col in remote_cols
        )

        self.target = self.mapper.mapped_table


    def _setup_join_conditions(self):
        """Build the :class:`.JoinCondition` for this relationship and
        copy its computed results back onto this property.

        """
        parent_mapper = self.parent
        target_mapper = self.mapper

        join_args = dict(
            parent_selectable=parent_mapper.mapped_table,
            child_selectable=target_mapper.mapped_table,
            parent_local_selectable=parent_mapper.local_table,
            child_local_selectable=target_mapper.local_table,
            primaryjoin=self.primaryjoin,
            secondary=self.secondary,
            secondaryjoin=self.secondaryjoin,
            parent_equivalents=parent_mapper._equivalent_columns,
            child_equivalents=target_mapper._equivalent_columns,
            consider_as_foreign_keys=self._user_defined_foreign_keys,
            local_remote_pairs=self.local_remote_pairs,
            remote_side=self.remote_side,
            self_referential=self._is_self_referential,
            prop=self,
            support_sync=not self.viewonly,
            can_be_synced_fn=self._columns_are_mapped,
        )
        condition = relationships.JoinCondition(**join_args)
        self._join_condition = condition

        # publish what the join condition worked out
        self.primaryjoin = condition.deannotated_primaryjoin
        self.secondaryjoin = condition.deannotated_secondaryjoin
        self.direction = condition.direction
        self.local_remote_pairs = condition.local_remote_pairs
        self.remote_side = condition.remote_columns
        self.local_columns = condition.local_columns
        self.synchronize_pairs = condition.synchronize_pairs
        self._calculated_foreign_keys = condition.foreign_key_columns
        self.secondary_synchronize_pairs = \
            condition.secondary_synchronize_pairs

    def _check_conflicts(self):
        """Check that this relationship may be set up on its parent
        mapper.

        Raises :class:`.ArgumentError` when this property is not
        primary and the mapper returned by ``class_mapper()`` for the
        parent class has no property under the same key.  Emits a
        warning when a non-concrete parent has another mapper among
        ``iterate_to_root()`` that already has a property of this
        name.

        """
        parent = self.parent

        if not self.is_primary():
            class_level_mapper = mapper.class_mapper(
                                    parent.class_, configure=False)
            if not class_level_mapper.has_property(self.key):
                raise sa_exc.ArgumentError("Attempting to assign a new "
                        "relationship '%s' to a non-primary mapper on "
                        "class '%s'.  New relationships can only be added "
                        "to the primary mapper, i.e. the very first mapper "
                        "created for class '%s' " % (self.key,
                        self.parent.class_.__name__,
                        self.parent.class_.__name__))

        # check for conflicting relationship() on superclass
        if self.parent.concrete:
            return

        for inherited in self.parent.iterate_to_root():
            if inherited is self.parent:
                continue
            if not inherited.has_property(self.key):
                continue
            util.warn("Warning: relationship '%s' on mapper "
                      "'%s' supersedes the same relationship "
                      "on inherited mapper '%s'; this can "
                      "cause dependency issues during flush"
                      % (self.key, self.parent, inherited))

    def _get_cascade(self):
        """Return the current cascade setting for this
        :class:`.RelationshipProperty`.

        This is the value stored in ``self._cascade``, which
        ``_set_cascade()`` assigns as a :class:`.CascadeOptions`.
        """
        return self._cascade

    def _set_cascade(self, cascade):
        """Assign a new cascade setting to this
        :class:`.RelationshipProperty`.

        The incoming value is converted to :class:`.CascadeOptions`.
        It is validated right away only if the ``mapper`` attribute is
        already present in the instance ``__dict__``.  An existing
        dependency processor receives the new options as well.

        """
        options = CascadeOptions(cascade)

        mapper_resolved = 'mapper' in self.__dict__
        if mapper_resolved:
            self._check_cascade_settings(options)

        self._cascade = options

        processor = self._dependency_processor
        if processor:
            processor.cascade = options

    # ``cascade`` attribute: reads go through _get_cascade(),
    # assignments through _set_cascade().
    cascade = property(_get_cascade, _set_cascade)

    def _check_cascade_settings(self, cascade):
        """Validate ``cascade`` against this relationship's direction
        and ``passive_deletes`` configuration.

        Raises :class:`.ArgumentError` for delete-orphan on a
        many-to-many or many-to-one relationship without
        ``single_parent``, and for ``passive_deletes='all'`` combined
        with a delete or delete-orphan cascade.  Warns when
        ``passive_deletes`` is set on a many-to-one relationship.

        When delete-orphan is in effect, ``(key, parent class)`` is
        appended to the ``_delete_orphans`` list of the target's
        primary mapper.

        """
        orphan_needs_single_parent = (
            cascade.delete_orphan
            and not self.single_parent
            and (self.direction is MANYTOMANY
                 or self.direction is MANYTOONE)
        )
        if orphan_needs_single_parent:
            raise sa_exc.ArgumentError(
                    'On %s, delete-orphan cascade is not supported '
                    'on a many-to-many or many-to-one relationship '
                    'when single_parent is not set.   Set '
                    'single_parent=True on the relationship().'
                    % self)

        if self.direction is MANYTOONE and self.passive_deletes:
            util.warn("On %s, 'passive_deletes' is normally configured "
                      "on one-to-many, one-to-one, many-to-many "
                      "relationships only."
                       % self)

        if self.passive_deletes == 'all':
            deletes_cascade = "delete" in cascade or \
                "delete-orphan" in cascade
            if deletes_cascade:
                raise sa_exc.ArgumentError(
                    "On %s, can't set passive_deletes='all' in conjunction "
                    "with 'delete' or 'delete-orphan' cascade" % self)

        if cascade.delete_orphan:
            orphan_entry = (self.key, self.parent.class_)
            self.mapper.primary_mapper()._delete_orphans.append(
                orphan_entry)

    def _columns_are_mapped(self, *cols):
        """Return True if all columns in the given collection are
        mapped by the tables referenced by this :class:`.Relationship`.

        """
        for c in cols:
            if self.secondary is not None \
                    and self.secondary.c.contains_column(c):
                continue
            if not self.parent.mapped_table.c.contains_column(c) and \
                    not self.target.c.contains_column(c):
                return False
        return True

    def _generate_backref(self):
        """Interpret the 'backref' instruction to create a
        :func:`.relationship` complementary to this one.

        Does nothing for a non-primary property.  When ``backref`` is
        set and ``back_populates`` is not, a new
        :class:`.RelationshipProperty` is configured on the target's
        primary mapper under the backref key, and the two properties
        are linked through ``back_populates``.  Whenever
        ``back_populates`` is set, the named property is registered as
        this relationship's reverse property.

        Raises :class:`.ArgumentError` if a property with the backref
        name already exists on a related mapper, and
        :class:`.InvalidRequestError` if ``secondaryjoin`` is given for
        a backref of a relationship that has no ``secondary``.

        """

        if not self.is_primary():
            return
        if self.backref is not None and not self.back_populates:
            if isinstance(self.backref, basestring):
                backref_key, kwargs = self.backref, {}
            else:
                backref_key, kwargs = self.backref
                # work on a copy: the pop()/setdefault() calls below
                # must not alter the dictionary held inside the
                # backref tuple the user handed in.
                kwargs = dict(kwargs)
            # named ``target_mapper`` so the module-level ``mapper``
            # import is not shadowed inside this method
            target_mapper = self.mapper.primary_mapper()

            check = set(target_mapper.iterate_to_root()).\
                        union(target_mapper.self_and_descendants)
            for m in check:
                if m.has_property(backref_key):
                    raise sa_exc.ArgumentError("Error creating backref "
                            "'%s' on relationship '%s': property of that "
                            "name exists on mapper '%s'" % (backref_key,
                            self, m))

            # determine primaryjoin/secondaryjoin for the
            # backref.  Use the one we had, so that
            # a custom join doesn't have to be specified in
            # both directions.
            if self.secondary is not None:
                # for many to many, just switch primaryjoin/
                # secondaryjoin.   use the annotated
                # pj/sj on the _join_condition.
                pj = kwargs.pop('primaryjoin',
                                self._join_condition.secondaryjoin_minus_local)
                sj = kwargs.pop('secondaryjoin',
                                self._join_condition.primaryjoin_minus_local)
            else:
                pj = kwargs.pop('primaryjoin',
                        self._join_condition.primaryjoin_reverse_remote)
                sj = kwargs.pop('secondaryjoin', None)
                # compare against None rather than relying on the
                # truth value: a SQL expression may evaluate as false
                # or refuse boolean evaluation altogether.
                if sj is not None:
                    raise sa_exc.InvalidRequestError(
                        "Can't assign 'secondaryjoin' on a backref "
                        "against a non-secondary relationship."
                    )

            foreign_keys = kwargs.pop('foreign_keys',
                    self._user_defined_foreign_keys)
            parent = self.parent.primary_mapper()
            # the backref inherits these settings unless overridden
            kwargs.setdefault('viewonly', self.viewonly)
            kwargs.setdefault('post_update', self.post_update)
            kwargs.setdefault('passive_updates', self.passive_updates)
            self.back_populates = backref_key
            relationship = RelationshipProperty(
                parent, self.secondary,
                pj, sj,
                foreign_keys=foreign_keys,
                back_populates=self.key,
                **kwargs)
            target_mapper._configure_property(backref_key, relationship)

        if self.back_populates:
            self._add_reverse_property(self.back_populates)

    def _post_init(self):
        """Fill in ``uselist`` when it was left as None and, unless the
        relationship is view-only, create its dependency processor.

        """
        if self.uselist is None:
            # default: everything except many-to-one uses a list
            self.uselist = self.direction is not MANYTOONE

        if self.viewonly:
            return

        processor = dependency.DependencyProcessor.from_relationship(self)
        self._dependency_processor = processor

    @util.memoized_property
    def _use_get(self):
        """Memoized ``use_get`` flag taken from this property's
        :class:`.LazyLoader` strategy."""

        return self._get_strategy(strategies.LazyLoader).use_get

    @util.memoized_property
    def _is_self_referential(self):
        """Memoized result of ``self.mapper.common_parent(self.parent)``,
        i.e. whether the target mapper and the parent mapper share a
        common parent."""
        return self.mapper.common_parent(self.parent)

    def _create_joins(self, source_polymorphic=False,
                            source_selectable=None, dest_polymorphic=False,
                            dest_selectable=None, of_type=None):
        """Work out the join components between the parent and the
        target of this relationship.

        Returns a tuple ``(primaryjoin, secondaryjoin,
        source_selectable, dest_selectable, secondary,
        target_adapter)``.  Selectables that are not supplied are
        derived from the parent and target mappers, using their
        with-polymorphic selectables when the corresponding
        ``*_polymorphic`` flag is set and the mapper has one.

        """
        if source_selectable is None and source_polymorphic \
                and self.parent.with_polymorphic:
            source_selectable = self.parent._with_polymorphic_selectable

        # an explicitly passed destination always counts as aliased
        aliased = dest_selectable is not None
        if not aliased:
            target_mapper = self.mapper
            if dest_polymorphic and target_mapper.with_polymorphic:
                dest_selectable = target_mapper._with_polymorphic_selectable
                aliased = True
            else:
                dest_selectable = target_mapper.mapped_table

            # a self-referential join with no explicit source needs
            # an alias of the destination
            if self._is_self_referential and source_selectable is None:
                dest_selectable = dest_selectable.alias()
                aliased = True

        if source_selectable is not None:
            aliased = True

        single_crit = (of_type or self.mapper)._single_table_criterion

        join_parts = self._join_condition.join_targets(
            source_selectable, dest_selectable, aliased, single_crit
        )
        (primaryjoin, secondaryjoin, secondary,
            target_adapter, dest_selectable) = join_parts

        # fall back to the local tables for anything still unset
        if source_selectable is None:
            source_selectable = self.parent.local_table
        if dest_selectable is None:
            dest_selectable = self.mapper.local_table

        return (primaryjoin, secondaryjoin, source_selectable,
            dest_selectable, secondary, target_adapter)


# Additional module-level names bound to the very same class object;
# of these, ``RelationProperty`` is listed in ``__all__``.
PropertyLoader = RelationProperty = RelationshipProperty
# presumably attaches logging support to the class -- confirm against
# ``log.class_logger``
log.class_logger(RelationshipProperty)