158 lines
5.6 KiB
Python
158 lines
5.6 KiB
Python
|
"""Bridges between the `asyncio` module and Tornado IOLoop.
|
||
|
|
||
|
This is a work in progress and interfaces are subject to change.

To test:

    python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop
    python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOMainLoop

(the tests log a few warnings with AsyncIOMainLoop because they leave some
unfinished callbacks on the event loop that fail when it resumes)
|
||
|
"""
|
||
|
|
||
|
from __future__ import absolute_import, division, print_function, with_statement
|
||
|
import functools
|
||
|
|
||
|
import tornado.concurrent
|
||
|
from tornado.gen import convert_yielded
|
||
|
from tornado.ioloop import IOLoop
|
||
|
from tornado import stack_context
|
||
|
|
||
|
try:
|
||
|
# Import the real asyncio module for py33+ first. Older versions of the
|
||
|
# trollius backport also use this name.
|
||
|
import asyncio
|
||
|
except ImportError as e:
|
||
|
# Asyncio itself isn't available; see if trollius is (backport to py26+).
|
||
|
try:
|
||
|
import trollius as asyncio
|
||
|
except ImportError:
|
||
|
# Re-raise the original asyncio error, not the trollius one.
|
||
|
raise e
|
||
|
|
||
|
class BaseAsyncIOLoop(IOLoop):
    """Tornado `IOLoop` whose work is delegated to an asyncio event loop.

    File-descriptor handlers, timeouts and callbacks are forwarded to
    ``self.asyncio_loop``; this class keeps only the bookkeeping needed to
    map between the two interfaces.
    """

    def initialize(self, asyncio_loop, close_loop=False):
        """Bind this IOLoop to ``asyncio_loop``.

        ``close_loop`` controls whether `close` also closes the asyncio loop.
        """
        self.asyncio_loop = asyncio_loop
        self.close_loop = close_loop
        # make_current is scheduled on the asyncio loop instead of being
        # invoked immediately.
        self.asyncio_loop.call_soon(self.make_current)
        # fd -> (fileobj, wrapped handler), as in IOLoop.add_handler.
        self.handlers = {}
        # fds currently registered with the asyncio loop for reads/writes.
        self.readers = set()
        self.writers = set()
        self.closing = False

    def close(self, all_fds=False):
        """Unregister every handler and, if configured, close the loop.

        When ``all_fds`` is true, each handler's file object is passed to
        ``close_fd`` after it has been removed.  The asyncio loop is closed
        only if ``close_loop`` was set in `initialize`.
        """
        self.closing = True
        # Iterate over a snapshot: remove_handler mutates self.handlers.
        for fd in list(self.handlers):
            fileobj = self.handlers[fd][0]
            self.remove_handler(fd)
            if all_fds:
                self.close_fd(fileobj)
        if self.close_loop:
            self.asyncio_loop.close()

    def add_handler(self, fd, handler, events):
        """Register ``handler`` for the given ``events`` on ``fd``.

        Raises `ValueError` if the descriptor already has a handler.
        """
        fd, fileobj = self.split_fd(fd)
        if fd in self.handlers:
            raise ValueError("fd %s added twice" % fd)
        wrapped = stack_context.wrap(handler)
        self.handlers[fd] = (fileobj, wrapped)
        loop = self.asyncio_loop
        if events & IOLoop.READ:
            loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
            self.readers.add(fd)
        if events & IOLoop.WRITE:
            loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
            self.writers.add(fd)

    def update_handler(self, fd, events):
        """Change which events ``fd`` is registered for.

        Readers and writers are added or removed on the asyncio loop so that
        the registrations match the READ/WRITE bits of ``events``.
        """
        fd, _ = self.split_fd(fd)
        loop = self.asyncio_loop

        wants_read = bool(events & IOLoop.READ)
        if wants_read and fd not in self.readers:
            loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
            self.readers.add(fd)
        elif not wants_read and fd in self.readers:
            loop.remove_reader(fd)
            self.readers.remove(fd)

        wants_write = bool(events & IOLoop.WRITE)
        if wants_write and fd not in self.writers:
            loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
            self.writers.add(fd)
        elif not wants_write and fd in self.writers:
            loop.remove_writer(fd)
            self.writers.remove(fd)

    def remove_handler(self, fd):
        """Stop listening on ``fd``; a descriptor with no handler is ignored."""
        fd, _ = self.split_fd(fd)
        if fd not in self.handlers:
            return
        loop = self.asyncio_loop
        if fd in self.readers:
            loop.remove_reader(fd)
            self.readers.remove(fd)
        if fd in self.writers:
            loop.remove_writer(fd)
            self.writers.remove(fd)
        del self.handlers[fd]

    def _handle_events(self, fd, events):
        """Invoke the handler stored for ``fd`` with its file object."""
        fileobj, callback = self.handlers[fd]
        callback(fileobj, events)

    def start(self):
        """Set up logging, then run the asyncio loop until it is stopped."""
        self._setup_logging()
        self.asyncio_loop.run_forever()

    def stop(self):
        """Stop the underlying asyncio loop."""
        self.asyncio_loop.stop()

    def call_at(self, when, callback, *args, **kwargs):
        """Schedule ``callback`` for absolute time ``when``.

        Returns the handle produced by the asyncio loop's ``call_later``.
        """
        # self.time() and asyncio_loop.time() are not synchronized, so the
        # absolute deadline is turned into a non-negative relative delay.
        delay = max(0, when - self.time())
        # asyncio accepts *args but not **kwargs for scheduled calls, so
        # all arguments are bound into a partial up front.
        bound = functools.partial(
            stack_context.wrap(callback), *args, **kwargs)
        return self.asyncio_loop.call_later(delay, self._run_callback, bound)

    def remove_timeout(self, timeout):
        """Cancel a timeout handle previously returned by `call_at`."""
        timeout.cancel()

    def add_callback(self, callback, *args, **kwargs):
        """Schedule ``callback`` via the loop's ``call_soon_threadsafe``.

        Raises `RuntimeError` once `close` has been called.
        """
        if self.closing:
            raise RuntimeError("IOLoop is closing")
        bound = functools.partial(
            stack_context.wrap(callback), *args, **kwargs)
        self.asyncio_loop.call_soon_threadsafe(self._run_callback, bound)

    # Signal handlers use the same code path as add_callback.
    add_callback_from_signal = add_callback
|
||
|
|
||
|
|
||
|
class AsyncIOMainLoop(BaseAsyncIOLoop):
    """IOLoop bound to the loop returned by ``asyncio.get_event_loop()``.

    The asyncio loop is not closed when this IOLoop is closed
    (``close_loop=False``).
    """

    def initialize(self):
        current_loop = asyncio.get_event_loop()
        super(AsyncIOMainLoop, self).initialize(current_loop,
                                                close_loop=False)
|
||
|
|
||
|
|
||
|
class AsyncIOLoop(BaseAsyncIOLoop):
    """IOLoop bound to a freshly created asyncio event loop.

    The new loop is closed together with this IOLoop (``close_loop=True``).
    """

    def initialize(self):
        fresh_loop = asyncio.new_event_loop()
        super(AsyncIOLoop, self).initialize(fresh_loop, close_loop=True)
|
||
|
|
||
|
def to_tornado_future(asyncio_future):
    """Convert an ``asyncio.Future`` to a `tornado.concurrent.Future`.

    A new Tornado future is created and chained to ``asyncio_future`` with
    `tornado.concurrent.chain_future`; the new future is returned.
    """
    tornado_side = tornado.concurrent.Future()
    tornado.concurrent.chain_future(asyncio_future, tornado_side)
    return tornado_side
|
||
|
|
||
|
def to_asyncio_future(tornado_future):
    """Convert a `tornado.concurrent.Future` to an ``asyncio.Future``.

    A new asyncio future is created and chained to ``tornado_future`` with
    `tornado.concurrent.chain_future`; the new future is returned.
    """
    asyncio_side = asyncio.Future()
    tornado.concurrent.chain_future(tornado_future, asyncio_side)
    return asyncio_side
|
||
|
|
||
|
# Register to_tornado_future as the converter for asyncio.Future objects,
# but only when convert_yielded exposes a ``register`` attribute.
# NOTE(review): presumably ``register`` is missing when convert_yielded is
# not a singledispatch function -- confirm against tornado.gen.
if hasattr(convert_yielded, 'register'):
    convert_yielded.register(asyncio.Future, to_tornado_future)
|