# orm/query.py
# Copyright (C) 2005-2013 the SQLAlchemy authors and contributors <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""The Query class and support.

Defines the :class:`.Query` class, the central
construct used by the ORM to construct database queries.

The :class:`.Query` class should not be confused with the
:class:`.Select` class, which defines database
SELECT operations at the SQL (non-ORM) level.  ``Query`` differs from
``Select`` in that it returns ORM-mapped objects and interacts with an
ORM session, whereas the ``Select`` construct interacts directly with the
database to return iterable result sets.

"""

from itertools import chain

from . import (
    attributes, interfaces, object_mapper, persistence,
    exc as orm_exc, loading
    )
from .util import (
    AliasedClass, ORMAdapter, _entity_descriptor, PathRegistry,
    _is_aliased_class, _is_mapped_class, _orm_columns,
    join as orm_join, with_parent, aliased
    )
from .. import sql, util, log, exc as sa_exc, inspect, inspection, \
        types as sqltypes
from ..sql.expression import _interpret_as_from
from ..sql import (
        util as sql_util,
        expression, visitors
    )

__all__ = ['Query', 'QueryContext', 'aliased']


def _generative(*assertions):
    """Mark a method as generative."""

    @util.decorator
    def generate(fn, *args, **kw):
        self = args[0]._clone()
        for assertion in assertions:
            assertion(self, fn.func_name)
        fn(self, *args[1:], **kw)
        return self
    return generate

_path_registry = PathRegistry.root


class Query(object):
    """ORM-level SQL construction object.

    :class:`.Query` is the source of all SELECT statements generated by the
    ORM, both those formulated by end-user query operations as well as by
    high level internal operations such as related collection loading.  It
    features a generative interface whereby successive calls return a new
    :class:`.Query` object, a copy of the former with additional
    criteria and options associated with it.

    :class:`.Query` objects are normally initially generated using the
    :meth:`~.Session.query` method of :class:`.Session`.  For a full
    walkthrough of :class:`.Query` usage, see the
    :ref:`ormtutorial_toplevel`.

    """

    _enable_eagerloads = True
    _enable_assertions = True
    _with_labels = False
    _criterion = None
    _yield_per = None
    _lockmode = None
    _order_by = False
    _group_by = False
    _having = None
    _distinct = False
    _prefixes = None
    _offset = None
    _limit = None
    _statement = None
    _correlate = frozenset()
    _populate_existing = False
    _invoke_all_eagers = True
    _version_check = False
    _autoflush = True
    _only_load_props = None
    _refresh_state = None
    _from_obj = ()
    _join_entities = ()
    _select_from_entity = None
    _mapper_adapter_map = {}
    _filter_aliases = None
    _from_obj_alias = None
    _joinpath = _joinpoint = util.immutabledict()
    _execution_options = util.immutabledict()
    _params = util.immutabledict()
    _attributes = util.immutabledict()
    _with_options = ()
    _with_hints = ()
    _enable_single_crit = True

    _current_path = _path_registry

    def __init__(self, entities, session=None):
        self.session = session
        self._polymorphic_adapters = {}
        self._set_entities(entities)

    def _set_entities(self, entities, entity_wrapper=None):
        if entity_wrapper is None:
            entity_wrapper = _QueryEntity
        self._entities = []
        for ent in util.to_list(entities):
            entity_wrapper(self, ent)

        self._set_entity_selectables(self._entities)

    def _set_entity_selectables(self, entities):
        self._mapper_adapter_map = d = self._mapper_adapter_map.copy()

        for ent in entities:
            for entity in ent.entities:
                if entity not in d:
                    ext_info = inspect(entity)
                    if not ext_info.is_aliased_class and \
                        ext_info.mapper.with_polymorphic:
                        if ext_info.mapper.mapped_table not in \
                                            self._polymorphic_adapters:
                            self._mapper_loads_polymorphically_with(
                                ext_info.mapper,
                                sql_util.ColumnAdapter(
                                        ext_info.selectable,
                                        ext_info.mapper._equivalent_columns
                                )
                            )
                        aliased_adapter = None
                    elif ext_info.is_aliased_class:
                        aliased_adapter = sql_util.ColumnAdapter(
                                    ext_info.selectable,
                                    ext_info.mapper._equivalent_columns
                                    )
                    else:
                        aliased_adapter = None

                    d[entity] = (
                        ext_info,
                        aliased_adapter
                    )
                ent.setup_entity(*d[entity])

    def _mapper_loads_polymorphically_with(self, mapper, adapter):
        for m2 in mapper._with_polymorphic_mappers or [mapper]:
            self._polymorphic_adapters[m2] = adapter
            for m in m2.iterate_to_root():
                self._polymorphic_adapters[m.local_table] = adapter

    def _set_select_from(self, obj, set_base_alias):
        fa = []
        select_from_alias = None

        for from_obj in obj:
            info = inspect(from_obj)

            if hasattr(info, 'mapper') and \
                (info.is_mapper or info.is_aliased_class):
                if set_base_alias:
                    raise sa_exc.ArgumentError(
                            "A selectable (FromClause) instance is "
                            "expected when the base alias is being set.")
                self._select_from_entity = from_obj
                fa.append(info.selectable)
            elif not info.is_selectable:
                raise sa_exc.ArgumentError(
                        "argument is not a mapped class, mapper, "
                        "aliased(), or FromClause instance.")
            else:
                if isinstance(from_obj, expression.SelectBase):
                    from_obj = from_obj.alias()
                select_from_alias = from_obj
                fa.append(from_obj)

        self._from_obj = tuple(fa)

        if len(self._from_obj) == 1 and \
            isinstance(select_from_alias, expression.Alias):
            equivs = self.__all_equivs()
            self._from_obj_alias = sql_util.ColumnAdapter(
                                                self._from_obj[0], equivs)


    def _reset_polymorphic_adapter(self, mapper):
        for m2 in mapper._with_polymorphic_mappers:
            self._polymorphic_adapters.pop(m2, None)
            for m in m2.iterate_to_root():
                self._polymorphic_adapters.pop(m.local_table, None)

    def _adapt_polymorphic_element(self, element):
        if "parententity" in element._annotations:
            search = element._annotations['parententity']
            alias = self._polymorphic_adapters.get(search, None)
            if alias:
                return alias.adapt_clause(element)

        if isinstance(element, expression.FromClause):
            search = element
        elif hasattr(element, 'table'):
            search = element.table
        else:
            return None

        alias = self._polymorphic_adapters.get(search, None)
        if alias:
            return alias.adapt_clause(element)

    def _adapt_col_list(self, cols):
        return [
                    self._adapt_clause(
                                expression._literal_as_text(o),
                                True, True)
                    for o in cols
                ]

    @_generative()
    def _adapt_all_clauses(self):
        self._orm_only_adapt = False

    def _adapt_clause(self, clause, as_filter, orm_only):
        """Adapt incoming clauses to transformations which
        have been applied within this query."""

        adapters = []

        # do we adapt all expression elements or only those
        # tagged as 'ORM' constructs ?
        orm_only = getattr(self, '_orm_only_adapt', orm_only)

        if as_filter and self._filter_aliases:
            for fa in self._filter_aliases._visitor_iterator:
                adapters.append(
                    (
                        orm_only, fa.replace
                    )
                )

        if self._from_obj_alias:
            # for the "from obj" alias, apply extra rule to the
            # 'ORM only' check, if this query were generated from a
            # subquery of itself, i.e. _from_selectable(), apply adaption
            # to all SQL constructs.
            adapters.append(
                (
                    getattr(self, '_orm_only_from_obj_alias', orm_only),
                    self._from_obj_alias.replace
                )
            )

        if self._polymorphic_adapters:
            adapters.append(
                (
                    orm_only, self._adapt_polymorphic_element
                )
            )

        if not adapters:
            return clause

        def replace(elem):
            for _orm_only, adapter in adapters:
                # if 'orm only', look for ORM annotations
                # in the element before adapting.
                if not _orm_only or \
                    '_orm_adapt' in elem._annotations or \
                    "parententity" in elem._annotations:

                    e = adapter(elem)
                    if e is not None:
                        return e

        return visitors.replacement_traverse(
                            clause,
                            {},
                            replace
                        )

    def _entity_zero(self):
        return self._entities[0]

    def _mapper_zero(self):
        return self._select_from_entity or \
            self._entity_zero().entity_zero

    @property
    def _mapper_entities(self):
        for ent in self._entities:
            if isinstance(ent, _MapperEntity):
                yield ent

    def _joinpoint_zero(self):
        return self._joinpoint.get(
            '_joinpoint_entity',
            self._mapper_zero()
        )

    def _mapper_zero_or_none(self):
        if not getattr(self._entities[0], 'primary_entity', False):
            return None
        return self._entities[0].mapper

    def _only_mapper_zero(self, rationale=None):
        if len(self._entities) > 1:
            raise sa_exc.InvalidRequestError(
                    rationale or
                    "This operation requires a Query "
                    "against a single mapper."
                )
        return self._mapper_zero()

    def _only_full_mapper_zero(self, methname):
        if len(self._entities) != 1:
            raise sa_exc.InvalidRequestError(
                    "%s() can only be used against "
                    "a single mapped class." % methname)
        entity = self._entity_zero()
        if not hasattr(entity, 'primary_entity'):
            raise sa_exc.InvalidRequestError(
                    "%s() can only be used against "
                    "a single mapped class." % methname)
        return entity.entity_zero

    def _only_entity_zero(self, rationale=None):
        if len(self._entities) > 1:
            raise sa_exc.InvalidRequestError(
                    rationale or
                    "This operation requires a Query "
                    "against a single mapper."
                )
        return self._entity_zero()

    def __all_equivs(self):
        equivs = {}
        for ent in self._mapper_entities:
            equivs.update(ent.mapper._equivalent_columns)
        return equivs

    def _get_condition(self):
        self._order_by = self._distinct = False
        return self._no_criterion_condition("get")

    def _no_criterion_condition(self, meth):
        if not self._enable_assertions:
            return
        if self._criterion is not None or \
                self._statement is not None or self._from_obj or \
                self._limit is not None or self._offset is not None or \
                self._group_by or self._order_by or self._distinct:
            raise sa_exc.InvalidRequestError(
                                "Query.%s() being called on a "
                                "Query with existing criterion. " % meth)

        self._from_obj = ()
        self._statement = self._criterion = None
        self._order_by = self._group_by = self._distinct = False

    def _no_clauseelement_condition(self, meth):
        if not self._enable_assertions:
            return
        if self._order_by:
            raise sa_exc.InvalidRequestError(
                                "Query.%s() being called on a "
                                "Query with existing criterion. " % meth)
        self._no_criterion_condition(meth)

    def _no_statement_condition(self, meth):
        if not self._enable_assertions:
            return
        if self._statement is not None:
            raise sa_exc.InvalidRequestError(
                ("Query.%s() being called on a Query with an existing full "
                 "statement - can't apply criterion.") % meth)

    def _no_limit_offset(self, meth):
        if not self._enable_assertions:
            return
        if self._limit is not None or self._offset is not None:
            raise sa_exc.InvalidRequestError(
                "Query.%s() being called on a Query which already has LIMIT "
                "or OFFSET applied. To modify the row-limited results of a "
                " Query, call from_self() first.  "
                "Otherwise, call %s() before limit() or offset() "
                "are applied."
                % (meth, meth)
            )

    def _no_select_modifiers(self, meth):
        if not self._enable_assertions:
            return
        for attr, methname, notset in (
            ('_limit', 'limit()', None),
            ('_offset', 'offset()', None),
            ('_order_by', 'order_by()', False),
            ('_group_by', 'group_by()', False),
            ('_distinct', 'distinct()', False),
        ):
            if getattr(self, attr) is not notset:
                raise sa_exc.InvalidRequestError(
                    "Can't call Query.%s() when %s has been called" %
                    (meth, methname)
                )

    def _get_options(self, populate_existing=None,
                            version_check=None,
                            only_load_props=None,
                            refresh_state=None):
        if populate_existing:
            self._populate_existing = populate_existing
        if version_check:
            self._version_check = version_check
        if refresh_state:
            self._refresh_state = refresh_state
        if only_load_props:
            self._only_load_props = set(only_load_props)
        return self

    def _clone(self):
        cls = self.__class__
        q = cls.__new__(cls)
        q.__dict__ = self.__dict__.copy()
        return q

    @property
    def statement(self):
        """The full SELECT statement represented by this Query.

        The statement by default will not have disambiguating labels
        applied to the construct unless with_labels(True) is called
        first.

        """

        stmt = self._compile_context(labels=self._with_labels).\
                        statement
        if self._params:
            stmt = stmt.params(self._params)
        # TODO: there's no tests covering effects of
        # the annotation not being there
        return stmt._annotate({'no_replacement_traverse': True})

    def subquery(self, name=None, with_labels=False, reduce_columns=False):
        """return the full SELECT statement represented by
        this :class:`.Query`, embedded within an :class:`.Alias`.

        Eager JOIN generation within the query is disabled.

        :param name: string name to be assigned as the alias;
            this is passed through to :meth:`.FromClause.alias`.
            If ``None``, a name will be deterministically generated
            at compile time.

        :param with_labels: if True, :meth:`.with_labels` will be called
         on the :class:`.Query` first to apply table-qualified labels
         to all columns.

        :param reduce_columns: if True, :meth:`.Select.reduce_columns` will
         be called on the resulting :func:`.select` construct,
         to remove same-named columns where one also refers to the other
         via foreign key or WHERE clause equivalence.

         .. versionchanged:: 0.8 the ``with_labels`` and ``reduce_columns``
            keyword arguments were added.

        """
        q = self.enable_eagerloads(False)
        if with_labels:
            q = q.with_labels()
        q = q.statement
        if reduce_columns:
            q = q.reduce_columns()
        return q.alias(name=name)

    def cte(self, name=None, recursive=False):
        """Return the full SELECT statement represented by this
        :class:`.Query` represented as a common table expression (CTE).

        .. versionadded:: 0.7.6

        Parameters and usage are the same as those of the
        :meth:`.SelectBase.cte` method; see that method for
        further details.

        Here is the `Postgresql WITH
        RECURSIVE example
        <http://www.postgresql.org/docs/8.4/static/queries-with.html>`_.
        Note that, in this example, the ``included_parts`` cte and the
        ``incl_alias`` alias of it are Core selectables, which
        means the columns are accessed via the ``.c.`` attribute.  The
        ``parts_alias`` object is an :func:`.orm.aliased` instance of the
        ``Part`` entity, so column-mapped attributes are available
        directly::

            from sqlalchemy.orm import aliased

            class Part(Base):
                __tablename__ = 'part'
                part = Column(String, primary_key=True)
                sub_part = Column(String, primary_key=True)
                quantity = Column(Integer)

            included_parts = session.query(
                            Part.sub_part,
                            Part.part,
                            Part.quantity).\\
                                filter(Part.part=="our part").\\
                                cte(name="included_parts", recursive=True)

            incl_alias = aliased(included_parts, name="pr")
            parts_alias = aliased(Part, name="p")
            included_parts = included_parts.union_all(
                session.query(
                    parts_alias.part,
                    parts_alias.sub_part,
                    parts_alias.quantity).\\
                        filter(parts_alias.part==incl_alias.c.sub_part)
                )

            q = session.query(
                    included_parts.c.sub_part,
                    func.sum(included_parts.c.quantity).
                        label('total_quantity')
                ).\\
                group_by(included_parts.c.sub_part)

        See also:

        :meth:`.SelectBase.cte`

        """
        return self.enable_eagerloads(False).\
            statement.cte(name=name, recursive=recursive)

    def label(self, name):
        """Return the full SELECT statement represented by this
        :class:`.Query`, converted
        to a scalar subquery with a label of the given name.

        Analogous to :meth:`sqlalchemy.sql.SelectBaseMixin.label`.

        .. versionadded:: 0.6.5

        """

        return self.enable_eagerloads(False).statement.label(name)

    def as_scalar(self):
        """Return the full SELECT statement represented by this
        :class:`.Query`, converted to a scalar subquery.

        Analogous to :meth:`sqlalchemy.sql.SelectBaseMixin.as_scalar`.

        .. versionadded:: 0.6.5

        """

        return self.enable_eagerloads(False).statement.as_scalar()

    @property
    def selectable(self):
        """Return the :class:`.Select` object emitted by this :class:`.Query`.

        Used for :func:`.inspect` compatibility, this is equivalent to::

            query.enable_eagerloads(False).with_labels().statement

        """
        return self.__clause_element__()

    def __clause_element__(self):
        return self.enable_eagerloads(False).with_labels().statement

    @_generative()
    def enable_eagerloads(self, value):
        """Control whether or not eager joins and subqueries are
        rendered.

        When set to False, the returned Query will not render
        eager joins regardless of :func:`~sqlalchemy.orm.joinedload`,
        :func:`~sqlalchemy.orm.subqueryload` options
        or mapper-level ``lazy='joined'``/``lazy='subquery'``
        configurations.

        This is used primarily when nesting the Query's
        statement into a subquery or other
        selectable.

        """
        self._enable_eagerloads = value

    @_generative()
    def with_labels(self):
        """Apply column labels to the return value of Query.statement.

        Indicates that this Query's `statement` accessor should return
        a SELECT statement that applies labels to all columns in the
        form <tablename>_<columnname>; this is commonly used to
        disambiguate columns from multiple tables which have the same
        name.

        When the `Query` actually issues SQL to load rows, it always
        uses column labeling.

        """
        self._with_labels = True

    @_generative()
    def enable_assertions(self, value):
        """Control whether assertions are generated.

        When set to False, the returned Query will
        not assert its state before certain operations,
        including that LIMIT/OFFSET has not been applied
        when filter() is called, no criterion exists
        when get() is called, and no "from_statement()"
        exists when filter()/order_by()/group_by() etc.
        is called.  This more permissive mode is used by
        custom Query subclasses to specify criterion or
        other modifiers outside of the usual usage patterns.

        Care should be taken to ensure that the usage
        pattern is even possible.  A statement applied
        by from_statement() will override any criterion
        set by filter() or order_by(), for example.

        """
        self._enable_assertions = value

    @property
    def whereclause(self):
        """A readonly attribute which returns the current WHERE criterion for
        this Query.

        This returned value is a SQL expression construct, or ``None`` if no
        criterion has been established.

        """
        return self._criterion

    @_generative()
    def _with_current_path(self, path):
        """indicate that this query applies to objects loaded
        within a certain path.

        Used by deferred loaders (see strategies.py) which transfer
        query options from an originating query to a newly generated
        query intended for the deferred load.

        """
        self._current_path = path

    @_generative(_no_clauseelement_condition)
    def with_polymorphic(self,
                                    cls_or_mappers,
                                    selectable=None,
                                    polymorphic_on=None):
        """Load columns for inheriting classes.

        :meth:`.Query.with_polymorphic` applies transformations
        to the "main" mapped class represented by this :class:`.Query`.
        The "main" mapped class here means the :class:`.Query`
        object's first argument is a full class, i.e.
        ``session.query(SomeClass)``. These transformations allow additional
        tables to be present in the FROM clause so that columns for a
        joined-inheritance subclass are available in the query, both for the
        purposes of load-time efficiency as well as the ability to use
        these columns at query time.

        See the documentation section :ref:`with_polymorphic` for
        details on how this method is used.

        .. versionchanged:: 0.8
            A new and more flexible function
            :func:`.orm.with_polymorphic` supersedes
            :meth:`.Query.with_polymorphic`, as it can apply the equivalent
            functionality to any set of columns or classes in the
            :class:`.Query`, not just the "zero mapper".  See that
            function for a description of arguments.

        """

        if not getattr(self._entities[0], 'primary_entity', False):
            raise sa_exc.InvalidRequestError(
                            "No primary mapper set up for this Query.")
        entity = self._entities[0]._clone()
        self._entities = [entity] + self._entities[1:]
        entity.set_with_polymorphic(self,
                                        cls_or_mappers,
                                        selectable=selectable,
                                        polymorphic_on=polymorphic_on)

    @_generative()
    def yield_per(self, count):
        """Yield only ``count`` rows at a time.

        WARNING: use this method with caution; if the same instance is present
        in more than one batch of rows, end-user changes to attributes will be
        overwritten.

        In particular, it's usually impossible to use this setting with
        eagerly loaded collections (i.e. any lazy='joined' or 'subquery')
        since those collections will be cleared for a new load when
        encountered in a subsequent result batch.   In the case of 'subquery'
        loading, the full result for all rows is fetched which generally
        defeats the purpose of :meth:`~sqlalchemy.orm.query.Query.yield_per`.

        Also note that while :meth:`~sqlalchemy.orm.query.Query.yield_per`
        will set the ``stream_results`` execution option to True, currently
        this is only understood by :mod:`~sqlalchemy.dialects.postgresql.psycopg2` dialect
        which will stream results using server side cursors instead of pre-buffer
        all rows for this query. Other DBAPIs pre-buffer all rows before
        making them available.

        """
        self._yield_per = count
        self._execution_options = self._execution_options.union(
                                        {"stream_results": True})

    def get(self, ident):
        """Return an instance based on the given primary key identifier,
        or ``None`` if not found.

        E.g.::

            my_user = session.query(User).get(5)

            some_object = session.query(VersionedFoo).get((5, 10))

        :meth:`~.Query.get` is special in that it provides direct
        access to the identity map of the owning :class:`.Session`.
        If the given primary key identifier is present
        in the local identity map, the object is returned
        directly from this collection and no SQL is emitted,
        unless the object has been marked fully expired.
        If not present,
        a SELECT is performed in order to locate the object.

        :meth:`~.Query.get` also will perform a check if
        the object is present in the identity map and
        marked as expired - a SELECT
        is emitted to refresh the object as well as to
        ensure that the row is still present.
        If not, :class:`~sqlalchemy.orm.exc.ObjectDeletedError` is raised.

        :meth:`~.Query.get` is only used to return a single
        mapped instance, not multiple instances or
        individual column constructs, and strictly
        on a single primary key value.  The originating
        :class:`.Query` must be constructed in this way,
        i.e. against a single mapped entity,
        with no additional filtering criterion.  Loading
        options via :meth:`~.Query.options` may be applied
        however, and will be used if the object is not
        yet locally present.

        A lazy-loading, many-to-one attribute configured
        by :func:`.relationship`, using a simple
        foreign-key-to-primary-key criterion, will also use an
        operation equivalent to :meth:`~.Query.get` in order to retrieve
        the target value from the local identity map
        before querying the database.  See :doc:`/orm/loading`
        for further details on relationship loading.

        :param ident: A scalar or tuple value representing
         the primary key.   For a composite primary key,
         the order of identifiers corresponds in most cases
         to that of the mapped :class:`.Table` object's
         primary key columns.  For a :func:`.mapper` that
         was given the ``primary key`` argument during
         construction, the order of identifiers corresponds
         to the elements present in this collection.

        :return: The object instance, or ``None``.

        """

        # convert composite types to individual args
        if hasattr(ident, '__composite_values__'):
            ident = ident.__composite_values__()

        ident = util.to_list(ident)

        mapper = self._only_full_mapper_zero("get")

        if len(ident) != len(mapper.primary_key):
            raise sa_exc.InvalidRequestError(
            "Incorrect number of values in identifier to formulate "
            "primary key for query.get(); primary key columns are %s" %
            ','.join("'%s'" % c for c in mapper.primary_key))

        key = mapper.identity_key_from_primary_key(ident)

        if not self._populate_existing and \
                not mapper.always_refresh and \
                self._lockmode is None:

            instance = loading.get_from_identity(
                self.session, key, attributes.PASSIVE_OFF)
            if instance is not None:
                # reject calls for id in identity map but class
                # mismatch.
                if not issubclass(instance.__class__, mapper.class_):
                    return None
                return instance

        return loading.load_on_ident(self, key)

    @_generative()
    def correlate(self, *args):
        """Return a :class:`.Query` construct which will correlate the given
        FROM clauses to that of an enclosing :class:`.Query` or
        :func:`~.expression.select`.

        The method here accepts mapped classes, :func:`.aliased` constructs,
        and :func:`.mapper` constructs as arguments, which are resolved into
        expression constructs, in addition to appropriate expression
        constructs.

        The correlation arguments are ultimately passed to
        :meth:`.Select.correlate` after coercion to expression constructs.

        The correlation arguments take effect in such cases
        as when :meth:`.Query.from_self` is used, or when
        a subquery as returned by :meth:`.Query.subquery` is
        embedded in another :func:`~.expression.select` construct.

         """

        self._correlate = self._correlate.union(
                                        _interpret_as_from(s)
                                        if s is not None else None
                                        for s in args)

    @_generative()
    def autoflush(self, setting):
        """Return a Query with a specific 'autoflush' setting.

        Note that a Session with autoflush=False will
        not autoflush, even if this flag is set to True at the
        Query level.  Therefore this flag is usually used only
        to disable autoflush for a specific Query.

        """
        self._autoflush = setting

    @_generative()
    def populate_existing(self):
        """Return a :class:`.Query` that will expire and refresh all instances
        as they are loaded, or reused from the current :class:`.Session`.

        :meth:`.populate_existing` does not improve behavior when
        the ORM is used normally - the :class:`.Session` object's usual
        behavior of maintaining a transaction and expiring all attributes
        after rollback or commit handles object state automatically.
        This method is not intended for general use.

        """
        self._populate_existing = True

    @_generative()
    def _with_invoke_all_eagers(self, value):
        """Set the 'invoke all eagers' flag which causes joined- and
        subquery loaders to traverse into already-loaded related objects
        and collections.

        Default is that of :attr:`.Query._invoke_all_eagers`.

        """
        self._invoke_all_eagers = value

    def with_parent(self, instance, property=None):
        """Add filtering criterion that relates the given instance
        to a child object or collection, using its attribute state
        as well as an established :func:`.relationship()`
        configuration.

        The method uses the :func:`.with_parent` function to generate
        the clause, the result of which is passed to :meth:`.Query.filter`.

        Parameters are the same as :func:`.with_parent`, with the exception
        that the given property can be None, in which case a search is
        performed against this :class:`.Query` object's target mapper.

        """

        if property is None:
            from sqlalchemy.orm import properties
            mapper = object_mapper(instance)

            for prop in mapper.iterate_properties:
                if isinstance(prop, properties.PropertyLoader) and \
                    prop.mapper is self._mapper_zero():
                    property = prop
                    break
            else:
                raise sa_exc.InvalidRequestError(
                        "Could not locate a property which relates instances "
                        "of class '%s' to instances of class '%s'" %
                        (
                            self._mapper_zero().class_.__name__,
                            instance.__class__.__name__)
                        )

        return self.filter(with_parent(instance, property))

    @_generative()
    def add_entity(self, entity, alias=None):
        """add a mapped entity to the list of result columns
        to be returned."""

        if alias is not None:
            entity = aliased(entity, alias)

        self._entities = list(self._entities)
        m = _MapperEntity(self, entity)
        self._set_entity_selectables([m])

    @_generative()
    def with_session(self, session):
        """Return a :class:`Query` that will use the given :class:`.Session`.

        """

        self.session = session

    def from_self(self, *entities):
        """return a Query that selects from this Query's
        SELECT statement.

        \*entities - optional list of entities which will replace
        those being selected.

        """
        fromclause = self.with_labels().enable_eagerloads(False).\
                                    _enable_single_crit(False).\
                                    statement.correlate(None)
        q = self._from_selectable(fromclause)
        if entities:
            q._set_entities(entities)
        return q

    @_generative()
    def _enable_single_crit(self, val):
        self._enable_single_crit = val

    @_generative()
    def _from_selectable(self, fromclause):
        for attr in (
                '_statement', '_criterion',
                '_order_by', '_group_by',
                '_limit', '_offset',
                '_joinpath', '_joinpoint',
                '_distinct', '_having',
                '_prefixes',
        ):
            self.__dict__.pop(attr, None)
        self._set_select_from([fromclause], True)

        # this enables clause adaptation for non-ORM
        # expressions.
        self._orm_only_from_obj_alias = False

        old_entities = self._entities
        self._entities = []
        for e in old_entities:
            e.adapt_to_selectable(self, self._from_obj[0])

    def values(self, *columns):
        """Return an iterator yielding result tuples corresponding
        to the given list of columns"""

        if not columns:
            return iter(())
        q = self._clone()
        q._set_entities(columns, entity_wrapper=_ColumnEntity)
        if not q._yield_per:
            q._yield_per = 10
        return iter(q)
    _values = values

    def value(self, column):
        """Return a scalar result corresponding to the given
        column expression."""
        try:
            # Py3K
            #return self.values(column).__next__()[0]
            # Py2K
            return self.values(column).next()[0]
            # end Py2K
        except StopIteration:
            return None

    @_generative()
    def with_entities(self, *entities):
        """Return a new :class:`.Query` replacing the SELECT list with the
        given entities.

        e.g.::

            # Users, filtered on some arbitrary criterion
            # and then ordered by related email address
            q = session.query(User).\\
                        join(User.address).\\
                        filter(User.name.like('%ed%')).\\
                        order_by(Address.email)

            # given *only* User.id==5, Address.email, and 'q', what
            # would the *next* User in the result be ?
            subq = q.with_entities(Address.email).\\
                        order_by(None).\\
                        filter(User.id==5).\\
                        subquery()
            q = q.join((subq, subq.c.email < Address.email)).\\
                        limit(1)

        .. versionadded:: 0.6.5

        """
        self._set_entities(entities)

    @_generative()
    def add_columns(self, *column):
        """Add one or more column expressions to the list
        of result columns to be returned."""

        self._entities = list(self._entities)
        l = len(self._entities)
        for c in column:
            _ColumnEntity(self, c)
        # _ColumnEntity may add many entities if the
        # given arg is a FROM clause
        self._set_entity_selectables(self._entities[l:])

    @util.pending_deprecation("0.7",
                ":meth:`.add_column` is superseded by :meth:`.add_columns`",
                False)
    def add_column(self, column):
        """Add a column expression to the list of result columns to be
        returned.

        Pending deprecation: :meth:`.add_column` will be superseded by
        :meth:`.add_columns`.

        """
        return self.add_columns(column)

    def options(self, *args):
        """Return a new Query object, applying the given list of
        mapper options.

        Most supplied options regard changing how column- and
        relationship-mapped attributes are loaded. See the sections
        :ref:`deferred` and :doc:`/orm/loading` for reference
        documentation.

        """
        return self._options(False, *args)

    def _conditional_options(self, *args):
        return self._options(True, *args)

    @_generative()
    def _options(self, conditional, *args):
        # most MapperOptions write to the '_attributes' dictionary,
        # so copy that as well
        self._attributes = self._attributes.copy()
        opts = tuple(util.flatten_iterator(args))
        self._with_options = self._with_options + opts
        if conditional:
            for opt in opts:
                opt.process_query_conditionally(self)
        else:
            for opt in opts:
                opt.process_query(self)

    def with_transformation(self, fn):
        """Return a new :class:`.Query` object transformed by
        the given function.

        E.g.::

            def filter_something(criterion):
                def transform(q):
                    return q.filter(criterion)
                return transform

            q = q.with_transformation(filter_something(x==5))

        This allows ad-hoc recipes to be created for :class:`.Query`
        objects.  See the example at :ref:`hybrid_transformers`.

        .. versionadded:: 0.7.4

        """
        return fn(self)

    @_generative()
    def with_hint(self, selectable, text, dialect_name='*'):
        """Add an indexing hint for the given entity or selectable to
        this :class:`.Query`.

        Functionality is passed straight through to
        :meth:`~sqlalchemy.sql.expression.Select.with_hint`,
        with the addition that ``selectable`` can be a
        :class:`.Table`, :class:`.Alias`, or ORM entity / mapped class
        /etc.
        """
        selectable = inspect(selectable).selectable

        self._with_hints += ((selectable, text, dialect_name),)

    @_generative()
    def execution_options(self, **kwargs):
        """ Set non-SQL options which take effect during execution.

        The options are the same as those accepted by
        :meth:`.Connection.execution_options`.

        Note that the ``stream_results`` execution option is enabled
        automatically if the :meth:`~sqlalchemy.orm.query.Query.yield_per()`
        method is used.

        """
        self._execution_options = self._execution_options.union(kwargs)

    @_generative()
    def with_lockmode(self, mode):
        """Return a new Query object with the specified locking mode.

        :param mode: a string representing the desired locking mode. A
            corresponding value is passed to the ``for_update`` parameter of
            :meth:`~sqlalchemy.sql.expression.select` when the query is
            executed. Valid values are:

            ``'update'`` - passes ``for_update=True``, which translates to
            ``FOR UPDATE`` (standard SQL, supported by most dialects)

            ``'update_nowait'`` - passes ``for_update='nowait'``, which
            translates to ``FOR UPDATE NOWAIT`` (supported by Oracle,
            PostgreSQL 8.1 upwards)

            ``'read'`` - passes ``for_update='read'``, which translates to
            ``LOCK IN SHARE MODE`` (for MySQL), and ``FOR SHARE`` (for
            PostgreSQL)

            ``'read_nowait'`` - passes ``for_update='read_nowait'``, which
            translates to ``FOR SHARE NOWAIT`` (supported by PostgreSQL).

            .. versionadded:: 0.7.7
                ``FOR SHARE`` and ``FOR SHARE NOWAIT`` (PostgreSQL).
        """

        self._lockmode = mode

    @_generative()
    def params(self, *args, **kwargs):
        """add values for bind parameters which may have been
        specified in filter().

        parameters may be specified using \**kwargs, or optionally a single
        dictionary as the first positional argument. The reason for both is
        that \**kwargs is convenient, however some parameter dictionaries
        contain unicode keys in which case \**kwargs cannot be used.

        """
        if len(args) == 1:
            kwargs.update(args[0])
        elif len(args) > 0:
            raise sa_exc.ArgumentError(
                            "params() takes zero or one positional argument, "
                            "which is a dictionary.")
        self._params = self._params.copy()
        self._params.update(kwargs)

    @_generative(_no_statement_condition, _no_limit_offset)
    def filter(self, *criterion):
        """apply the given filtering criterion to a copy
        of this :class:`.Query`, using SQL expressions.

        e.g.::

            session.query(MyClass).filter(MyClass.name == 'some name')

        Multiple criteria are joined together by AND::

            session.query(MyClass).\\
                filter(MyClass.name == 'some name', MyClass.id > 5)

        The criterion is any SQL expression object applicable to the
        WHERE clause of a select.   String expressions are coerced
        into SQL expression constructs via the :func:`.text` construct.

        .. versionchanged:: 0.7.5
            Multiple criteria joined by AND.

        See also:

        :meth:`.Query.filter_by` - filter on keyword expressions.

        """
        for criterion in list(criterion):
            criterion = expression._literal_as_text(criterion)

            criterion = self._adapt_clause(criterion, True, True)

            if self._criterion is not None:
                self._criterion = self._criterion & criterion
            else:
                self._criterion = criterion

    def filter_by(self, **kwargs):
        """apply the given filtering criterion to a copy
        of this :class:`.Query`, using keyword expressions.

        e.g.::

            session.query(MyClass).filter_by(name = 'some name')

        Multiple criteria are joined together by AND::

            session.query(MyClass).\\
                filter_by(name = 'some name', id = 5)

        The keyword expressions are extracted from the primary
        entity of the query, or the last entity that was the
        target of a call to :meth:`.Query.join`.

        See also:

        :meth:`.Query.filter` - filter on SQL expressions.

        """

        clauses = [_entity_descriptor(self._joinpoint_zero(), key) == value
            for key, value in kwargs.iteritems()]
        return self.filter(sql.and_(*clauses))

    @_generative(_no_statement_condition, _no_limit_offset)
    def order_by(self, *criterion):
        """apply one or more ORDER BY criterion to the query and return
        the newly resulting ``Query``

        All existing ORDER BY settings can be suppressed by
        passing ``None`` - this will suppress any ORDER BY configured
        on mappers as well.

        Alternatively, an existing ORDER BY setting on the Query
        object can be entirely cancelled by passing ``False``
        as the value - use this before calling methods where
        an ORDER BY is invalid.

        """

        if len(criterion) == 1:
            if criterion[0] is False:
                if '_order_by' in self.__dict__:
                    del self._order_by
                return
            if criterion[0] is None:
                self._order_by = None
                return

        criterion = self._adapt_col_list(criterion)

        if self._order_by is False or self._order_by is None:
            self._order_by = criterion
        else:
            self._order_by = self._order_by + criterion

    @_generative(_no_statement_condition, _no_limit_offset)
    def group_by(self, *criterion):
        """apply one or more GROUP BY criterion to the query and return
        the newly resulting :class:`.Query`"""

        criterion = list(chain(*[_orm_columns(c) for c in criterion]))
        criterion = self._adapt_col_list(criterion)

        if self._group_by is False:
            self._group_by = criterion
        else:
            self._group_by = self._group_by + criterion

    @_generative(_no_statement_condition, _no_limit_offset)
    def having(self, criterion):
        """apply a HAVING criterion to the query and return the
        newly resulting :class:`.Query`.

        :meth:`having` is used in conjunction with :meth:`group_by`.

        HAVING criterion makes it possible to use filters on aggregate
        functions like COUNT, SUM, AVG, MAX, and MIN, eg.::

            q = session.query(User.id).\\
                        join(User.addresses).\\
                        group_by(User.id).\\
                        having(func.count(Address.id) > 2)

        """

        if isinstance(criterion, basestring):
            criterion = sql.text(criterion)

        if criterion is not None and \
                not isinstance(criterion, sql.ClauseElement):
            raise sa_exc.ArgumentError(
                    "having() argument must be of type "
                    "sqlalchemy.sql.ClauseElement or string")

        criterion = self._adapt_clause(criterion, True, True)

        if self._having is not None:
            self._having = self._having & criterion
        else:
            self._having = criterion

    def union(self, *q):
        """Produce a UNION of this Query against one or more queries.

        e.g.::

            q1 = sess.query(SomeClass).filter(SomeClass.foo=='bar')
            q2 = sess.query(SomeClass).filter(SomeClass.bar=='foo')

            q3 = q1.union(q2)

        The method accepts multiple Query objects so as to control
        the level of nesting.  A series of ``union()`` calls such as::

            x.union(y).union(z).all()

        will nest on each ``union()``, and produces::

            SELECT * FROM (SELECT * FROM (SELECT * FROM X UNION
                            SELECT * FROM y) UNION SELECT * FROM Z)

        Whereas::

            x.union(y, z).all()

        produces::

            SELECT * FROM (SELECT * FROM X UNION SELECT * FROM y UNION
                            SELECT * FROM Z)

        Note that many database backends do not allow ORDER BY to
        be rendered on a query called within UNION, EXCEPT, etc.
        To disable all ORDER BY clauses including those configured
        on mappers, issue ``query.order_by(None)`` - the resulting
        :class:`.Query` object will not render ORDER BY within
        its SELECT statement.

        """

        return self._from_selectable(
                    expression.union(*([self] + list(q))))

    def union_all(self, *q):
        """Produce a UNION ALL of this Query against one or more queries.

        Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See
        that method for usage examples.

        """
        return self._from_selectable(
                    expression.union_all(*([self] + list(q)))
                )

    def intersect(self, *q):
        """Produce an INTERSECT of this Query against one or more queries.

        Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See
        that method for usage examples.

        """
        return self._from_selectable(
                    expression.intersect(*([self] + list(q)))
                )

    def intersect_all(self, *q):
        """Produce an INTERSECT ALL of this Query against one or more queries.

        Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See
        that method for usage examples.

        """
        return self._from_selectable(
                    expression.intersect_all(*([self] + list(q)))
                )

    def except_(self, *q):
        """Produce an EXCEPT of this Query against one or more queries.

        Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See
        that method for usage examples.

        """
        return self._from_selectable(
                    expression.except_(*([self] + list(q)))
                )

    def except_all(self, *q):
        """Produce an EXCEPT ALL of this Query against one or more queries.

        Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See
        that method for usage examples.

        """
        return self._from_selectable(
                    expression.except_all(*([self] + list(q)))
                )

    def join(self, *props, **kwargs):
        """Create a SQL JOIN against this :class:`.Query` object's criterion
        and apply generatively, returning the newly resulting :class:`.Query`.

        **Simple Relationship Joins**

        Consider a mapping between two classes ``User`` and ``Address``,
        with a relationship ``User.addresses`` representing a collection
        of ``Address`` objects associated with each ``User``.   The most common
        usage of :meth:`~.Query.join` is to create a JOIN along this
        relationship, using the ``User.addresses`` attribute as an indicator
        for how this should occur::

            q = session.query(User).join(User.addresses)

        Where above, the call to :meth:`~.Query.join` along ``User.addresses``
        will result in SQL equivalent to::

            SELECT user.* FROM user JOIN address ON user.id = address.user_id

        In the above example we refer to ``User.addresses`` as passed to
        :meth:`~.Query.join` as the *on clause*, that is, it indicates
        how the "ON" portion of the JOIN should be constructed.  For a
        single-entity query such as the one above (i.e. we start by selecting
        only from ``User`` and nothing else), the relationship can also be
        specified by its string name::

            q = session.query(User).join("addresses")

        :meth:`~.Query.join` can also accommodate multiple
        "on clause" arguments to produce a chain of joins, such as below
        where a join across four related entities is constructed::

            q = session.query(User).join("orders", "items", "keywords")

        The above would be shorthand for three separate calls to
        :meth:`~.Query.join`, each using an explicit attribute to indicate
        the source entity::

            q = session.query(User).\\
                    join(User.orders).\\
                    join(Order.items).\\
                    join(Item.keywords)

        **Joins to a Target Entity or Selectable**

        A second form of :meth:`~.Query.join` allows any mapped entity
        or core selectable construct as a target.   In this usage,
        :meth:`~.Query.join` will attempt
        to create a JOIN along the natural foreign key relationship between
        two entities::

            q = session.query(User).join(Address)

        The above calling form of :meth:`.join` will raise an error if
        either there are no foreign keys between the two entities, or if
        there are multiple foreign key linkages between them.   In the
        above calling form, :meth:`~.Query.join` is called upon to
        create the "on clause" automatically for us.  The target can
        be any mapped entity or selectable, such as a :class:`.Table`::

            q = session.query(User).join(addresses_table)

        **Joins to a Target with an ON Clause**

        The third calling form allows both the target entity as well
        as the ON clause to be passed explicitly.   Suppose for
        example we wanted to join to ``Address`` twice, using
        an alias the second time.  We use :func:`~sqlalchemy.orm.aliased`
        to create a distinct alias of ``Address``, and join
        to it using the ``target, onclause`` form, so that the
        alias can be specified explicitly as the target along with
        the relationship to instruct how the ON clause should proceed::

            a_alias = aliased(Address)

            q = session.query(User).\\
                    join(User.addresses).\\
                    join(a_alias, User.addresses).\\
                    filter(Address.email_address=='ed@foo.com').\\
                    filter(a_alias.email_address=='ed@bar.com')

        Where above, the generated SQL would be similar to::

            SELECT user.* FROM user
                JOIN address ON user.id = address.user_id
                JOIN address AS address_1 ON user.id=address_1.user_id
                WHERE address.email_address = :email_address_1
                AND address_1.email_address = :email_address_2

        The two-argument calling form of :meth:`~.Query.join`
        also allows us to construct arbitrary joins with SQL-oriented
        "on clause" expressions, not relying upon configured relationships
        at all.  Any SQL expression can be passed as the ON clause
        when using the two-argument form, which should refer to the target
        entity in some way as well as an applicable source entity::

            q = session.query(User).join(Address, User.id==Address.user_id)

        .. versionchanged:: 0.7
            In SQLAlchemy 0.6 and earlier, the two argument form of
            :meth:`~.Query.join` requires the usage of a tuple:
            ``query(User).join((Address, User.id==Address.user_id))``\ .
            This calling form is accepted in 0.7 and further, though
            is not necessary unless multiple join conditions are passed to
            a single :meth:`~.Query.join` call, which itself is also not
            generally necessary as it is now equivalent to multiple
            calls (this wasn't always the case).

        **Advanced Join Targeting and Adaption**

        There is a lot of flexibility in what the "target" can be when using
        :meth:`~.Query.join`.   As noted previously, it also accepts
        :class:`.Table` constructs and other selectables such as
        :func:`.alias` and :func:`.select` constructs, with either the one
        or two-argument forms::

            addresses_q = select([Address.user_id]).\\
                        where(Address.email_address.endswith("@bar.com")).\\
                        alias()

            q = session.query(User).\\
                        join(addresses_q, addresses_q.c.user_id==User.id)

        :meth:`~.Query.join` also features the ability to *adapt* a
        :meth:`~sqlalchemy.orm.relationship` -driven ON clause to the target
        selectable. Below we construct a JOIN from ``User`` to a subquery
        against ``Address``, allowing the relationship denoted by
        ``User.addresses`` to *adapt* itself to the altered target::

            address_subq = session.query(Address).\\
                            filter(Address.email_address == 'ed@foo.com').\\
                            subquery()

            q = session.query(User).join(address_subq, User.addresses)

        Producing SQL similar to::

            SELECT user.* FROM user
                JOIN (
                    SELECT address.id AS id,
                            address.user_id AS user_id,
                            address.email_address AS email_address
                    FROM address
                    WHERE address.email_address = :email_address_1
                ) AS anon_1 ON user.id = anon_1.user_id

        The above form allows one to fall back onto an explicit ON
        clause at any time::

            q = session.query(User).\\
                    join(address_subq, User.id==address_subq.c.user_id)

        **Controlling what to Join From**

        While :meth:`~.Query.join` exclusively deals with the "right"
        side of the JOIN, we can also control the "left" side, in those
        cases where it's needed, using :meth:`~.Query.select_from`.
        Below we construct a query against ``Address`` but can still
        make usage of ``User.addresses`` as our ON clause by instructing
        the :class:`.Query` to select first from the ``User``
        entity::

            q = session.query(Address).select_from(User).\\
                            join(User.addresses).\\
                            filter(User.name == 'ed')

        Which will produce SQL similar to::

            SELECT address.* FROM user
                JOIN address ON user.id=address.user_id
                WHERE user.name = :name_1

        **Constructing Aliases Anonymously**

        :meth:`~.Query.join` can construct anonymous aliases
        using the ``aliased=True`` flag.  This feature is useful
        when a query is being joined algorithmically, such as
        when querying self-referentially to an arbitrary depth::

            q = session.query(Node).\\
                    join("children", "children", aliased=True)

        When ``aliased=True`` is used, the actual "alias" construct
        is not explicitly available.  To work with it, methods such as
        :meth:`.Query.filter` will adapt the incoming entity to
        the last join point::

            q = session.query(Node).\\
                    join("children", "children", aliased=True).\\
                    filter(Node.name == 'grandchild 1')

        When using automatic aliasing, the ``from_joinpoint=True``
        argument can allow a multi-node join to be broken into
        multiple calls to :meth:`~.Query.join`, so that
        each path along the way can be further filtered::

            q = session.query(Node).\\
                    join("children", aliased=True).\\
                    filter(Node.name='child 1').\\
                    join("children", aliased=True, from_joinpoint=True).\\
                    filter(Node.name == 'grandchild 1')

        The filtering aliases above can then be reset back to the
        original ``Node`` entity using :meth:`~.Query.reset_joinpoint`::

            q = session.query(Node).\\
                    join("children", "children", aliased=True).\\
                    filter(Node.name == 'grandchild 1').\\
                    reset_joinpoint().\\
                    filter(Node.name == 'parent 1)

        For an example of ``aliased=True``, see the distribution
        example :ref:`examples_xmlpersistence` which illustrates
        an XPath-like query system using algorithmic joins.

        :param *props: A collection of one or more join conditions,
         each consisting of a relationship-bound attribute or string
         relationship name representing an "on clause", or a single
         target entity, or a tuple in the form of ``(target, onclause)``.
         A special two-argument calling form of the form ``target, onclause``
         is also accepted.
        :param aliased=False: If True, indicate that the JOIN target should be
         anonymously aliased.  Subsequent calls to :class:`~.Query.filter`
         and similar will adapt the incoming criterion to the target
         alias, until :meth:`~.Query.reset_joinpoint` is called.
        :param from_joinpoint=False: When using ``aliased=True``, a setting
         of True here will cause the join to be from the most recent
         joined target, rather than starting back from the original
         FROM clauses of the query.

        See also:

            :ref:`ormtutorial_joins` in the ORM tutorial.

            :ref:`inheritance_toplevel` for details on how :meth:`~.Query.join`
            is used for inheritance relationships.

            :func:`.orm.join` - a standalone ORM-level join function,
            used internally by :meth:`.Query.join`, which in previous
            SQLAlchemy versions was the primary ORM-level joining interface.

        """
        aliased, from_joinpoint = kwargs.pop('aliased', False),\
                                    kwargs.pop('from_joinpoint', False)
        if kwargs:
            raise TypeError("unknown arguments: %s" %
                                ','.join(kwargs.iterkeys()))
        return self._join(props,
                            outerjoin=False, create_aliases=aliased,
                            from_joinpoint=from_joinpoint)

    def outerjoin(self, *props, **kwargs):
        """Create a left outer join against this ``Query`` object's criterion
        and apply generatively, returning the newly resulting ``Query``.

        Usage is the same as the ``join()`` method.

        """
        aliased, from_joinpoint = kwargs.pop('aliased', False), \
                                kwargs.pop('from_joinpoint', False)
        if kwargs:
            raise TypeError("unknown arguments: %s" %
                    ','.join(kwargs.iterkeys()))
        return self._join(props,
                            outerjoin=True, create_aliases=aliased,
                            from_joinpoint=from_joinpoint)

    def _update_joinpoint(self, jp):
        self._joinpoint = jp
        # copy backwards to the root of the _joinpath
        # dict, so that no existing dict in the path is mutated
        while 'prev' in jp:
            f, prev = jp['prev']
            prev = prev.copy()
            prev[f] = jp
            jp['prev'] = (f, prev)
            jp = prev
        self._joinpath = jp

    @_generative(_no_statement_condition, _no_limit_offset)
    def _join(self, keys, outerjoin, create_aliases, from_joinpoint):
        """consumes arguments from join() or outerjoin(), places them into a
        consistent format with which to form the actual JOIN constructs.

        """

        if not from_joinpoint:
            self._reset_joinpoint()

        if len(keys) == 2 and \
            isinstance(keys[0], (expression.FromClause,
                                    type, AliasedClass)) and \
            isinstance(keys[1], (basestring, expression.ClauseElement,
                                        interfaces.PropComparator)):
            # detect 2-arg form of join and
            # convert to a tuple.
            keys = (keys,)

        for arg1 in util.to_list(keys):
            if isinstance(arg1, tuple):
                # "tuple" form of join, multiple
                # tuples are accepted as well.   The simpler
                # "2-arg" form is preferred.  May deprecate
                # the "tuple" usage.
                arg1, arg2 = arg1
            else:
                arg2 = None

            # determine onclause/right_entity.  there
            # is a little bit of legacy behavior still at work here
            # which means they might be in either order.  may possibly
            # lock this down to (right_entity, onclause) in 0.6.
            if isinstance(arg1, (interfaces.PropComparator, basestring)):
                right_entity, onclause = arg2, arg1
            else:
                right_entity, onclause = arg1, arg2

            left_entity = prop = None

            if isinstance(onclause, basestring):
                left_entity = self._joinpoint_zero()

                descriptor = _entity_descriptor(left_entity, onclause)
                onclause = descriptor

            # check for q.join(Class.propname, from_joinpoint=True)
            # and Class is that of the current joinpoint
            elif from_joinpoint and \
                        isinstance(onclause, interfaces.PropComparator):
                left_entity = onclause._parententity

                info = inspect(self._joinpoint_zero())
                left_mapper, left_selectable, left_is_aliased = \
                    getattr(info, 'mapper', None), \
                    info.selectable, \
                    getattr(info, 'is_aliased_class', None)

                if left_mapper is left_entity:
                    left_entity = self._joinpoint_zero()
                    descriptor = _entity_descriptor(left_entity,
                                                            onclause.key)
                    onclause = descriptor

            if isinstance(onclause, interfaces.PropComparator):
                if right_entity is None:
                    right_entity = onclause.property.mapper
                    of_type = getattr(onclause, '_of_type', None)
                    if of_type:
                        right_entity = of_type
                    else:
                        right_entity = onclause.property.mapper

                left_entity = onclause._parententity

                prop = onclause.property
                if not isinstance(onclause,  attributes.QueryableAttribute):
                    onclause = prop

                if not create_aliases:
                    # check for this path already present.
                    # don't render in that case.
                    edge = (left_entity, right_entity, prop.key)
                    if edge in self._joinpoint:
                        # The child's prev reference might be stale --
                        # it could point to a parent older than the
                        # current joinpoint.  If this is the case,
                        # then we need to update it and then fix the
                        # tree's spine with _update_joinpoint.  Copy
                        # and then mutate the child, which might be
                        # shared by a different query object.
                        jp = self._joinpoint[edge].copy()
                        jp['prev'] = (edge, self._joinpoint)
                        self._update_joinpoint(jp)
                        continue

            elif onclause is not None and right_entity is None:
                # TODO: no coverage here
                raise NotImplementedError("query.join(a==b) not supported.")

            self._join_left_to_right(
                                left_entity,
                                right_entity, onclause,
                                outerjoin, create_aliases, prop)

    def _join_left_to_right(self, left, right,
                            onclause, outerjoin, create_aliases, prop):
        """append a JOIN to the query's from clause."""

        self._polymorphic_adapters = self._polymorphic_adapters.copy()

        if left is None:
            if self._from_obj:
                left = self._from_obj[0]
            elif self._entities:
                left = self._entities[0].entity_zero_or_selectable

        if left is right and \
                not create_aliases:
            raise sa_exc.InvalidRequestError(
                        "Can't construct a join from %s to %s, they "
                        "are the same entity" %
                        (left, right))

        right, onclause = self._prepare_right_side(
                                            right, onclause,
                                            create_aliases,
                                            prop)

        # if joining on a MapperProperty path,
        # track the path to prevent redundant joins
        if not create_aliases and prop:
            self._update_joinpoint({
                '_joinpoint_entity': right,
                'prev': ((left, right, prop.key), self._joinpoint)
            })
        else:
            self._joinpoint = {'_joinpoint_entity': right}

        self._join_to_left(left, right, onclause, outerjoin)

    def _prepare_right_side(self, right, onclause, create_aliases, prop):
        info = inspect(right)

        right_mapper, right_selectable, right_is_aliased = \
            getattr(info, 'mapper', None), \
            info.selectable, \
            getattr(info, 'is_aliased_class', False)

        if right_mapper:
            self._join_entities += (info, )

        if right_mapper and prop and \
                not right_mapper.common_parent(prop.mapper):
            raise sa_exc.InvalidRequestError(
                    "Join target %s does not correspond to "
                    "the right side of join condition %s" % (right, onclause)
            )

        if not right_mapper and prop:
            right_mapper = prop.mapper

        need_adapter = False

        if right_mapper and right is right_selectable:
            if not right_selectable.is_derived_from(
                                    right_mapper.mapped_table):
                raise sa_exc.InvalidRequestError(
                    "Selectable '%s' is not derived from '%s'" %
                    (right_selectable.description,
                    right_mapper.mapped_table.description))

            if not isinstance(right_selectable, expression.Alias):
                right_selectable = right_selectable.alias()

            right = aliased(right_mapper, right_selectable)
            need_adapter = True

        aliased_entity = right_mapper and \
                            not right_is_aliased and \
                            (
                                right_mapper.with_polymorphic or
                                isinstance(
                                    right_mapper.mapped_table,
                                    expression.Join)
                            )

        if not need_adapter and (create_aliases or aliased_entity):
            right = aliased(right)
            need_adapter = True

        # if an alias() of the right side was generated here,
        # apply an adapter to all subsequent filter() calls
        # until reset_joinpoint() is called.
        if need_adapter:
            self._filter_aliases = ORMAdapter(right,
                        equivalents=right_mapper and
                                    right_mapper._equivalent_columns or {},
                        chain_to=self._filter_aliases)

        # if the onclause is a ClauseElement, adapt it with any
        # adapters that are in place right now
        if isinstance(onclause, expression.ClauseElement):
            onclause = self._adapt_clause(onclause, True, True)

        # if an alias() on the right side was generated,
        # which is intended to wrap a the right side in a subquery,
        # ensure that columns retrieved from this target in the result
        # set are also adapted.
        if aliased_entity and not create_aliases:
            self._mapper_loads_polymorphically_with(
                        right_mapper,
                        ORMAdapter(
                            right,
                            equivalents=right_mapper._equivalent_columns
                        )
                    )

        return right, onclause

    def _join_to_left(self, left, right, onclause, outerjoin):
        info = inspect(left)
        left_mapper = getattr(info, 'mapper', None)
        left_selectable = info.selectable

        if self._from_obj:
            replace_clause_index, clause = sql_util.find_join_source(
                                                    self._from_obj,
                                                    left_selectable)
            if clause is not None:
                try:
                    clause = orm_join(clause,
                                    right,
                                    onclause, isouter=outerjoin)
                except sa_exc.ArgumentError, ae:
                    raise sa_exc.InvalidRequestError(
                            "Could not find a FROM clause to join from.  "
                            "Tried joining to %s, but got: %s" % (right, ae))

                self._from_obj = \
                        self._from_obj[:replace_clause_index] + \
                        (clause, ) + \
                        self._from_obj[replace_clause_index + 1:]
                return

        if left_mapper:
            for ent in self._entities:
                if ent.corresponds_to(left):
                    clause = ent.selectable
                    break
            else:
                clause = left
        else:
            clause = left_selectable

        assert clause is not None

        try:
            clause = orm_join(clause, right, onclause, isouter=outerjoin)
        except sa_exc.ArgumentError, ae:
            raise sa_exc.InvalidRequestError(
                    "Could not find a FROM clause to join from.  "
                    "Tried joining to %s, but got: %s" % (right, ae))
        self._from_obj = self._from_obj + (clause,)

    def _reset_joinpoint(self):
        self._joinpoint = self._joinpath
        self._filter_aliases = None

    @_generative(_no_statement_condition)
    def reset_joinpoint(self):
        """Return a new :class:`.Query`, where the "join point" has
        been reset back to the base FROM entities of the query.

        This method is usually used in conjunction with the
        ``aliased=True`` feature of the :meth:`~.Query.join`
        method.  See the example in :meth:`~.Query.join` for how
        this is used.

        """
        self._reset_joinpoint()


    @_generative(_no_clauseelement_condition)
    def select_from(self, *from_obj):
        """Set the FROM clause of this :class:`.Query` explicitly.

        :meth:`.Query.select_from` is often used in conjunction with
        :meth:`.Query.join` in order to control which entity is selected
        from on the "left" side of the join.

        The entity or selectable object here effectively replaces the
        "left edge" of any calls to :meth:`~.Query.join`, when no
        joinpoint is otherwise established - usually, the default "join
        point" is the leftmost entity in the :class:`~.Query` object's
        list of entities to be selected.

        A typical example::

            q = session.query(Address).select_from(User).\\
                join(User.addresses).\\
                filter(User.name == 'ed')

        Which produces SQL equivalent to::

            SELECT address.* FROM user
            JOIN address ON user.id=address.user_id
            WHERE user.name = :name_1

        :param \*from_obj: collection of one or more entities to apply
         to the FROM clause.  Entities can be mapped classes,
         :class:`.AliasedClass` objects, :class:`.Mapper` objects
         as well as core :class:`.FromClause` elements like subqueries.

        .. note::

            :meth:`.Query.select_from` features a deprecated behavior
            whereby when passed a :class:`.FromClause` element,
            such as a select construct, it will apply that select
            construct to *replace* the FROM clause that an existing
            entity is joined from.  This behavior is being removed
            in SQLAlchemy 0.9, to be replaced with the
            :meth:`.Query.select_entity_from` method.  Applications
            which rely on this behavior to re-base query entities to
            an arbitrary selectable should transition to this
            method before upgrading to 0.9.

        .. seealso::

            :meth:`~.Query.join`

            :meth:`.Query.select_entity_from`

        """

        self._set_select_from(from_obj, False)

    @_generative(_no_clauseelement_condition)
    def select_entity_from(self, from_obj):
        """Set the FROM clause of this :class:`.Query` to a
        core selectable, applying it as a replacement FROM clause
        for corresponding mapped entities.

        This method is currently equivalent to the
        :meth:`.Query.select_from` method, but in 0.9 these two
        methods will diverge in functionality.

        In addition to changing the FROM list, the method will
        also apply the given selectable
        to replace the FROM which the selected entities would normally
        select from.

        The given ``from_obj`` must be an instance of a :class:`.FromClause`,
        e.g. a :func:`.select` or :class:`.Alias` construct.

        An example would be a :class:`.Query` that selects ``User`` entities,
        but uses :meth:`.Query.select_entity_from` to have the entities
        selected from a :func:`.select` construct instead of the
        base ``user`` table::

            select_stmt = select([User]).where(User.id == 7)

            q = session.query(User).\\
                    select_entity_from(select_stmt).\\
                    filter(User.name == 'ed')

        The query generated will select ``User`` entities directly
        from the given :func:`.select` construct, and will be::

            SELECT anon_1.id AS anon_1_id, anon_1.name AS anon_1_name
            FROM (SELECT "user".id AS id, "user".name AS name
            FROM "user"
            WHERE "user".id = :id_1) AS anon_1
            WHERE anon_1.name = :name_1

        Notice above that even the WHERE criterion was "adapted" such that
        the ``anon_1`` subquery effectively replaces all references to the
        ``user`` table, except for the one that it refers to internally.

        Compare this to :meth:`.Query.select_from`, which as of
        version 0.9, does not affect existing entities.  The
        statement below::

            q = session.query(User).\\
                    select_from(select_stmt).\\
                    filter(User.name == 'ed')

        Produces SQL where both the ``user`` table as well as the
        ``select_stmt`` construct are present as separate elements
        in the FROM clause.  No "adaptation" of the ``user`` table
        is applied::

            SELECT "user".id AS user_id, "user".name AS user_name
            FROM "user", (SELECT "user".id AS id, "user".name AS name
            FROM "user"
            WHERE "user".id = :id_1) AS anon_1
            WHERE "user".name = :name_1

        :meth:`.Query.select_entity_from` maintains an older
        behavior of :meth:`.Query.select_from`.  In modern usage,
        similar results can also be achieved using :func:`.aliased`::

            select_stmt = select([User]).where(User.id == 7)
            user_from_select = aliased(User, select_stmt.alias())

            q = session.query(user_from_select)

        :param from_obj: a :class:`.FromClause` object that will replace
         the FROM clause of this :class:`.Query`.

        .. seealso::

            :meth:`.Query.select_from`

        .. versionadded:: 0.8.2
            :meth:`.Query.select_entity_from` was added to specify
            the specific behavior of entity replacement, however
            the :meth:`.Query.select_from` maintains this behavior
            as well until 0.9.

        """

        self._set_select_from([from_obj], True)

    def __getitem__(self, item):
        if isinstance(item, slice):
            start, stop, step = util.decode_slice(item)

            if isinstance(stop, int) and \
                isinstance(start, int) and \
                stop - start <= 0:
                return []

            # perhaps we should execute a count() here so that we
            # can still use LIMIT/OFFSET ?
            elif (isinstance(start, int) and start < 0) \
                or (isinstance(stop, int) and stop < 0):
                return list(self)[item]

            res = self.slice(start, stop)
            if step is not None:
                return list(res)[None:None:item.step]
            else:
                return list(res)
        else:
            if item == -1:
                return list(self)[-1]
            else:
                return list(self[item:item + 1])[0]

    @_generative(_no_statement_condition)
    def slice(self, start, stop):
        """apply LIMIT/OFFSET to the ``Query`` based on a "
        "range and return the newly resulting ``Query``."""

        if start is not None and stop is not None:
            self._offset = (self._offset or 0) + start
            self._limit = stop - start
        elif start is None and stop is not None:
            self._limit = stop
        elif start is not None and stop is None:
            self._offset = (self._offset or 0) + start

        if self._offset == 0:
            self._offset = None

    @_generative(_no_statement_condition)
    def limit(self, limit):
        """Apply a ``LIMIT`` to the query and return the newly resulting

        ``Query``.

        """
        self._limit = limit

    @_generative(_no_statement_condition)
    def offset(self, offset):
        """Apply an ``OFFSET`` to the query and return the newly resulting
        ``Query``.

        """
        self._offset = offset

    @_generative(_no_statement_condition)
    def distinct(self, *criterion):
        """Apply a ``DISTINCT`` to the query and return the newly resulting
        ``Query``.

        :param \*expr: optional column expressions.  When present,
         the Postgresql dialect will render a ``DISTINCT ON (<expressions>>)``
         construct.

        """
        if not criterion:
            self._distinct = True
        else:
            criterion = self._adapt_col_list(criterion)
            if isinstance(self._distinct, list):
                self._distinct += criterion
            else:
                self._distinct = criterion

    @_generative()
    def prefix_with(self, *prefixes):
        """Apply the prefixes to the query and return the newly resulting
        ``Query``.

        :param \*prefixes: optional prefixes, typically strings,
        not using any commas.   In particular is useful for MySQL keywords.

        e.g.::

            query = sess.query(User.name).\\
                prefix_with('HIGH_PRIORITY').\\
                prefix_with('SQL_SMALL_RESULT', 'ALL')

        Would render::

            SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL users.name AS users_name
            FROM users

        .. versionadded:: 0.7.7

        """
        if self._prefixes:
            self._prefixes += prefixes
        else:
            self._prefixes = prefixes

    def all(self):
        """Return the results represented by this ``Query`` as a list.

        This results in an execution of the underlying query.

        """
        return list(self)

    @_generative(_no_clauseelement_condition)
    def from_statement(self, statement):
        """Execute the given SELECT statement and return results.

        This method bypasses all internal statement compilation, and the
        statement is executed without modification.

        The statement argument is either a string, a ``select()`` construct,
        or a ``text()`` construct, and should return the set of columns
        appropriate to the entity class represented by this ``Query``.

        """
        if isinstance(statement, basestring):
            statement = sql.text(statement)

        if not isinstance(statement,
                            (expression.TextClause,
                            expression.SelectBase)):
            raise sa_exc.ArgumentError(
                            "from_statement accepts text(), select(), "
                            "and union() objects only.")

        self._statement = statement

    def first(self):
        """Return the first result of this ``Query`` or
        None if the result doesn't contain any row.

        first() applies a limit of one within the generated SQL, so that
        only one primary entity row is generated on the server side
        (note this may consist of multiple result rows if join-loaded
        collections are present).

        Calling ``first()`` results in an execution of the underlying query.

        """
        if self._statement is not None:
            ret = list(self)[0:1]
        else:
            ret = list(self[0:1])
        if len(ret) > 0:
            return ret[0]
        else:
            return None

    def one(self):
        """Return exactly one result or raise an exception.

        Raises ``sqlalchemy.orm.exc.NoResultFound`` if the query selects
        no rows.  Raises ``sqlalchemy.orm.exc.MultipleResultsFound``
        if multiple object identities are returned, or if multiple
        rows are returned for a query that does not return object
        identities.

        Note that an entity query, that is, one which selects one or
        more mapped classes as opposed to individual column attributes,
        may ultimately represent many rows but only one row of
        unique entity or entities - this is a successful result for one().

        Calling ``one()`` results in an execution of the underlying query.

        .. versionchanged:: 0.6
            ``one()`` fully fetches all results instead of applying
            any kind of limit, so that the "unique"-ing of entities does not
            conceal multiple object identities.

        """
        ret = list(self)

        l = len(ret)
        if l == 1:
            return ret[0]
        elif l == 0:
            raise orm_exc.NoResultFound("No row was found for one()")
        else:
            raise orm_exc.MultipleResultsFound(
                "Multiple rows were found for one()")

    def scalar(self):
        """Return the first element of the first result or None
        if no rows present.  If multiple rows are returned,
        raises MultipleResultsFound.

          >>> session.query(Item).scalar()
          <Item>
          >>> session.query(Item.id).scalar()
          1
          >>> session.query(Item.id).filter(Item.id < 0).scalar()
          None
          >>> session.query(Item.id, Item.name).scalar()
          1
          >>> session.query(func.count(Parent.id)).scalar()
          20

        This results in an execution of the underlying query.

        """
        try:
            ret = self.one()
            if not isinstance(ret, tuple):
                return ret
            return ret[0]
        except orm_exc.NoResultFound:
            return None

    def __iter__(self):
        context = self._compile_context()
        context.statement.use_labels = True
        if self._autoflush and not self._populate_existing:
            self.session._autoflush()
        return self._execute_and_instances(context)

    def _connection_from_session(self, **kw):
        conn = self.session.connection(
                        **kw)
        if self._execution_options:
            conn = conn.execution_options(**self._execution_options)
        return conn

    def _execute_and_instances(self, querycontext):
        conn = self._connection_from_session(
                        mapper=self._mapper_zero_or_none(),
                        clause=querycontext.statement,
                        close_with_result=True)

        result = conn.execute(querycontext.statement, self._params)
        return loading.instances(self, result, querycontext)

    @property
    def column_descriptions(self):
        """Return metadata about the columns which would be
        returned by this :class:`.Query`.

        Format is a list of dictionaries::

            user_alias = aliased(User, name='user2')
            q = sess.query(User, User.id, user_alias)

            # this expression:
            q.column_descriptions

            # would return:
            [
                {
                    'name':'User',
                    'type':User,
                    'aliased':False,
                    'expr':User,
                },
                {
                    'name':'id',
                    'type':Integer(),
                    'aliased':False,
                    'expr':User.id,
                },
                {
                    'name':'user2',
                    'type':User,
                    'aliased':True,
                    'expr':user_alias
                }
            ]

        """
        return [
            {
                'name':ent._label_name,
                'type':ent.type,
                'aliased':getattr(ent, 'is_aliased_class', False),
                'expr':ent.expr
            }
            for ent in self._entities
        ]

    def instances(self, cursor, __context=None):
        """Given a ResultProxy cursor as returned by connection.execute(),
        return an ORM result as an iterator.

        e.g.::

            result = engine.execute("select * from users")
            for u in session.query(User).instances(result):
                print u
        """
        context = __context
        if context is None:
            context = QueryContext(self)

        return loading.instances(self, cursor, context)

    def merge_result(self, iterator, load=True):
        """Merge a result into this :class:`.Query` object's Session.

        Given an iterator returned by a :class:`.Query` of the same structure
        as this one, return an identical iterator of results, with all mapped
        instances merged into the session using :meth:`.Session.merge`. This
        is an optimized method which will merge all mapped instances,
        preserving the structure of the result rows and unmapped columns with
        less method overhead than that of calling :meth:`.Session.merge`
        explicitly for each value.

        The structure of the results is determined based on the column list of
        this :class:`.Query` - if these do not correspond, unchecked errors
        will occur.

        The 'load' argument is the same as that of :meth:`.Session.merge`.

        For an example of how :meth:`~.Query.merge_result` is used, see
        the source code for the example :ref:`examples_caching`, where
        :meth:`~.Query.merge_result` is used to efficiently restore state
        from a cache back into a target :class:`.Session`.

        """

        return loading.merge_result(self, iterator, load)

    @property
    def _select_args(self):
        return {
            'limit': self._limit,
            'offset': self._offset,
            'distinct': self._distinct,
            'prefixes': self._prefixes,
            'group_by': self._group_by or None,
            'having': self._having
        }

    @property
    def _should_nest_selectable(self):
        kwargs = self._select_args
        return (kwargs.get('limit') is not None or
                kwargs.get('offset') is not None or
                kwargs.get('distinct', False))

    def exists(self):
        """A convenience method that turns a query into an EXISTS subquery
        of the form EXISTS (SELECT 1 FROM ... WHERE ...).

        e.g.::

            q = session.query(User).filter(User.name == 'fred')
            session.query(q.exists())

        Producing SQL similar to::

            SELECT EXISTS (
                SELECT 1 FROM users WHERE users.name = :name_1
            ) AS anon_1

        .. versionadded:: 0.8.1

        """
        return sql.exists(self.statement.with_only_columns(['1']))

    def count(self):
        """Return a count of rows this Query would return.

        This generates the SQL for this Query as follows::

            SELECT count(1) AS count_1 FROM (
                SELECT <rest of query follows...>
            ) AS anon_1

        .. versionchanged:: 0.7
            The above scheme is newly refined as of 0.7b3.

        For fine grained control over specific columns
        to count, to skip the usage of a subquery or
        otherwise control of the FROM clause,
        or to use other aggregate functions,
        use :attr:`~sqlalchemy.sql.expression.func`
        expressions in conjunction
        with :meth:`~.Session.query`, i.e.::

            from sqlalchemy import func

            # count User records, without
            # using a subquery.
            session.query(func.count(User.id))

            # return count of user "id" grouped
            # by "name"
            session.query(func.count(User.id)).\\
                    group_by(User.name)

            from sqlalchemy import distinct

            # count distinct "name" values
            session.query(func.count(distinct(User.name)))

        """
        col = sql.func.count(sql.literal_column('*'))
        return self.from_self(col).scalar()

    def delete(self, synchronize_session='evaluate'):
        """Perform a bulk delete query.

        Deletes rows matched by this query from the database.

        :param synchronize_session: chooses the strategy for the removal of
            matched objects from the session. Valid values are:

            ``False`` - don't synchronize the session. This option is the most
            efficient and is reliable once the session is expired, which
            typically occurs after a commit(), or explicitly using
            expire_all(). Before the expiration, objects may still remain in
            the session which were in fact deleted which can lead to confusing
            results if they are accessed via get() or already loaded
            collections.

            ``'fetch'`` - performs a select query before the delete to find
            objects that are matched by the delete query and need to be
            removed from the session. Matched objects are removed from the
            session.

            ``'evaluate'`` - Evaluate the query's criteria in Python straight
            on the objects in the session. If evaluation of the criteria isn't
            implemented, an error is raised.  In that case you probably
            want to use the 'fetch' strategy as a fallback.

            The expression evaluator currently doesn't account for differing
            string collations between the database and Python.

        :return: the count of rows matched as returned by the database's
          "row count" feature.

        This method has several key caveats:

        * The method does **not** offer in-Python cascading of relationships - it
          is assumed that ON DELETE CASCADE/SET NULL/etc. is configured for any foreign key
          references which require it, otherwise the database may emit an
          integrity violation if foreign key references are being enforced.

          After the DELETE, dependent objects in the :class:`.Session` which
          were impacted by an ON DELETE may not contain the current
          state, or may have been deleted. This issue is resolved once the
          :class:`.Session` is expired,
          which normally occurs upon :meth:`.Session.commit` or can be forced
          by using :meth:`.Session.expire_all`.  Accessing an expired object
          whose row has been deleted will invoke a SELECT to locate the
          row; when the row is not found, an :class:`.ObjectDeletedError`
          is raised.

        * The :meth:`.MapperEvents.before_delete` and
          :meth:`.MapperEvents.after_delete`
          events are **not** invoked from this method.  Instead, the
          :meth:`.SessionEvents.after_bulk_delete` method is provided to act
          upon a mass DELETE of entity rows.

        .. seealso::

            :meth:`.Query.update`

            :ref:`inserts_and_updates` - Core SQL tutorial

        """
        #TODO: cascades need handling.

        delete_op = persistence.BulkDelete.factory(
                            self, synchronize_session)
        delete_op.exec_()
        return delete_op.rowcount

    def update(self, values, synchronize_session='evaluate'):
        """Perform a bulk update query.

        Updates rows matched by this query in the database.

        :param values: a dictionary with attributes names as keys and literal
          values or sql expressions as values.

        :param synchronize_session: chooses the strategy to update the
            attributes on objects in the session. Valid values are:

            ``False`` - don't synchronize the session. This option is the most
            efficient and is reliable once the session is expired, which
            typically occurs after a commit(), or explicitly using
            expire_all(). Before the expiration, updated objects may still
            remain in the session with stale values on their attributes, which
            can lead to confusing results.

            ``'fetch'`` - performs a select query before the update to find
            objects that are matched by the update query. The updated
            attributes are expired on matched objects.

            ``'evaluate'`` - Evaluate the Query's criteria in Python straight
            on the objects in the session. If evaluation of the criteria isn't
            implemented, an exception is raised.

            The expression evaluator currently doesn't account for differing
            string collations between the database and Python.

        :return: the count of rows matched as returned by the database's
          "row count" feature.

        This method has several key caveats:

        * The method does **not** offer in-Python cascading of relationships - it
          is assumed that ON UPDATE CASCADE is configured for any foreign key
          references which require it, otherwise the database may emit an
          integrity violation if foreign key references are being enforced.

          After the UPDATE, dependent objects in the :class:`.Session` which
          were impacted by an ON UPDATE CASCADE may not contain the current
          state; this issue is resolved once the :class:`.Session` is expired,
          which normally occurs upon :meth:`.Session.commit` or can be forced
          by using :meth:`.Session.expire_all`.

        * As of 0.8, this method will support multiple table updates, as detailed
          in :ref:`multi_table_updates`, and this behavior does extend to support
          updates of joined-inheritance and other multiple table mappings.  However,
          the **join condition of an inheritance mapper is currently not
          automatically rendered**.
          Care must be taken in any multiple-table update to explicitly include
          the joining condition between those tables, even in mappings where
          this is normally automatic.
          E.g. if a class ``Engineer`` subclasses ``Employee``, an UPDATE of the
          ``Engineer`` local table using criteria against the ``Employee``
          local table might look like::

                session.query(Engineer).\\
                    filter(Engineer.id == Employee.id).\\
                    filter(Employee.name == 'dilbert').\\
                    update({"engineer_type": "programmer"})

        * The :meth:`.MapperEvents.before_update` and
          :meth:`.MapperEvents.after_update`
          events are **not** invoked from this method.  Instead, the
          :meth:`.SessionEvents.after_bulk_update` method is provided to act
          upon a mass UPDATE of entity rows.

        .. seealso::

            :meth:`.Query.delete`

            :ref:`inserts_and_updates` - Core SQL tutorial

        """

        #TODO: value keys need to be mapped to corresponding sql cols and
        # instr.attr.s to string keys
        #TODO: updates of manytoone relationships need to be converted to
        # fk assignments
        #TODO: cascades need handling.

        update_op = persistence.BulkUpdate.factory(
                            self, synchronize_session, values)
        update_op.exec_()
        return update_op.rowcount

    _lockmode_lookup = {
            'read': 'read',
              'read_nowait': 'read_nowait',
              'update': True,
              'update_nowait': 'nowait',
              None: False
    }

    def _compile_context(self, labels=True):
        context = QueryContext(self)

        if context.statement is not None:
            return context

        context.labels = labels

        if self._lockmode:
            try:
                context.for_update = self._lockmode_lookup[self._lockmode]
            except KeyError:
                raise sa_exc.ArgumentError(
                                "Unknown lockmode %r" % self._lockmode)
        for entity in self._entities:
            entity.setup_context(self, context)

        for rec in context.create_eager_joins:
            strategy = rec[0]
            strategy(*rec[1:])

        if context.from_clause:
            # "load from explicit FROMs" mode,
            # i.e. when select_from() or join() is used
            context.froms = list(context.from_clause)
        else:
            # "load from discrete FROMs" mode,
            # i.e. when each _MappedEntity has its own FROM
            context.froms = context.froms

        if self._enable_single_crit:
            self._adjust_for_single_inheritance(context)

        if not context.primary_columns:
            if self._only_load_props:
                raise sa_exc.InvalidRequestError(
                            "No column-based properties specified for "
                            "refresh operation. Use session.expire() "
                            "to reload collections and related items.")
            else:
                raise sa_exc.InvalidRequestError(
                            "Query contains no columns with which to "
                            "SELECT from.")

        if context.multi_row_eager_loaders and self._should_nest_selectable:
            context.statement = self._compound_eager_statement(context)
        else:
            context.statement = self._simple_statement(context)
        return context

    def _compound_eager_statement(self, context):
        # for eager joins present and LIMIT/OFFSET/DISTINCT,
        # wrap the query inside a select,
        # then append eager joins onto that

        if context.order_by:
            order_by_col_expr = list(
                                    chain(*[
                                        sql_util.unwrap_order_by(o)
                                        for o in context.order_by
                                    ])
                                )
        else:
            context.order_by = None
            order_by_col_expr = []

        inner = sql.select(
                    context.primary_columns + order_by_col_expr,
                    context.whereclause,
                    from_obj=context.froms,
                    use_labels=context.labels,
                    # TODO: this order_by is only needed if
                    # LIMIT/OFFSET is present in self._select_args,
                    # else the application on the outside is enough
                    order_by=context.order_by,
                    **self._select_args
                )

        for hint in self._with_hints:
            inner = inner.with_hint(*hint)

        if self._correlate:
            inner = inner.correlate(*self._correlate)

        inner = inner.alias()

        equivs = self.__all_equivs()

        context.adapter = sql_util.ColumnAdapter(inner, equivs)

        statement = sql.select(
                            [inner] + context.secondary_columns,
                            for_update=context.for_update,
                            use_labels=context.labels)

        from_clause = inner
        for eager_join in context.eager_joins.values():
            # EagerLoader places a 'stop_on' attribute on the join,
            # giving us a marker as to where the "splice point" of
            # the join should be
            from_clause = sql_util.splice_joins(
                                        from_clause,
                                        eager_join, eager_join.stop_on)

        statement.append_from(from_clause)

        if context.order_by:
            statement.append_order_by(
                *context.adapter.copy_and_process(
                    context.order_by
                )
            )

        statement.append_order_by(*context.eager_order_by)
        return statement

    def _simple_statement(self, context):
        if not context.order_by:
            context.order_by = None

        if self._distinct and context.order_by:
            order_by_col_expr = list(
                                    chain(*[
                                        sql_util.unwrap_order_by(o)
                                        for o in context.order_by
                                    ])
                                )
            context.primary_columns += order_by_col_expr

        context.froms += tuple(context.eager_joins.values())

        statement = sql.select(
                        context.primary_columns +
                                context.secondary_columns,
                        context.whereclause,
                        from_obj=context.froms,
                        use_labels=context.labels,
                        for_update=context.for_update,
                        order_by=context.order_by,
                        **self._select_args
                    )

        for hint in self._with_hints:
            statement = statement.with_hint(*hint)

        if self._correlate:
            statement = statement.correlate(*self._correlate)

        if context.eager_order_by:
            statement.append_order_by(*context.eager_order_by)
        return statement

    def _adjust_for_single_inheritance(self, context):
        """Apply single-table-inheritance filtering.

        For all distinct single-table-inheritance mappers represented in
        the columns clause of this query, add criterion to the WHERE
        clause of the given QueryContext such that only the appropriate
        subtypes are selected from the total results.

        """
        for (ext_info, adapter) in self._mapper_adapter_map.values():
            if ext_info in self._join_entities:
                continue
            single_crit = ext_info.mapper._single_table_criterion
            if single_crit is not None:
                if adapter:
                    single_crit = adapter.traverse(single_crit)
                single_crit = self._adapt_clause(single_crit, False, False)
                context.whereclause = sql.and_(context.whereclause,
                                            single_crit)

    def __str__(self):
        return str(self._compile_context().statement)

inspection._self_inspects(Query)


class _QueryEntity(object):
    """represent an entity column returned within a Query result."""

    def __new__(cls, *args, **kwargs):
        if cls is _QueryEntity:
            entity = args[1]
            if not isinstance(entity, basestring) and \
                        _is_mapped_class(entity):
                cls = _MapperEntity
            else:
                cls = _ColumnEntity
        return object.__new__(cls)

    def _clone(self):
        q = self.__class__.__new__(self.__class__)
        q.__dict__ = self.__dict__.copy()
        return q


class _MapperEntity(_QueryEntity):
    """mapper/class/AliasedClass entity"""

    def __init__(self, query, entity):
        self.primary_entity = not query._entities
        query._entities.append(self)

        self.entities = [entity]
        self.expr = entity

    def setup_entity(self, ext_info, aliased_adapter):
        self.mapper = ext_info.mapper
        self.aliased_adapter = aliased_adapter
        self.selectable = ext_info.selectable
        self.is_aliased_class = ext_info.is_aliased_class
        self._with_polymorphic = ext_info.with_polymorphic_mappers
        self._polymorphic_discriminator = \
                ext_info.polymorphic_on
        self.entity_zero = ext_info
        if ext_info.is_aliased_class:
            self._label_name = self.entity_zero.name
        else:
            self._label_name = self.mapper.class_.__name__
        self.path = self.entity_zero._path_registry

    def set_with_polymorphic(self, query, cls_or_mappers,
                                selectable, polymorphic_on):
        """Receive an update from a call to query.with_polymorphic().

        Note the newer style of using a free standing with_polymporphic()
        construct doesn't make use of this method.


        """
        if self.is_aliased_class:
            # TODO: invalidrequest ?
            raise NotImplementedError(
                        "Can't use with_polymorphic() against "
                        "an Aliased object"
                        )

        if cls_or_mappers is None:
            query._reset_polymorphic_adapter(self.mapper)
            return

        mappers, from_obj = self.mapper._with_polymorphic_args(
                                                cls_or_mappers, selectable)
        self._with_polymorphic = mappers
        self._polymorphic_discriminator = polymorphic_on

        self.selectable = from_obj
        query._mapper_loads_polymorphically_with(self.mapper,
                sql_util.ColumnAdapter(from_obj,
                        self.mapper._equivalent_columns))

    filter_fn = id

    @property
    def type(self):
        return self.mapper.class_

    @property
    def entity_zero_or_selectable(self):
        return self.entity_zero

    def corresponds_to(self, entity):
        if entity.is_aliased_class:
            if self.is_aliased_class:
                if entity._base_alias is self.entity_zero._base_alias:
                    return True
            return False
        elif self.is_aliased_class:
            if self.entity_zero._use_mapper_path:
                return entity in self._with_polymorphic
            else:
                return entity is self.entity_zero

        return entity.common_parent(self.entity_zero)

    def adapt_to_selectable(self, query, sel):
        query._entities.append(self)

    def _get_entity_clauses(self, query, context):

        adapter = None

        if not self.is_aliased_class:
            if query._polymorphic_adapters:
                adapter = query._polymorphic_adapters.get(self.mapper, None)
        else:
            adapter = self.aliased_adapter

        if adapter:
            if query._from_obj_alias:
                ret = adapter.wrap(query._from_obj_alias)
            else:
                ret = adapter
        else:
            ret = query._from_obj_alias

        return ret

    def row_processor(self, query, context, custom_rows):
        adapter = self._get_entity_clauses(query, context)

        if context.adapter and adapter:
            adapter = adapter.wrap(context.adapter)
        elif not adapter:
            adapter = context.adapter

        # polymorphic mappers which have concrete tables in
        # their hierarchy usually
        # require row aliasing unconditionally.
        if not adapter and self.mapper._requires_row_aliasing:
            adapter = sql_util.ColumnAdapter(
                self.selectable,
                self.mapper._equivalent_columns)

        if self.primary_entity:
            _instance = loading.instance_processor(
                self.mapper,
                context,
                self.path,
                adapter,
                only_load_props=query._only_load_props,
                refresh_state=context.refresh_state,
                polymorphic_discriminator=self._polymorphic_discriminator
            )
        else:
            _instance = loading.instance_processor(
                self.mapper,
                context,
                self.path,
                adapter,
                polymorphic_discriminator=self._polymorphic_discriminator
            )

        return _instance, self._label_name

    def setup_context(self, query, context):
        adapter = self._get_entity_clauses(query, context)

        context.froms += (self.selectable,)

        if context.order_by is False and self.mapper.order_by:
            context.order_by = self.mapper.order_by

            # apply adaptation to the mapper's order_by if needed.
            if adapter:
                context.order_by = adapter.adapt_list(
                                        util.to_list(
                                            context.order_by
                                        )
                                    )

        if self._with_polymorphic:
            poly_properties = self.mapper._iterate_polymorphic_properties(
                self._with_polymorphic)
        else:
            poly_properties = self.mapper._polymorphic_properties

        for value in poly_properties:
            if query._only_load_props and \
                    value.key not in query._only_load_props:
                continue
            value.setup(
                context,
                self,
                self.path,
                adapter,
                only_load_props=query._only_load_props,
                column_collection=context.primary_columns
            )

        if self._polymorphic_discriminator is not None and \
            self._polymorphic_discriminator \
                is not self.mapper.polymorphic_on:

            if adapter:
                pd = adapter.columns[self._polymorphic_discriminator]
            else:
                pd = self._polymorphic_discriminator
            context.primary_columns.append(pd)

    def __str__(self):
        return str(self.mapper)


class _ColumnEntity(_QueryEntity):
    """Column/expression based entity."""

    def __init__(self, query, column, namespace=None):
        self.expr = column
        self.namespace = namespace

        if isinstance(column, basestring):
            column = sql.literal_column(column)
            self._label_name = column.name
        elif isinstance(column, (
                                    attributes.QueryableAttribute,
                                    interfaces.PropComparator
                                )):
            self._label_name = column.key
            column = column.__clause_element__()
        else:
            self._label_name = getattr(column, 'key', None)

        if not isinstance(column, expression.ColumnElement) and \
                            hasattr(column, '_select_iterable'):
            for c in column._select_iterable:
                if c is column:
                    break
                _ColumnEntity(query, c, namespace=column)

            if c is not column:
                return

        if not isinstance(column, sql.ColumnElement):
            raise sa_exc.InvalidRequestError(
                "SQL expression, column, or mapped entity "
                "expected - got '%r'" % (column, )
            )

        type_ = column.type
        if type_.hashable:
            self.filter_fn = lambda item: item
        else:
            counter = util.counter()
            self.filter_fn = lambda item: counter()

        # If the Column is unnamed, give it a
        # label() so that mutable column expressions
        # can be located in the result even
        # if the expression's identity has been changed
        # due to adaption.

        if not column._label and not getattr(column, 'is_literal', False):
            column = column.label(self._label_name)

        query._entities.append(self)

        self.column = column
        self.froms = set()

        # look for ORM entities represented within the
        # given expression.  Try to count only entities
        # for columns whose FROM object is in the actual list
        # of FROMs for the overall expression - this helps
        # subqueries which were built from ORM constructs from
        # leaking out their entities into the main select construct
        self.actual_froms = actual_froms = set(column._from_objects)

        self.entities = util.OrderedSet(
            elem._annotations['parententity']
            for elem in visitors.iterate(column, {})
            if 'parententity' in elem._annotations
            and actual_froms.intersection(elem._from_objects)
            )

        if self.entities:
            self.entity_zero = list(self.entities)[0]
        elif self.namespace is not None:
            self.entity_zero = self.namespace
        else:
            self.entity_zero = None

    @property
    def entity_zero_or_selectable(self):
        if self.entity_zero is not None:
            return self.entity_zero
        elif self.actual_froms:
            return list(self.actual_froms)[0]
        else:
            return None

    @property
    def type(self):
        return self.column.type

    def adapt_to_selectable(self, query, sel):
        c = _ColumnEntity(query, sel.corresponding_column(self.column))
        c._label_name = self._label_name
        c.entity_zero = self.entity_zero
        c.entities = self.entities

    def setup_entity(self, ext_info, aliased_adapter):
        if 'selectable' not in self.__dict__:
            self.selectable = ext_info.selectable
        self.froms.add(ext_info.selectable)

    def corresponds_to(self, entity):
        if self.entity_zero is None:
            return False
        elif _is_aliased_class(entity):
            # TODO: polymorphic subclasses ?
            return entity is self.entity_zero
        else:
            return not _is_aliased_class(self.entity_zero) and \
                    entity.common_parent(self.entity_zero)

    def _resolve_expr_against_query_aliases(self, query, expr, context):
        return query._adapt_clause(expr, False, True)

    def row_processor(self, query, context, custom_rows):
        column = self._resolve_expr_against_query_aliases(
                                            query, self.column, context)

        if context.adapter:
            column = context.adapter.columns[column]

        def proc(row, result):
            return row[column]

        return proc, self._label_name

    def setup_context(self, query, context):
        column = self._resolve_expr_against_query_aliases(
                                            query, self.column, context)
        context.froms += tuple(self.froms)
        context.primary_columns.append(column)

    def __str__(self):
        return str(self.column)


log.class_logger(Query)


class QueryContext(object):
    multi_row_eager_loaders = False
    adapter = None
    froms = ()
    for_update = False

    def __init__(self, query):

        if query._statement is not None:
            if isinstance(query._statement, expression.SelectBase) and \
                                not query._statement.use_labels:
                self.statement = query._statement.apply_labels()
            else:
                self.statement = query._statement
        else:
            self.statement = None
            self.from_clause = query._from_obj
            self.whereclause = query._criterion
            self.order_by = query._order_by

        self.query = query
        self.session = query.session
        self.populate_existing = query._populate_existing
        self.invoke_all_eagers = query._invoke_all_eagers
        self.version_check = query._version_check
        self.refresh_state = query._refresh_state
        self.primary_columns = []
        self.secondary_columns = []
        self.eager_order_by = []
        self.eager_joins = {}
        self.create_eager_joins = []
        self.propagate_options = set(o for o in query._with_options if
                                        o.propagate_to_loaders)
        self.attributes = self._attributes = query._attributes.copy()


class AliasOption(interfaces.MapperOption):

    def __init__(self, alias):
        self.alias = alias

    def process_query(self, query):
        if isinstance(self.alias, basestring):
            alias = query._mapper_zero().mapped_table.alias(self.alias)
        else:
            alias = self.alias
        query._from_obj_alias = sql_util.ColumnAdapter(alias)