Compare commits: master..0.1.0

No commits in common. "master" and "0.1.0" have entirely different histories.

33 changed files with 633 additions and 2536 deletions


@@ -1,55 +0,0 @@
# adapted from https://packaging.python.org/en/latest/appveyor/

environment:
  # we tell Tox to use "twisted[windows]", to get pypiwin32 installed
  #TWISTED_EXTRAS: "[windows]"
  # that didn't work (it seems to work when I run it locally, but on appveyor
  # it fails to install the pypiwin32 package). So don't bother telling
  # Twisted to support windows: just install it ourselves.
  # EXTRA_DEPENDENCY: "pypiwin32"

  matrix:
    # For Python versions available on Appveyor, see
    # http://www.appveyor.com/docs/installed-software#python
    - PYTHON: "C:\\Python27"
    - PYTHON: "C:\\Python27-x64"
      DISTUTILS_USE_SDK: "1"
    - PYTHON: "C:\\Python35"
    - PYTHON: "C:\\Python36"
    - PYTHON: "C:\\Python36-x64"

install:
  - |
    %PYTHON%\python.exe -m pip install wheel tox

# note:
#  %PYTHON% has: python.exe
#  %PYTHON%\Scripts has: pip.exe, tox.exe (and others installed by bare pip)

build: off

test_script:
  # Put your test command here.
  # Note that you must use the environment variable %PYTHON% to refer to
  # the interpreter you're using - Appveyor does not do anything special
  # to put the Python version you want to use on PATH.
  - |
    misc\windows-build.cmd %PYTHON%\Scripts\tox.exe -e py

after_test:
  # This step builds your wheels.
  # Again, you only need build.cmd if you're building C extensions for
  # 64-bit Python 3.3/3.4. And you need to use %PYTHON% to get the correct
  # interpreter
  - |
    misc\windows-build.cmd %PYTHON%\python.exe setup.py bdist_wheel

artifacts:
  # bdist_wheel puts your built wheel in the dist directory
  - path: dist\*

#on_success:
#  You can use this step to upload your artifacts to a public website.
#  See Appveyor's documentation for more details. Or you can simply
#  access your wheels from the Appveyor "artifacts" tab for your build.


@@ -1,35 +0,0 @@
name: Tests

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

jobs:
  testing:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [3.7, 3.8, 3.9]
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip tox codecov
          tox --notest -e coverage
      - name: Test
        run: |
          python --version
          tox -e coverage
      - name: Upload Coverage
        run: codecov


@@ -1,17 +1,18 @@
-arch:
-  - amd64
-  - ppc64le
-language: python
-# defaults: the py3.7 environment overrides these
-dist: trusty
 sudo: false
 language: python
 cache: pip
 before_cache:
   - rm -f $HOME/.cache/pip/log/debug.log
 branches:
   except:
     - /^WIP-.*$/
+python:
+  - "2.7"
+  - "3.3"
+  - "3.4"
+  - "3.5"
+  - "3.6"
+  - "nightly"
 install:
   - pip install -U pip tox virtualenv codecov
 before_script:

@@ -24,15 +25,6 @@ script:
 after_success:
   - codecov
 matrix:
-  include:
-    - python: 2.7
-    - python: 3.5
-    - python: 3.6
-    - python: 3.7
-      # we don't actually need sudo, but that kicks us onto GCE, which lets
-      # us get xenial
-      sudo: true
-      dist: xenial
-    - python: nightly
   allow_failures:
-    - python: nightly
+    - python: "3.3"
+    - python: "nightly"

NEWS.md

@@ -1,37 +1,5 @@
 User-visible changes in "magic-wormhole-transit-relay":
 
-## unreleased
-
-* drop Python 2, Python 3.5 and 3.6 support
-
-## Release 0.2.1 (11-Sep-2019)
-
-* listen on IPv4+IPv6 properly (#12)
-
-## Release 0.2.0 (10-Sep-2019)
-
-* listen on IPv4+IPv6 socket by default (#12)
-* enable SO_KEEPALIVE on all connections (#9)
-* drop support for py3.3 and py3.4
-* improve munin plugins
-
-## Release 0.1.2 (19-Mar-2018)
-
-* Allow more simultaneous connections, by increasing the rlimits() ceiling at
-  startup
-* Improve munin plugins
-* Get tests working on Windows
-
-## Release 0.1.1 (14-Feb-2018)
-
-Improve logging and munin graphing tools: previous version would count bad
-handshakes twice (once as "errory", and again as "lonely"). The munin plugins
-have been renamed.
-
 ## Release 0.1.0 (12-Nov-2017)
 
 Initial release. Forked from magic-wormhole-0.10.3 (12-Sep-2017).


@@ -1,8 +1,7 @@
 # magic-wormhole-transit-relay
 
-![Tests](https://github.com/magic-wormhole/magic-wormhole-transit-relay/workflows/Tests/badge.svg)
-[![codecov.io](https://codecov.io/github/magic-wormhole/magic-wormhole-transit-relay/coverage.svg?branch=master)](https://codecov.io/github/magic-wormhole/magic-wormhole-transit-relay?branch=master)
+[![PyPI](http://img.shields.io/pypi/v/magic-wormhole-transit-relay.svg)](https://pypi.python.org/pypi/magic-wormhole-transit-relay)
+[![Build Status](https://travis-ci.org/warner/magic-wormhole-transit-relay.svg?branch=master)](https://travis-ci.org/warner/magic-wormhole-transit-relay)
+[![codecov.io](https://codecov.io/github/warner/magic-wormhole-transit-relay/coverage.svg?branch=master)](https://codecov.io/github/warner/magic-wormhole-transit-relay?branch=master)
 
 Transit Relay server for Magic-Wormhole


@@ -1,54 +0,0 @@
"""
This is a test-client for the transit-relay that uses TCP. It
doesn't send any data, only prints out data that is received. Uses a
fixed token of 64 'a' characters. Always connects on localhost:4001
"""

from twisted.internet import endpoints
from twisted.internet.defer import (
    Deferred,
)
from twisted.internet.task import react
from twisted.internet.error import (
    ConnectionDone,
)
from twisted.internet.protocol import (
    Protocol,
    Factory,
)


class RelayEchoClient(Protocol):
    """
    Speaks the version1 magic wormhole transit relay protocol (as a client)
    """

    def connectionMade(self):
        print(">CONNECT")
        self.data = b""
        self.transport.write(u"please relay {}\n".format(self.factory.token).encode("ascii"))

    def dataReceived(self, data):
        print(">RECV {} bytes".format(len(data)))
        print(data.decode("ascii"))
        self.data += data
        # dataReceived delivers bytes, so compare and reply with bytes
        if data == b"ok\n":
            self.transport.write(b"ding\n")

    def connectionLost(self, reason):
        if isinstance(reason.value, ConnectionDone):
            self.factory.done.callback(None)
        else:
            print(">DISCONNECT: {}".format(reason))
            self.factory.done.callback(reason)


@react
def main(reactor):
    ep = endpoints.clientFromString(reactor, "tcp:localhost:4001")
    f = Factory.forProtocol(RelayEchoClient)
    f.token = "a" * 64
    f.done = Deferred()
    ep.connect(f)
    return f.done


@@ -48,43 +48,13 @@ The relevant arguments are:
 
 * ``--port=``: the endpoint to listen on, like ``tcp:4001``
 * ``--log-fd=``: writes JSON lines to the given file descriptor for each connection
 * ``--usage-db=``: maintains a SQLite database with current and historical usage data
-* ``--blur-usage=``: round logged timestamps and data sizes
-
-For WebSockets support, two additional arguments:
-
-* ``--websocket``: the endpoint to listen for websocket connections
-  on, like ``tcp:4002``
-* ``--websocket-url``: the URL of the WebSocket connection. This may
-  be different from the listening endpoint because of port-forwarding
-  and so forth. By default it will be ``ws://localhost:<port>`` if not
-  provided
+* ``--blur-usage=``: if provided, logs are rounded to the given number of
+  seconds, and data sizes are rounded too
 
 When you use ``twist``, the relay runs in the foreground, so it will
 generally exit as soon as the controlling terminal exits. For persistent
 environments, you should daemonize the server.
-
-## Minimizing Log Data
-
-The server code attempts to strike a balance between minimizing data
-collected about users, and recording enough information to manage the server
-and monitor its operation. The standard `twistd.log` file does not record IP
-addresses unless an error occurs. The optional `--log-fd=` file (and the
-SQLite database generated if `--usage-db=` is enabled) record the time at
-which the first side connected, the time until the second side connected, the
-total transfer time, the total number of bytes transferred, and the
-success/failure status (the "mood").
-
-If `--blur-usage=` is provided, these recorded file sizes are rounded down:
-sizes less than 1kB are recorded as 0, sizes up to 1MB are rounded to the
-nearest kB, sizes up to 1GB are rounded to the nearest MB, and sizes above
-1GB are rounded to the nearest 100MB.
-
-The argument to `--blur-usage=` is treated as a number of seconds, and the
-"first side connects" timestamp is rounded to a multiple of this. For
-example, `--blur-usage=3600` means all timestamps are rounded down to the
-nearest hour. The waiting time and total time deltas are recorded without
-rounding.
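The `--blur-usage=` rounding rules described in the prose can be sketched in Python. This is an illustrative sketch derived from the documentation text only; the function names are hypothetical, not the relay's actual API.

```python
# Hypothetical sketch of the --blur-usage rounding rules; names are
# illustrative, not taken from the relay's source.

def round_to(size, coarseness):
    # round to the nearest multiple of `coarseness`
    return int(coarseness * round(float(size) / coarseness))

def blur_size(size):
    if size < 1000:                 # sizes less than 1kB are recorded as 0
        return 0
    if size < 1000000:              # up to 1MB: nearest kB
        return round_to(size, 1000)
    if size < 1000000000:           # up to 1GB: nearest MB
        return round_to(size, 1000000)
    return round_to(size, 100000000)  # above 1GB: nearest 100MB

def blur_timestamp(timestamp, blur_seconds):
    # the "first side connects" timestamp is rounded down to a
    # multiple of the --blur-usage= argument
    return blur_seconds * int(timestamp // blur_seconds)
```

For example, with `--blur-usage=3600` a connection at Unix time 7305 would be logged as 7200, and a 123456-byte transfer would be recorded as 123000 bytes.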
 ## Daemonization
 
 A production installation will want to daemonize the server somehow. One
@@ -131,10 +101,6 @@ management can use the ``--log-fd=`` option to emit logs, then route those
 logs into a suitable analysis tool. Other environments might be content to
 use ``--usage-db=`` and run the included Munin plugins to monitor usage.
-
-There is also a
-[Dockerfile](https://github.com/ggeorgovassilis/magic-wormhole-transit-relay-docker),
-written by George Georgovassilis, which you might find useful.
 
 ## Configuring Clients
 
 The transit relay will listen on an "endpoint" (usually a TCP port, but it


@@ -18,6 +18,7 @@ The resulting "usage.sqlite" should be passed into --usage-db=, e.g. "twist
 transitrelay --usage=.../PATH/TO/usage.sqlite".
 """
 
+from __future__ import unicode_literals, print_function
 import sys
 
 from wormhole_transit_relay.database import open_existing_db, create_db


@@ -7,18 +7,16 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
 env.usagedb /path/to/your/wormhole/server/usage.sqlite
 """
 
+from __future__ import print_function
 import os, sys, time, sqlite3
 
 CONFIG = """\
 graph_title Magic-Wormhole Transit Usage (since reboot)
 graph_vlabel Bytes Since Reboot
-graph_category wormhole
-bytes.label Transit Bytes (complete)
+graph_category network
+bytes.label Transit Bytes
 bytes.draw LINE1
 bytes.type GAUGE
-incomplete.label Transit Bytes (incomplete)
-incomplete.draw LINE1
-incomplete.type GAUGE
 """
 
 if len(sys.argv) > 1 and sys.argv[1] == "config":

@@ -30,12 +28,10 @@ assert os.path.exists(dbfile)
 db = sqlite3.connect(dbfile)
 
 MINUTE = 60.0
-updated,rebooted,incomplete = db.execute("SELECT `updated`,`rebooted`,`incomplete_bytes` FROM `current`").fetchone()
+updated,rebooted = db.execute("SELECT `updated`,`rebooted` FROM `current`").fetchone()
 if time.time() > updated + 5*MINUTE:
     sys.exit(1) # expired
-complete = db.execute("SELECT SUM(`total_bytes`) FROM `usage`"
-                      " WHERE `started` > ?",
-                      (rebooted,)).fetchone()[0] or 0
-print("bytes.value", complete)
-print("incomplete.value", complete+incomplete)
+value = db.execute("SELECT SUM(`total_bytes`) FROM `usage` WHERE `started` > ?",
+                   (rebooted,)).fetchone()[0]
+print("bytes.value", value)


@@ -7,12 +7,13 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
 env.usagedb /path/to/your/wormhole/server/usage.sqlite
 """
 
+from __future__ import print_function
 import os, sys, time, sqlite3
 
 CONFIG = """\
 graph_title Magic-Wormhole Transit Active Channels
 graph_vlabel Channels
-graph_category wormhole
+graph_category network
 waiting.label Transit Waiting
 waiting.draw LINE1
 waiting.type GAUGE


@@ -7,18 +7,16 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
 env.usagedb /path/to/your/wormhole/server/usage.sqlite
 """
 
+from __future__ import print_function
 import os, sys, time, sqlite3
 
 CONFIG = """\
 graph_title Magic-Wormhole Transit Usage (all time)
 graph_vlabel Bytes Since DB Creation
-graph_category wormhole
-bytes.label Transit Bytes (complete)
+graph_category network
+bytes.label Transit Bytes
 bytes.draw LINE1
 bytes.type GAUGE
-incomplete.label Transit Bytes (incomplete)
-incomplete.draw LINE1
-incomplete.type GAUGE
 """
 
 if len(sys.argv) > 1 and sys.argv[1] == "config":

@@ -30,12 +28,9 @@ assert os.path.exists(dbfile)
 db = sqlite3.connect(dbfile)
 
 MINUTE = 60.0
-updated,incomplete = db.execute("SELECT `updated`,`incomplete_bytes`"
-                                " FROM `current`").fetchone()
+updated = db.execute("SELECT `updated` FROM `current`").fetchone()[0]
 if time.time() > updated + 5*MINUTE:
     sys.exit(1) # expired
-complete = db.execute("SELECT SUM(`total_bytes`)"
-                      " FROM `usage`").fetchone()[0] or 0
-print("bytes.value", complete)
-print("incomplete.value", complete+incomplete)
+value = db.execute("SELECT SUM(`total_bytes`) FROM `usage`").fetchone()[0]
+print("bytes.value", value)


@@ -7,24 +7,19 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
 env.usagedb /path/to/your/wormhole/server/usage.sqlite
 """
 
+from __future__ import print_function
 import os, sys, time, sqlite3
 
 CONFIG = """\
-graph_title Magic-Wormhole Transit Server Events (since reboot)
+graph_title Magic-Wormhole Transit Server Errors (since reboot)
 graph_vlabel Events Since Reboot
-graph_category wormhole
-happy.label Happy
-happy.draw LINE1
-happy.type GAUGE
+graph_category network
 errory.label Errory
 errory.draw LINE1
 errory.type GAUGE
 lonely.label Lonely
 lonely.draw LINE1
 lonely.type GAUGE
-redundant.label Redundant
-redundant.draw LINE1
-redundant.type GAUGE
 """
 
 if len(sys.argv) > 1 and sys.argv[1] == "config":

@@ -40,13 +35,6 @@ rebooted,updated = db.execute("SELECT `rebooted`, `updated` FROM `current`").fet
 if time.time() > updated + 5*MINUTE:
     sys.exit(1) # expired
-count = db.execute("SELECT COUNT() FROM `usage`"
-                   " WHERE"
-                   " `started` > ? AND"
-                   " `result` = 'happy'",
-                   (rebooted,)).fetchone()[0]
-print("happy.value", count)
 count = db.execute("SELECT COUNT() FROM `usage`"
                    " WHERE"
                    " `started` > ? AND"

@@ -60,10 +48,3 @@ count = db.execute("SELECT COUNT() FROM `usage`"
                    " `result` = 'lonely'",
                    (rebooted,)).fetchone()[0]
 print("lonely.value", count)
-count = db.execute("SELECT COUNT() FROM `usage`"
-                   " WHERE"
-                   " `started` > ? AND"
-                   " `result` = 'redundant'",
-                   (rebooted,)).fetchone()[0]
-print("redundant.value", count)


@@ -1,61 +0,0 @@
#! /usr/bin/env python

"""
Use the following in /etc/munin/plugin-conf.d/wormhole :

  [wormhole_*]
  env.usagedb /path/to/your/wormhole/server/usage.sqlite
"""

import os, sys, time, sqlite3

CONFIG = """\
graph_title Magic-Wormhole Transit Server Events (all time)
graph_vlabel Events
graph_category wormhole
happy.label Happy
happy.draw LINE1
happy.type GAUGE
errory.label Errory
errory.draw LINE1
errory.type GAUGE
lonely.label Lonely
lonely.draw LINE1
lonely.type GAUGE
redundant.label Redundant
redundant.draw LINE1
redundant.type GAUGE
"""

if len(sys.argv) > 1 and sys.argv[1] == "config":
    print(CONFIG.rstrip())
    sys.exit(0)

dbfile = os.environ["usagedb"]
assert os.path.exists(dbfile)
db = sqlite3.connect(dbfile)

MINUTE = 60.0
rebooted,updated = db.execute("SELECT `rebooted`, `updated` FROM `current`").fetchone()
if time.time() > updated + 5*MINUTE:
    sys.exit(1) # expired

count = db.execute("SELECT COUNT() FROM `usage`"
                   " WHERE `result` = 'happy'",
                   ).fetchone()[0]
print("happy.value", count)
count = db.execute("SELECT COUNT() FROM `usage`"
                   " WHERE `result` = 'errory'",
                   ).fetchone()[0]
print("errory.value", count)
count = db.execute("SELECT COUNT() FROM `usage`"
                   " WHERE `result` = 'lonely'",
                   ).fetchone()[0]
print("lonely.value", count)
count = db.execute("SELECT COUNT() FROM `usage`"
                   " WHERE `result` = 'redundant'",
                   ).fetchone()[0]
print("redundant.value", count)


@@ -1,21 +0,0 @@
@echo off
:: To build extensions for 64 bit Python 3, we need to configure environment
:: variables to use the MSVC 2010 C++ compilers from GRMSDKX_EN_DVD.iso of:
:: MS Windows SDK for Windows 7 and .NET Framework 4
::
:: More details at:
:: https://github.com/cython/cython/wiki/64BitCythonExtensionsOnWindows

IF "%DISTUTILS_USE_SDK%"=="1" (
    ECHO Configuring environment to build with MSVC on a 64bit architecture
    ECHO Using Windows SDK 7.1
    "C:\Program Files\Microsoft SDKs\Windows\v7.1\Setup\WindowsSdkVer.exe" -q -version:v7.1
    CALL "C:\Program Files\Microsoft SDKs\Windows\v7.1\Bin\SetEnv.cmd" /x64 /release
    SET MSSdk=1
    REM Need the following to allow tox to see the SDK compiler
    SET TOX_TESTENV_PASSENV=DISTUTILS_USE_SDK MSSdk INCLUDE LIB
) ELSE (
    ECHO Using default MSVC build environment
)

CALL %*


@@ -18,8 +18,7 @@ setup(name="magic-wormhole-transit-relay",
       ],
       package_data={"wormhole_transit_relay": ["db-schemas/*.sql"]},
       install_requires=[
-          "twisted >= 21.2.0",
-          "autobahn >= 21.3.1",
+          "twisted >= 17.5.0",
       ],
       extras_require={
           ':sys_platform=="win32"': ["pypiwin32"],


@@ -495,7 +495,7 @@ def get_versions():
         # versionfile_source is the relative path from the top of the source
         # tree (where the .git directory might live) to this file. Invert
         # this to find the root from __file__.
-        for _ in cfg.versionfile_source.split('/'):
+        for i in cfg.versionfile_source.split('/'):
             root = os.path.dirname(root)
     except NameError:
         return {"version": "0+unknown", "full-revisionid": None,


@@ -1,3 +1,4 @@
+from __future__ import unicode_literals
 import os
 import sqlite3
 import tempfile


@@ -24,7 +24,6 @@ CREATE TABLE `usage`
 -- transit moods:
 -- "errory": one side gave the wrong handshake
 -- "lonely": good handshake, but the other side never showed up
--- "redundant": good handshake, abandoned in favor of different connection
 -- "happy": both sides gave correct handshake
 );
 
 CREATE INDEX `usage_started_index` ON `usage` (`started`);


@@ -1,35 +0,0 @@
try:
    # 'resource' is unix-only
    from resource import getrlimit, setrlimit, RLIMIT_NOFILE
except ImportError: # pragma: nocover
    getrlimit, setrlimit, RLIMIT_NOFILE = None, None, None # pragma: nocover

from twisted.python import log


def increase_rlimits():
    if getrlimit is None:
        log.msg("unable to import 'resource', leaving rlimit alone")
        return
    soft, hard = getrlimit(RLIMIT_NOFILE)
    if soft >= 10000:
        log.msg("RLIMIT_NOFILE.soft was %d, leaving it alone" % soft)
        return
    # OS-X defaults to soft=7168, and reports a huge number for 'hard',
    # but won't accept anything more than soft=10240, so we can't just
    # set soft=hard. Linux returns (1024, 1048576) and is fine with
    # soft=hard. Cygwin is reported to return (256,-1) and accepts up to
    # soft=3200. So we try multiple values until something works.
    for newlimit in [hard, 10000, 3200, 1024]:
        log.msg("changing RLIMIT_NOFILE from (%s,%s) to (%s,%s)" %
                (soft, hard, newlimit, hard))
        try:
            setrlimit(RLIMIT_NOFILE, (newlimit, hard))
            log.msg("setrlimit successful")
            return
        except ValueError as e:
            log.msg("error during setrlimit: %s" % e)
            continue
        except:
            log.msg("other error during setrlimit, leaving it alone")
            log.err()
            return
    log.msg("unable to change rlimit, leaving it alone")


@ -1,477 +0,0 @@
from collections import defaultdict
import automat
from twisted.python import log
from zope.interface import (
Interface,
Attribute,
)
class ITransitClient(Interface):
"""
Represents the client side of a connection to this transit
relay. This is used by TransitServerState instances.
"""
started_time = Attribute("timestamp when the connection was established")
def send(data):
"""
Send some byets to the client
"""
def disconnect():
"""
Disconnect the client transport
"""
def connect_partner(other):
"""
Hook up to our partner.
:param ITransitClient other: our partner
"""
def disconnect_partner():
"""
Disconnect our partner's transport
"""
class ActiveConnections(object):
"""
Tracks active connections.
A connection is 'active' when both sides have shown up and they
are glued together (and thus could be passing data back and forth
if any is flowing).
"""
def __init__(self):
self._connections = set()
def register(self, side0, side1):
"""
A connection has become active so register both its sides
:param TransitConnection side0: one side of the connection
:param TransitConnection side1: one side of the connection
"""
self._connections.add(side0)
self._connections.add(side1)
def unregister(self, side):
"""
One side of a connection has become inactive.
:param TransitConnection side: an inactive side of a connection
"""
self._connections.discard(side)
class PendingRequests(object):
"""
Tracks outstanding (non-"active") requests.
We register client connections against the tokens we have
received. When the other side shows up we can thus match it to the
correct partner connection. At this point, the connection becomes
"active" is and is thus no longer "pending" and so will no longer
be in this collection.
"""
def __init__(self, active_connections):
"""
:param active_connections: an instance of ActiveConnections where
connections are put when both sides arrive.
"""
self._requests = defaultdict(set) # token -> set((side, TransitConnection))
self._active = active_connections
def unregister(self, token, side, tc):
"""
We no longer care about a particular client (e.g. it has
disconnected).
"""
if token in self._requests:
self._requests[token].discard((side, tc))
if not self._requests[token]:
# no more sides; token is dead
del self._requests[token]
self._active.unregister(tc)
def register(self, token, new_side, new_tc):
"""
A client has connected and successfully offered a token (and
optional 'side' token). If this is the first one for this
token, we merely remember it. If it is the second side for
this token we connect them together.
:param bytes token: the token for this connection.
:param bytes new_side: None or the side token for this connection
:param TransitServerState new_tc: the state-machine of the connection
:returns bool: True if we are the first side to register this
token
"""
potentials = self._requests[token]
for old in potentials:
(old_side, old_tc) = old
if ((old_side is None)
or (new_side is None)
or (old_side != new_side)):
# we found a match
# drop and stop tracking the rest
potentials.remove(old)
for (_, leftover_tc) in potentials.copy():
# Don't record this as errory. It's just a spare connection
# from the same side as a connection that got used. This
# can happen if the connection hint contains multiple
# addresses (we don't currently support those, but it'd
# probably be useful in the future).
leftover_tc.partner_connection_lost()
self._requests.pop(token, None)
# glue the two ends together
self._active.register(new_tc, old_tc)
new_tc.got_partner(old_tc)
old_tc.got_partner(new_tc)
return False
potentials.add((new_side, new_tc))
return True
# TODO: timer
class TransitServerState(object):
"""
Encapsulates the state-machine of the server side of a transit
relay connection.
Once the protocol has been told to relay (or to relay for a side)
it starts passing all received bytes to the other side until it
closes.
"""
_machine = automat.MethodicalMachine()
_client = None
_buddy = None
_token = None
_side = None
_first = None
_mood = "empty"
_total_sent = 0
def __init__(self, pending_requests, usage_recorder):
self._pending_requests = pending_requests
self._usage = usage_recorder
def get_token(self):
"""
:returns str: a string describing our token. This will be "-" if
we have no token yet, or "{16 chars}-<unsided>" if we have
just a token or "{16 chars}-{16 chars}" if we have a token and
a side.
"""
d = "-"
if self._token is not None:
d = self._token[:16].decode("ascii")
if self._side is not None:
d += "-" + self._side.decode("ascii")
else:
d += "-<unsided>"
return d
@_machine.input()
def connection_made(self, client):
"""
A client has connected. May only be called once.
:param ITransitClient client: our client.
"""
# NB: the "only called once" is enforced by the state-machine;
# this input is only valid for the "listening" state, to which
# we never return.
@_machine.input()
def please_relay(self, token):
"""
A 'please relay X' message has been received (the original version
of the protocol).
"""
@_machine.input()
def please_relay_for_side(self, token, side):
"""
A 'please relay X for side Y' message has been received (the
second version of the protocol).
"""
@_machine.input()
def bad_token(self):
"""
A bad token / relay line was received (e.g. couldn't be parsed)
"""
@_machine.input()
def got_partner(self, client):
"""
The partner for this relay session has been found
"""
@_machine.input()
def connection_lost(self):
"""
Our transport has failed.
"""
@_machine.input()
def partner_connection_lost(self):
"""
Our partner's transport has failed.
"""
@_machine.input()
def got_bytes(self, data):
"""
Some bytes have arrived (that aren't part of the handshake)
"""
@_machine.output()
def _remember_client(self, client):
self._client = client
# note that there is no corresponding "_forget_client" because we
# may still want to access it after it is gone .. for example, to
# get the .started_time for logging purposes
@_machine.output()
def _register_token(self, token):
return self._real_register_token_for_side(token, None)
@_machine.output()
def _register_token_for_side(self, token, side):
return self._real_register_token_for_side(token, side)
@_machine.output()
def _unregister(self):
"""
remove us from the thing that remembers tokens and sides
"""
return self._pending_requests.unregister(self._token, self._side, self)
@_machine.output()
def _send_bad(self):
self._mood = "errory"
self._client.send(b"bad handshake\n")
if self._client.factory.log_requests:
log.msg("transit handshake failure")
@_machine.output()
def _send_ok(self):
self._client.send(b"ok\n")
@_machine.output()
def _send_impatient(self):
self._client.send(b"impatient\n")
if self._client.factory.log_requests:
log.msg("transit impatience failure")
@_machine.output()
def _count_bytes(self, data):
self._total_sent += len(data)
@_machine.output()
def _send_to_partner(self, data):
self._buddy._client.send(data)
@_machine.output()
def _connect_partner(self, client):
self._buddy = client
self._client.connect_partner(client)
@_machine.output()
def _disconnect(self):
self._client.disconnect()
@_machine.output()
def _disconnect_partner(self):
self._client.disconnect_partner()
# some outputs to record "usage" information ..
@_machine.output()
def _record_usage(self):
if self._mood == "jilted":
if self._buddy and self._buddy._mood == "happy":
return
self._usage.record(
started=self._client.started_time,
buddy_started=self._buddy._client.started_time if self._buddy is not None else None,
result=self._mood,
bytes_sent=self._total_sent,
buddy_bytes=self._buddy._total_sent if self._buddy is not None else None
)
# some outputs to record the "mood" ..
@_machine.output()
def _mood_happy(self):
self._mood = "happy"
@_machine.output()
def _mood_lonely(self):
self._mood = "lonely"
@_machine.output()
def _mood_redundant(self):
self._mood = "redundant"
@_machine.output()
def _mood_impatient(self):
self._mood = "impatient"
@_machine.output()
def _mood_errory(self):
self._mood = "errory"
@_machine.output()
def _mood_happy_if_first(self):
"""
We disconnected first so we're only happy if we also connected
first.
"""
if self._first:
self._mood = "happy"
else:
self._mood = "jilted"
def _real_register_token_for_side(self, token, side):
"""
A client has connected and sent a valid version 1 or version 2
handshake. If the former, `side` will be None.
In either case, we remember the tokens and register
ourselves. This might result in 'got_partner' notifications to
two state-machines if this is the second side for a given token.
:param bytes token: the token
:param bytes side: The side token (or None)
"""
self._token = token
self._side = side
self._first = self._pending_requests.register(token, side, self)
@_machine.state(initial=True)
def listening(self):
"""
Initial state, awaiting connection.
"""
@_machine.state()
def wait_relay(self):
"""
Waiting for a 'relay' message
"""
@_machine.state()
def wait_partner(self):
"""
Waiting for our partner to connect
"""
@_machine.state()
def relaying(self):
"""
Relaying bytes to our partner
"""
@_machine.state()
def done(self):
"""
Terminal state
"""
listening.upon(
connection_made,
enter=wait_relay,
outputs=[_remember_client],
)
listening.upon(
connection_lost,
enter=done,
outputs=[_mood_errory],
)
wait_relay.upon(
please_relay,
enter=wait_partner,
outputs=[_mood_lonely, _register_token],
)
wait_relay.upon(
please_relay_for_side,
enter=wait_partner,
outputs=[_mood_lonely, _register_token_for_side],
)
wait_relay.upon(
bad_token,
enter=done,
outputs=[_mood_errory, _send_bad, _disconnect, _record_usage],
)
wait_relay.upon(
got_bytes,
enter=done,
outputs=[_count_bytes, _mood_errory, _disconnect, _record_usage],
)
wait_relay.upon(
connection_lost,
enter=done,
outputs=[_disconnect, _record_usage],
)
wait_partner.upon(
got_partner,
enter=relaying,
outputs=[_mood_happy, _send_ok, _connect_partner],
)
wait_partner.upon(
connection_lost,
enter=done,
outputs=[_mood_lonely, _unregister, _record_usage],
)
wait_partner.upon(
got_bytes,
enter=done,
outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister, _record_usage],
)
wait_partner.upon(
partner_connection_lost,
enter=done,
outputs=[_mood_redundant, _disconnect, _record_usage],
)
relaying.upon(
got_bytes,
enter=relaying,
outputs=[_count_bytes, _send_to_partner],
)
relaying.upon(
connection_lost,
enter=done,
outputs=[_mood_happy_if_first, _disconnect_partner, _unregister, _record_usage],
)
done.upon(
connection_lost,
enter=done,
outputs=[],
)
done.upon(
partner_connection_lost,
enter=done,
outputs=[],
)
# uncomment to turn on state-machine tracing
# set_trace_function = _machine._setTrace
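The Automat state machine above is declarative: each `.upon(...)` call names a state, an input, a next state, and a list of outputs. As a minimal dependency-free sketch (not the project's actual implementation — the real outputs are methods on the Automat-decorated class), the same transitions can be written as a plain table:

```python
# Transition table mirroring the .upon(...) declarations above:
# (state, input) -> (next_state, [output names]).
TRANSITIONS = {
    ("listening", "connection_made"): ("wait_relay", ["_remember_client"]),
    ("listening", "connection_lost"): ("done", ["_mood_errory"]),
    ("wait_relay", "please_relay"): ("wait_partner", ["_mood_lonely", "_register_token"]),
    ("wait_relay", "please_relay_for_side"): ("wait_partner", ["_mood_lonely", "_register_token_for_side"]),
    ("wait_relay", "bad_token"): ("done", ["_mood_errory", "_send_bad", "_disconnect", "_record_usage"]),
    ("wait_relay", "got_bytes"): ("done", ["_count_bytes", "_mood_errory", "_disconnect", "_record_usage"]),
    ("wait_relay", "connection_lost"): ("done", ["_disconnect", "_record_usage"]),
    ("wait_partner", "got_partner"): ("relaying", ["_mood_happy", "_send_ok", "_connect_partner"]),
    ("wait_partner", "connection_lost"): ("done", ["_mood_lonely", "_unregister", "_record_usage"]),
    ("wait_partner", "got_bytes"): ("done", ["_mood_impatient", "_send_impatient", "_disconnect", "_unregister", "_record_usage"]),
    ("wait_partner", "partner_connection_lost"): ("done", ["_mood_redundant", "_disconnect", "_record_usage"]),
    ("relaying", "got_bytes"): ("relaying", ["_count_bytes", "_send_to_partner"]),
    ("relaying", "connection_lost"): ("done", ["_mood_happy_if_first", "_disconnect_partner", "_unregister", "_record_usage"]),
    ("done", "connection_lost"): ("done", []),
    ("done", "partner_connection_lost"): ("done", []),
}

def step(state, event):
    """Advance one transition; an unknown (state, event) pair raises KeyError."""
    return TRANSITIONS[(state, event)]

# Walk the happy path: connect, relay handshake, partner arrives, bytes flow.
state = "listening"
for event in ["connection_made", "please_relay", "got_partner", "got_bytes"]:
    state, outputs = step(state, event)
```

Every path terminates in `done`, and `done` absorbs late `connection_lost` / `partner_connection_lost` events with no outputs, which is why the terminal state needs those two no-op transitions.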


@@ -1,18 +1,11 @@
 import os
+from . import transit_server
 from twisted.internet import reactor
 from twisted.python import usage
 from twisted.application.service import MultiService
 from twisted.application.internet import (TimerService,
                                           StreamServerEndpointService)
 from twisted.internet import endpoints
-from twisted.internet import protocol
-from autobahn.twisted.websocket import WebSocketServerFactory
-from . import transit_server
-from .usage import create_usage_tracker
-from .increase_rlimits import increase_rlimits
-from .database import get_db
 
 LONGDESC = """\
 This plugin sets up a 'Transit Relay' server for magic-wormhole. This service
@@ -25,9 +18,7 @@ class Options(usage.Options):
     longdesc = LONGDESC
     optParameters = [
-        ("port", "p", "tcp:4001:interface=\:\:", "endpoint to listen on"),
-        ("websocket", "w", None, "endpoint to listen for WebSocket connections"),
-        ("websocket-url", "u", None, "WebSocket URL (derived from endpoint if not provided)"),
+        ("port", "p", "tcp:4001", "endpoint to listen on"),
         ("blur-usage", None, None, "blur timestamps and data sizes in logs"),
         ("log-fd", None, None, "write JSON usage logs to this file descriptor"),
         ("usage-db", None, None, "record usage data (SQLite)"),
@@ -38,46 +29,14 @@ class Options(usage.Options):
 def makeService(config, reactor=reactor):
-    increase_rlimits()
-    tcp_ep = endpoints.serverFromString(reactor, config["port"]) # to listen
-    ws_ep = (
-        endpoints.serverFromString(reactor, config["websocket"])
-        if config["websocket"] is not None
-        else None
-    )
-    log_file = (
-        os.fdopen(int(config["log-fd"]), "w")
-        if config["log-fd"] is not None
-        else None
-    )
-    db = None if config["usage-db"] is None else get_db(config["usage-db"])
-    usage = create_usage_tracker(
-        blur_usage=config["blur-usage"],
-        log_file=log_file,
-        usage_db=db,
-    )
-    transit = transit_server.Transit(usage, reactor.seconds)
-    tcp_factory = protocol.ServerFactory()
-    tcp_factory.protocol = transit_server.TransitConnection
-    tcp_factory.log_requests = False
-    if ws_ep is not None:
-        ws_url = config["websocket-url"]
-        if ws_url is None:
-            # we're using a "private" attribute here but I don't see
-            # any useful alternative unless we also want to parse
-            # Twisted endpoint-strings.
-            ws_url = "ws://localhost:{}/".format(ws_ep._port)
-            print("Using WebSocket URL '{}'".format(ws_url))
-        ws_factory = WebSocketServerFactory(ws_url)
-        ws_factory.protocol = transit_server.WebSocketTransitConnection
-        ws_factory.transit = transit
-        ws_factory.log_requests = False
-    tcp_factory.transit = transit
+    ep = endpoints.serverFromString(reactor, config["port"]) # to listen
+    log_file = (os.fdopen(int(config["log-fd"]), "w")
+                if config["log-fd"] is not None
+                else None)
+    f = transit_server.Transit(blur_usage=config["blur-usage"],
+                               log_file=log_file,
+                               usage_db=config["usage-db"])
     parent = MultiService()
-    StreamServerEndpointService(tcp_ep, tcp_factory).setServiceParent(parent)
-    if ws_ep is not None:
-        StreamServerEndpointService(ws_ep, ws_factory).setServiceParent(parent)
-    TimerService(5*60.0, transit.update_stats).setServiceParent(parent)
+    StreamServerEndpointService(ep, f).setServiceParent(parent)
+    TimerService(5*60.0, f.timerUpdateStats).setServiceParent(parent)
     return parent


@@ -1,144 +1,30 @@
-from twisted.internet.protocol import (
-    ClientFactory,
-    Protocol,
-)
-from twisted.test import iosim
-from zope.interface import (
-    Interface,
-    Attribute,
-    implementer,
-)
-from ..transit_server import (
-    Transit,
-    TransitConnection,
-)
-from twisted.internet.protocol import ServerFactory
-from ..usage import create_usage_tracker
-
-
-class IRelayTestClient(Interface):
-    """
-    The client interface used by tests.
-    """
-    connected = Attribute("True if we are currently connected else False")
-
-    def send(data):
-        """
-        Send some bytes.
-        :param bytes data: the data to send
-        """
-
-    def disconnect():
-        """
-        Terminate the connection.
-        """
-
-    def get_received_data():
-        """
-        :returns: all the bytes received from the server on this
-            connection.
-        """
-
-    def reset_data():
-        """
-        Erase any received data to this point.
-        """
-
+#from __future__ import unicode_literals
+from twisted.internet import reactor, endpoints
+from twisted.internet.defer import inlineCallbacks
+from ..transit_server import Transit
 
 class ServerBase:
     log_requests = False
 
+    @inlineCallbacks
     def setUp(self):
-        self._pumps = []
         self._lp = None
         if self.log_requests:
             blur_usage = None
         else:
             blur_usage = 60.0
-        self._setup_relay(blur_usage=blur_usage)
-        self._transit_server._debug_log = self.log_requests
+        yield self._setup_relay(blur_usage=blur_usage)
 
-    def flush(self):
-        did_work = False
-        for pump in self._pumps:
-            did_work = pump.flush() or did_work
-        if did_work:
-            self.flush()
-
+    @inlineCallbacks
     def _setup_relay(self, blur_usage=None, log_file=None, usage_db=None):
-        usage = create_usage_tracker(
-            blur_usage=blur_usage,
-            log_file=log_file,
-            usage_db=usage_db,
-        )
-        self._transit_server = Transit(usage, lambda: 123456789.0)
-
-    def new_protocol(self):
-        """
-        This should be overridden by derived test-case classes to decide
-        if they want a TCP or WebSockets protocol.
-        """
-        raise NotImplementedError()
-
-    def new_protocol_tcp(self):
-        """
-        Create a new client protocol connected to the server.
-        :returns: a IRelayTestClient implementation
-        """
-        server_factory = ServerFactory()
-        server_factory.protocol = TransitConnection
-        server_factory.transit = self._transit_server
-        server_factory.log_requests = self.log_requests
-        server_protocol = server_factory.buildProtocol(('127.0.0.1', 0))
-
-        @implementer(IRelayTestClient)
-        class TransitClientProtocolTcp(Protocol):
-            """
-            Speak the transit client protocol used by the tests over TCP
-            """
-            _received = b""
-            connected = False
-
-            # override Protocol callbacks
-
-            def connectionMade(self):
-                self.connected = True
-                return Protocol.connectionMade(self)
-
-            def connectionLost(self, reason):
-                self.connected = False
-                return Protocol.connectionLost(self, reason)
-
-            def dataReceived(self, data):
-                self._received = self._received + data
-
-            # IRelayTestClient
-
-            def send(self, data):
-                self.transport.write(data)
-
-            def disconnect(self):
-                self.transport.loseConnection()
-
-            def reset_received_data(self):
-                self._received = b""
-
-            def get_received_data(self):
-                return self._received
-
-        client_factory = ClientFactory()
-        client_factory.protocol = TransitClientProtocolTcp
-        client_protocol = client_factory.buildProtocol(('127.0.0.1', 31337))
-
-        pump = iosim.connect(
-            server_protocol,
-            iosim.makeFakeServer(server_protocol),
-            client_protocol,
-            iosim.makeFakeClient(client_protocol),
-        )
-        pump.flush()
-        self._pumps.append(pump)
-        return client_protocol
+        ep = endpoints.TCP4ServerEndpoint(reactor, 0, interface="127.0.0.1")
+        self._transit_server = Transit(blur_usage=blur_usage,
+                                       log_file=log_file, usage_db=usage_db)
+        self._lp = yield ep.listen(self._transit_server)
+        addr = self._lp.getHost()
+        # ws://127.0.0.1:%d/wormhole-relay/ws
+        self.transit = u"tcp:127.0.0.1:%d" % addr.port
 
     def tearDown(self):
        if self._lp:


@@ -1,208 +0,0 @@
from io import (
StringIO,
)
import sys
import shutil
from twisted.trial import unittest
from twisted.internet.interfaces import (
IPullProducer,
)
from twisted.internet.protocol import (
ProcessProtocol,
)
from twisted.internet.defer import (
inlineCallbacks,
Deferred,
)
from autobahn.twisted.websocket import (
WebSocketClientProtocol,
create_client_agent,
)
from zope.interface import implementer
class _CollectOutputProtocol(ProcessProtocol):
"""
Internal helper. Collects all output (stdout + stderr) into
self.output, and callback's on done with all of it after the
process exits (for any reason).
"""
def __init__(self):
self.done = Deferred()
self.running = Deferred()
self.output = StringIO()
def processEnded(self, reason):
if not self.done.called:
self.done.callback(self.output.getvalue())
def outReceived(self, data):
print(data.decode(), end="", flush=True)
self.output.write(data.decode(sys.getfilesystemencoding()))
if not self.running.called:
if "on 8088" in self.output.getvalue():
self.running.callback(None)
def errReceived(self, data):
print("ERR: {}".format(data.decode(sys.getfilesystemencoding())))
self.output.write(data.decode(sys.getfilesystemencoding()))
def run_transit(reactor, proto, tcp_port=None, websocket_port=None):
exe = shutil.which("twistd")
args = [
exe, "-n", "transitrelay",
]
if tcp_port is not None:
args.append("--port")
args.append(tcp_port)
if websocket_port is not None:
args.append("--websocket")
args.append(websocket_port)
proc = reactor.spawnProcess(proto, exe, args)
return proc
class Sender(WebSocketClientProtocol):
"""
"""
def __init__(self, *args, **kw):
WebSocketClientProtocol.__init__(self, *args, **kw)
self.done = Deferred()
self.got_ok = Deferred()
def onMessage(self, payload, is_binary):
print("onMessage")
if not self.got_ok.called:
if payload == b"ok\n":
self.got_ok.callback(None)
print("send: {}".format(payload.decode("utf8")))
def onClose(self, clean, code, reason):
print(f"close: {clean} {code} {reason}")
self.done.callback(None)
class Receiver(WebSocketClientProtocol):
"""
"""
def __init__(self, *args, **kw):
WebSocketClientProtocol.__init__(self, *args, **kw)
self.done = Deferred()
self.first_message = Deferred()
self.received = 0
def onMessage(self, payload, is_binary):
print("recv: {}".format(len(payload)))
self.received += len(payload)
if not self.first_message.called:
self.first_message.callback(None)
def onClose(self, clean, code, reason):
print(f"close: {clean} {code} {reason}")
self.done.callback(None)
class TransitWebSockets(unittest.TestCase):
"""
Integration-style tests of the transit WebSocket relay, using the
real reactor (and running transit as a subprocess).
"""
@inlineCallbacks
def test_buffer_fills(self):
"""
A running transit relay stops accepting incoming data at a
reasonable amount if the peer isn't reading. This test defines
that as 'less than 100MiB' although in practice Twisted seems
to stop before 10MiB.
"""
from twisted.internet import reactor
transit_proto = _CollectOutputProtocol()
transit_proc = run_transit(reactor, transit_proto, websocket_port="tcp:8088")
def cleanup_process():
transit_proc.signalProcess("HUP")
return transit_proto.done
self.addCleanup(cleanup_process)
yield transit_proto.running
print("Transit running")
agent = create_client_agent(reactor)
side_a = yield agent.open("ws://localhost:8088", {}, lambda: Sender())
side_b = yield agent.open("ws://localhost:8088", {}, lambda: Receiver())
side_a.sendMessage(b"please relay aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa for side aaaaaaaaaaaaaaaa", True)
side_b.sendMessage(b"please relay aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa for side bbbbbbbbbbbbbbbb", True)
yield side_a.got_ok
yield side_b.first_message
# remove side_b's filedescriptor from the reactor .. this
# means it will not read any more data
reactor.removeReader(side_b.transport)
# attempt to send up to 100MiB through side_a .. we should get
# backpressure before that works which only manifests itself
# as this producer not being asked to produce more
max_data = 1024*1024*100 # 100MiB
@implementer(IPullProducer)
class ProduceMessages:
def __init__(self, ws, on_produce):
self._ws = ws
self._sent = 0
self._max = max_data
self._on_produce = on_produce
def resumeProducing(self):
self._on_produce()
if self._sent >= self._max:
self._ws.sendClose()
return
data = b"a" * 1024*1024
self._ws.sendMessage(data, True)
self._sent += len(data)
print("sent {}, total {}".format(len(data), self._sent))
# our only signal is, "did our producer get asked to produce
# more data" which it should do periodically. We want to stop
# if we haven't seen a new data request for a while -- defined
# as "more than 5 seconds".
done = Deferred()
last_produce = None
timeout = 2 # seconds
def asked_for_data():
nonlocal last_produce
last_produce = reactor.seconds()
data = ProduceMessages(side_a, asked_for_data)
side_a.transport.registerProducer(data, False)
data.resumeProducing()
def check_if_done():
if last_produce is not None:
if reactor.seconds() - last_produce > timeout:
done.callback(None)
return
# recursive call to ourselves to check again soon
reactor.callLater(.1, check_if_done)
check_if_done()
yield done
mib = 1024*1024.0
print("Sent {}MiB of {}MiB before backpressure".format(data._sent / mib, max_data / mib))
self.assertTrue(data._sent < max_data, "Too much data sent")
side_a.sendClose()
side_b.sendClose()
yield side_a.done
yield side_b.done
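The only backpressure signal the test above has is "did our pull-producer get asked for more data recently". That stall-detection idea can be sketched on its own, with an injected fake clock instead of the reactor (this is an illustrative sketch, not code from the repository):

```python
# Treat "producer asked to produce" as a heartbeat; declare backpressure
# (a stall) once no heartbeat has arrived within the timeout window.
class StallDetector:
    def __init__(self, clock, timeout):
        self._clock = clock        # callable returning the current time
        self._timeout = timeout
        self._last_beat = None

    def beat(self):
        # called whenever the producer's resumeProducing() fires
        self._last_beat = self._clock()

    def stalled(self):
        if self._last_beat is None:
            return False           # never asked to produce yet: not stalled
        return (self._clock() - self._last_beat) > self._timeout

# Drive it with a fake clock, like the test drives reactor.seconds().
now = [0.0]
d = StallDetector(lambda: now[0], timeout=2.0)
d.beat()               # producer was asked for data at t=0
now[0] = 1.0
first = d.stalled()    # only 1s since the last beat: not stalled
now[0] = 3.5
second = d.stalled()   # 3.5s since the last beat: stalled
```

The test polls this condition with `reactor.callLater` every 0.1s rather than waiting on a Deferred, because "no more requests" is the absence of an event.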


@@ -1,36 +1,18 @@
+from __future__ import unicode_literals, print_function
 from twisted.trial import unittest
 from .. import server_tap
 
-PORT = "tcp:4001:interface=\:\:"
-
 class Config(unittest.TestCase):
     def test_defaults(self):
         o = server_tap.Options()
         o.parseOptions([])
         self.assertEqual(o, {"blur-usage": None, "log-fd": None,
-                             "usage-db": None, "port": PORT,
-                             "websocket": None, "websocket-url": None})
+                             "usage-db": None, "port": "tcp:4001"})
 
     def test_blur(self):
         o = server_tap.Options()
         o.parseOptions(["--blur-usage=60"])
         self.assertEqual(o, {"blur-usage": 60, "log-fd": None,
-                             "usage-db": None, "port": PORT,
-                             "websocket": None, "websocket-url": None})
-
-    def test_websocket(self):
-        o = server_tap.Options()
-        o.parseOptions(["--websocket=tcp:4004"])
-        self.assertEqual(o, {"blur-usage": None, "log-fd": None,
-                             "usage-db": None, "port": PORT,
-                             "websocket": "tcp:4004", "websocket-url": None})
-
-    def test_websocket_url(self):
-        o = server_tap.Options()
-        o.parseOptions(["--websocket=tcp:4004", "--websocket-url=ws://example.com/"])
-        self.assertEqual(o, {"blur-usage": None, "log-fd": None,
-                             "usage-db": None, "port": PORT,
-                             "websocket": "tcp:4004",
-                             "websocket-url": "ws://example.com/"})
 
     def test_string(self):
         o = server_tap.Options()


@@ -1,3 +1,4 @@
+from __future__ import print_function, unicode_literals
 import os
 from twisted.python import filepath
 from twisted.trial import unittest


@@ -1,56 +0,0 @@
from unittest import mock
from twisted.trial import unittest
from ..increase_rlimits import increase_rlimits
class RLimits(unittest.TestCase):
def test_rlimit(self):
def patch_r(name, *args, **kwargs):
return mock.patch("wormhole_transit_relay.increase_rlimits." + name, *args, **kwargs)
fakelog = []
def checklog(*expected):
self.assertEqual(fakelog, list(expected))
fakelog[:] = []
NF = "NOFILE"
mock_NF = patch_r("RLIMIT_NOFILE", NF)
with patch_r("log.msg", fakelog.append):
with patch_r("getrlimit", None):
increase_rlimits()
checklog("unable to import 'resource', leaving rlimit alone")
with mock_NF:
with patch_r("getrlimit", return_value=(20000, 30000)) as gr:
increase_rlimits()
self.assertEqual(gr.mock_calls, [mock.call(NF)])
checklog("RLIMIT_NOFILE.soft was 20000, leaving it alone")
with patch_r("getrlimit", return_value=(10, 30000)) as gr:
with patch_r("setrlimit", side_effect=TypeError("other")):
with patch_r("log.err") as err:
increase_rlimits()
self.assertEqual(err.mock_calls, [mock.call()])
checklog("changing RLIMIT_NOFILE from (10,30000) to (30000,30000)",
"other error during setrlimit, leaving it alone")
for maxlimit in [40000, 20000, 9000, 2000, 1000]:
def setrlimit(which, newlimit):
if newlimit[0] > maxlimit:
raise ValueError("nope")
return None
calls = []
expected = []
for tries in [30000, 10000, 3200, 1024]:
calls.append(mock.call(NF, (tries, 30000)))
expected.append("changing RLIMIT_NOFILE from (10,30000) to (%d,30000)" % tries)
if tries > maxlimit:
expected.append("error during setrlimit: nope")
else:
expected.append("setrlimit successful")
break
else:
expected.append("unable to change rlimit, leaving it alone")
with patch_r("setrlimit", side_effect=setrlimit) as sr:
increase_rlimits()
self.assertEqual(sr.mock_calls, calls)
checklog(*expected)
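The fallback behaviour this test exercises — leave a generous soft limit alone, otherwise try progressively smaller candidates until the OS accepts one — can be sketched as a pure function. `try_set` here is a hypothetical injection point standing in for `resource.setrlimit`; this is an illustrative sketch of the logic the test describes, not the module's actual code:

```python
def raise_soft_limit(soft, hard, try_set, candidates=(30000, 10000, 3200, 1024)):
    """Return the soft limit after attempting to raise it."""
    if soft >= 10000:
        return soft            # already generous: leave it alone
    for newlimit in candidates:
        try:
            try_set((newlimit, hard))
            return newlimit    # the OS accepted this value
        except ValueError:
            continue           # too high for this system, try a lower one
    return soft                # nothing worked: leave it alone

def accept_up_to(maxlimit):
    """Build a fake setrlimit that rejects anything above maxlimit."""
    def try_set(limits):
        if limits[0] > maxlimit:
            raise ValueError("nope")
    return try_set
```

With `accept_up_to(9000)` the first two candidates are rejected and 3200 sticks, matching the ladder of log messages the test asserts.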


@@ -1,14 +1,14 @@
+from __future__ import unicode_literals, print_function
 from twisted.trial import unittest
-from unittest import mock
+import mock
 from twisted.application.service import MultiService
-from autobahn.twisted.websocket import WebSocketServerFactory
 from .. import server_tap
 
 class Service(unittest.TestCase):
     def test_defaults(self):
         o = server_tap.Options()
         o.parseOptions([])
-        with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t:
+        with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t:
             s = server_tap.makeService(o)
         self.assertEqual(t.mock_calls,
                          [mock.call(blur_usage=None,
@@ -18,7 +18,7 @@ class Service(unittest.TestCase):
     def test_blur(self):
         o = server_tap.Options()
         o.parseOptions(["--blur-usage=60"])
-        with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t:
+        with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t:
             server_tap.makeService(o)
         self.assertEqual(t.mock_calls,
                          [mock.call(blur_usage=60,
@@ -28,7 +28,7 @@ class Service(unittest.TestCase):
         o = server_tap.Options()
         o.parseOptions(["--log-fd=99"])
         fd = object()
-        with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t:
+        with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t:
             with mock.patch("wormhole_transit_relay.server_tap.os.fdopen",
                             return_value=fd) as f:
                 server_tap.makeService(o)
@@ -37,34 +37,3 @@ class Service(unittest.TestCase):
                          [mock.call(blur_usage=None,
                                     log_file=fd, usage_db=None)])
-
-    def test_websocket(self):
-        """
-        A websocket factory is created when passing --websocket
-        """
-        o = server_tap.Options()
-        o.parseOptions(["--websocket=tcp:4004"])
-        services = server_tap.makeService(o)
-        self.assertTrue(
-            any(
-                isinstance(s.factory, WebSocketServerFactory)
-                for s in services.services
-            )
-        )
-
-    def test_websocket_explicit_url(self):
-        """
-        A websocket factory is created with --websocket and
-        --websocket-url
-        """
-        o = server_tap.Options()
-        o.parseOptions([
-            "--websocket=tcp:4004",
-            "--websocket-url=ws://example.com:4004",
-        ])
-        services = server_tap.makeService(o)
-        self.assertTrue(
-            any(
-                isinstance(s.factory, WebSocketServerFactory)
-                for s in services.services
-            )
-        )


@@ -1,51 +1,39 @@
-import os, io, json
-from unittest import mock
+from __future__ import print_function, unicode_literals
+import os, io, json, sqlite3
+import mock
 from twisted.trial import unittest
 from ..transit_server import Transit
-from ..usage import create_usage_tracker
 from .. import database
 
 class DB(unittest.TestCase):
+    def open_db(self, dbfile):
+        db = sqlite3.connect(dbfile)
+        database._initialize_db_connection(db)
+        return db
+
     def test_db(self):
-        T = 1519075308.0
-
-        class Timer:
-            t = T
-            def __call__(self):
-                return self.t
-        get_time = Timer()
-
         d = self.mktemp()
         os.mkdir(d)
         usage_db = os.path.join(d, "usage.sqlite")
-        db = database.get_db(usage_db)
-        t = Transit(
-            create_usage_tracker(blur_usage=None, log_file=None, usage_db=db),
-            get_time,
-        )
-        self.assertEqual(len(t.usage._backends), 1)
-        usage = list(t.usage._backends)[0]
-
-        get_time.t = T + 1
-        usage.record_usage(started=123, mood="happy", total_bytes=100,
-                           total_time=10, waiting_time=2)
-        t.update_stats()
+        with mock.patch("time.time", return_value=456):
+            t = Transit(blur_usage=None, log_file=None, usage_db=usage_db)
+        db = self.open_db(usage_db)
+
+        with mock.patch("time.time", return_value=457):
+            t.recordUsage(started=123, result="happy", total_bytes=100,
+                          total_time=10, waiting_time=2)
         self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(),
                          [dict(result="happy", started=123,
                                total_bytes=100, total_time=10, waiting_time=2),
                           ])
         self.assertEqual(db.execute("SELECT * FROM `current`").fetchone(),
-                         dict(rebooted=T+0, updated=T+1,
+                         dict(rebooted=456, updated=457,
                               incomplete_bytes=0,
                               waiting=0, connected=0))
 
-        get_time.t = T + 2
-        usage.record_usage(started=150, mood="errory", total_bytes=200,
-                           total_time=11, waiting_time=3)
-        t.update_stats()
+        with mock.patch("time.time", return_value=458):
+            t.recordUsage(started=150, result="errory", total_bytes=200,
+                          total_time=11, waiting_time=3)
         self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(),
                          [dict(result="happy", started=123,
                                total_bytes=100, total_time=10, waiting_time=2),
@@ -53,41 +41,31 @@ class DB(unittest.TestCase):
                           dict(result="errory", started=150,
                                total_bytes=200, total_time=11, waiting_time=3),
                           ])
         self.assertEqual(db.execute("SELECT * FROM `current`").fetchone(),
-                         dict(rebooted=T+0, updated=T+2,
+                         dict(rebooted=456, updated=458,
                               incomplete_bytes=0,
                               waiting=0, connected=0))
 
-        get_time.t = T + 3
-        t.update_stats()
+        with mock.patch("time.time", return_value=459):
+            t.timerUpdateStats()
         self.assertEqual(db.execute("SELECT * FROM `current`").fetchone(),
-                         dict(rebooted=T+0, updated=T+3,
+                         dict(rebooted=456, updated=459,
                               incomplete_bytes=0,
                               waiting=0, connected=0))
 
     def test_no_db(self):
-        t = Transit(
-            create_usage_tracker(blur_usage=None, log_file=None, usage_db=None),
-            lambda: 0,
-        )
-        self.assertEqual(0, len(t.usage._backends))
+        t = Transit(blur_usage=None, log_file=None, usage_db=None)
+        t.recordUsage(started=123, result="happy", total_bytes=100,
+                      total_time=10, waiting_time=2)
+        t.timerUpdateStats()
 
 class LogToStdout(unittest.TestCase):
     def test_log(self):
         # emit lines of JSON to log_file, if set
         log_file = io.StringIO()
-        t = Transit(
-            create_usage_tracker(blur_usage=None, log_file=log_file, usage_db=None),
-            lambda: 0,
-        )
-        with mock.patch("time.time", return_value=133):
-            t.usage.record(
-                started=123,
-                buddy_started=125,
-                result="happy",
-                bytes_sent=100,
-                buddy_bytes=0,
-            )
+        t = Transit(blur_usage=None, log_file=log_file, usage_db=None)
+        t.recordUsage(started=123, result="happy", total_bytes=100,
+                      total_time=10, waiting_time=2)
         self.assertEqual(json.loads(log_file.getvalue()),
                          {"started": 123, "total_time": 10,
                           "waiting_time": 2, "total_bytes": 100,
@@ -97,34 +75,15 @@ class LogToStdout(unittest.TestCase):
         # if blurring is enabled, timestamps should be rounded to the
         # requested amount, and sizes should be rounded up too
         log_file = io.StringIO()
-        t = Transit(
-            create_usage_tracker(blur_usage=60, log_file=log_file, usage_db=None),
-            lambda: 0,
-        )
-        with mock.patch("time.time", return_value=123 + 10):
-            t.usage.record(
-                started=123,
-                buddy_started=125,
-                result="happy",
-                bytes_sent=11999,
-                buddy_bytes=0,
-            )
-        print(log_file.getvalue())
+        t = Transit(blur_usage=60, log_file=log_file, usage_db=None)
+        t.recordUsage(started=123, result="happy", total_bytes=11999,
+                      total_time=10, waiting_time=2)
         self.assertEqual(json.loads(log_file.getvalue()),
                          {"started": 120, "total_time": 10,
                           "waiting_time": 2, "total_bytes": 20000,
                           "mood": "happy"})
 
     def test_do_not_log(self):
-        t = Transit(
-            create_usage_tracker(blur_usage=60, log_file=None, usage_db=None),
-            lambda: 0,
-        )
-        t.usage.record(
-            started=123,
-            buddy_started=124,
-            result="happy",
-            bytes_sent=11999,
-            buddy_bytes=12,
-        )
+        t = Transit(blur_usage=60, log_file=None, usage_db=None)
+        t.recordUsage(started=123, result="happy", total_bytes=11999,
+                      total_time=10, waiting_time=2)


@ -1,273 +1,299 @@
from __future__ import print_function, unicode_literals
from binascii import hexlify from binascii import hexlify
from twisted.trial import unittest from twisted.trial import unittest
from twisted.test import iosim from twisted.internet import protocol, reactor, defer
from autobahn.twisted.websocket import ( from twisted.internet.endpoints import clientFromString, connectProtocol
WebSocketServerFactory, from .common import ServerBase
WebSocketClientFactory, from .. import transit_server
WebSocketClientProtocol,
)
from autobahn.twisted.testing import (
create_pumper,
MemoryReactorClockResolver,
)
from autobahn.exception import Disconnected
from zope.interface import implementer
from .common import (
ServerBase,
IRelayTestClient,
)
from ..usage import (
MemoryUsageRecorder,
blur_size,
)
from ..transit_server import (
WebSocketTransitConnection,
TransitServerState,
)
class Accumulator(protocol.Protocol):
def handshake(token, side=None): def __init__(self):
hs = b"please relay " + hexlify(token) self.data = b""
if side is not None: self.count = 0
hs += b" for side " + hexlify(side) self._wait = None
hs += b"\n" self._disconnect = defer.Deferred()
return hs def waitForBytes(self, more):
assert self._wait is None
self.count = more
self._wait = defer.Deferred()
self._check_done()
return self._wait
def dataReceived(self, data):
self.data = self.data + data
self._check_done()
def _check_done(self):
if self._wait and len(self.data) >= self.count:
d = self._wait
self._wait = None
d.callback(self)
def connectionLost(self, why):
if self._wait:
self._wait.errback(RuntimeError("closed"))
self._disconnect.callback(None)
class _Transit: class _Transit:
def count(self):
return sum([
len(potentials)
for potentials
in self._transit_server.pending_requests._requests.values()
])
def test_blur_size(self): def test_blur_size(self):
self.failUnlessEqual(blur_size(0), 0) blur = transit_server.blur_size
self.failUnlessEqual(blur_size(1), 10e3) self.failUnlessEqual(blur(0), 0)
self.failUnlessEqual(blur_size(10e3), 10e3) self.failUnlessEqual(blur(1), 10e3)
self.failUnlessEqual(blur_size(10e3+1), 20e3) self.failUnlessEqual(blur(10e3), 10e3)
self.failUnlessEqual(blur_size(15e3), 20e3) self.failUnlessEqual(blur(10e3+1), 20e3)
self.failUnlessEqual(blur_size(20e3), 20e3) self.failUnlessEqual(blur(15e3), 20e3)
self.failUnlessEqual(blur_size(1e6), 1e6) self.failUnlessEqual(blur(20e3), 20e3)
self.failUnlessEqual(blur_size(1e6+1), 2e6) self.failUnlessEqual(blur(1e6), 1e6)
self.failUnlessEqual(blur_size(1.5e6), 2e6) self.failUnlessEqual(blur(1e6+1), 2e6)
self.failUnlessEqual(blur_size(2e6), 2e6) self.failUnlessEqual(blur(1.5e6), 2e6)
self.failUnlessEqual(blur_size(900e6), 900e6) self.failUnlessEqual(blur(2e6), 2e6)
self.failUnlessEqual(blur_size(1000e6), 1000e6) self.failUnlessEqual(blur(900e6), 900e6)
self.failUnlessEqual(blur_size(1050e6), 1100e6) self.failUnlessEqual(blur(1000e6), 1000e6)
self.failUnlessEqual(blur_size(1100e6), 1100e6) self.failUnlessEqual(blur(1050e6), 1100e6)
self.failUnlessEqual(blur_size(1150e6), 1200e6) self.failUnlessEqual(blur(1100e6), 1100e6)
self.failUnlessEqual(blur(1150e6), 1200e6)
@defer.inlineCallbacks
def test_register(self): def test_register(self):
p1 = self.new_protocol() ep = clientFromString(reactor, self.transit)
a1 = yield connectProtocol(ep, Accumulator())
token1 = b"\x00"*32 token1 = b"\x00"*32
side1 = b"\x01"*8 side1 = b"\x01"*8
a1.transport.write(b"please relay " + hexlify(token1) +
b" for side " + hexlify(side1) + b"\n")
p1.send(handshake(token1, side1)) # let that arrive
self.flush() while self.count() == 0:
yield self.wait()
self.assertEqual(self.count(), 1) self.assertEqual(self.count(), 1)
p1.disconnect() a1.transport.loseConnection()
self.flush()
# let that get removed
while self.count() > 0:
yield self.wait()
self.assertEqual(self.count(), 0) self.assertEqual(self.count(), 0)
# the token should be removed too # the token should be removed too
self.assertEqual(len(self._transit_server.pending_requests._requests), 0) self.assertEqual(len(self._transit_server._pending_requests), 0)
@defer.inlineCallbacks
def test_both_unsided(self): def test_both_unsided(self):
p1 = self.new_protocol() ep = clientFromString(reactor, self.transit)
p2 = self.new_protocol() a1 = yield connectProtocol(ep, Accumulator())
a2 = yield connectProtocol(ep, Accumulator())
token1 = b"\x00"*32 token1 = b"\x00"*32
p1.send(handshake(token1, side=None)) a1.transport.write(b"please relay " + hexlify(token1) + b"\n")
self.flush() a2.transport.write(b"please relay " + hexlify(token1) + b"\n")
p2.send(handshake(token1, side=None))
self.flush()
# a correct handshake yields an ack, after which we can send # a correct handshake yields an ack, after which we can send
exp = b"ok\n" exp = b"ok\n"
self.assertEqual(p1.get_received_data(), exp) yield a1.waitForBytes(len(exp))
self.assertEqual(p2.get_received_data(), exp) self.assertEqual(a1.data, exp)
p1.reset_received_data()
p2.reset_received_data()
s1 = b"data1" s1 = b"data1"
p1.send(s1) a1.transport.write(s1)
self.flush()
self.assertEqual(p2.get_received_data(), s1)
p1.disconnect() exp = b"ok\n"
self.flush() yield a2.waitForBytes(len(exp))
self.assertEqual(a2.data, exp)
# all data they sent after the handshake should be given to us
exp = b"ok\n"+s1
yield a2.waitForBytes(len(exp))
self.assertEqual(a2.data, exp)
a1.transport.loseConnection()
a2.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_sided_unsided(self):
-        p1 = self.new_protocol()
-        p2 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
+        a2 = yield connectProtocol(ep, Accumulator())
         token1 = b"\x00"*32
         side1 = b"\x01"*8
-        p1.send(handshake(token1, side=side1))
-        self.flush()
-        p2.send(handshake(token1, side=None))
-        self.flush()
+        a1.transport.write(b"please relay " + hexlify(token1) +
+                           b" for side " + hexlify(side1) + b"\n")
+        a2.transport.write(b"please relay " + hexlify(token1) + b"\n")
         # a correct handshake yields an ack, after which we can send
         exp = b"ok\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        self.assertEqual(p2.get_received_data(), exp)
-        p1.reset_received_data()
-        p2.reset_received_data()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        s1 = b"data1"
+        a1.transport.write(s1)
+        exp = b"ok\n"
+        yield a2.waitForBytes(len(exp))
+        self.assertEqual(a2.data, exp)
         # all data they sent after the handshake should be given to us
-        s1 = b"data1"
-        p1.send(s1)
-        self.flush()
-        self.assertEqual(p2.get_received_data(), s1)
-        p1.disconnect()
-        self.flush()
+        exp = b"ok\n"+s1
+        yield a2.waitForBytes(len(exp))
+        self.assertEqual(a2.data, exp)
+        a1.transport.loseConnection()
+        a2.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_unsided_sided(self):
-        p1 = self.new_protocol()
-        p2 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
+        a2 = yield connectProtocol(ep, Accumulator())
         token1 = b"\x00"*32
         side1 = b"\x01"*8
-        p1.send(handshake(token1, side=None))
-        p2.send(handshake(token1, side=side1))
-        self.flush()
+        a1.transport.write(b"please relay " + hexlify(token1) + b"\n")
+        a2.transport.write(b"please relay " + hexlify(token1) +
+                           b" for side " + hexlify(side1) + b"\n")
         # a correct handshake yields an ack, after which we can send
         exp = b"ok\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        self.assertEqual(p2.get_received_data(), exp)
-        p1.reset_received_data()
-        p2.reset_received_data()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        s1 = b"data1"
+        a1.transport.write(s1)
+        exp = b"ok\n"
+        yield a2.waitForBytes(len(exp))
+        self.assertEqual(a2.data, exp)
         # all data they sent after the handshake should be given to us
-        s1 = b"data1"
-        p1.send(s1)
-        self.flush()
-        self.assertEqual(p2.get_received_data(), s1)
-        p1.disconnect()
-        p2.disconnect()
+        exp = b"ok\n"+s1
+        yield a2.waitForBytes(len(exp))
+        self.assertEqual(a2.data, exp)
+        a1.transport.loseConnection()
+        a2.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_both_sided(self):
-        p1 = self.new_protocol()
-        p2 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
+        a2 = yield connectProtocol(ep, Accumulator())
         token1 = b"\x00"*32
         side1 = b"\x01"*8
         side2 = b"\x02"*8
-        p1.send(handshake(token1, side=side1))
-        self.flush()
-        p2.send(handshake(token1, side=side2))
-        self.flush()
+        a1.transport.write(b"please relay " + hexlify(token1) +
+                           b" for side " + hexlify(side1) + b"\n")
+        a2.transport.write(b"please relay " + hexlify(token1) +
+                           b" for side " + hexlify(side2) + b"\n")
         # a correct handshake yields an ack, after which we can send
         exp = b"ok\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        self.assertEqual(p2.get_received_data(), exp)
-        p1.reset_received_data()
-        p2.reset_received_data()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        s1 = b"data1"
+        a1.transport.write(s1)
+        exp = b"ok\n"
+        yield a2.waitForBytes(len(exp))
+        self.assertEqual(a2.data, exp)
         # all data they sent after the handshake should be given to us
-        s1 = b"data1"
-        p1.send(s1)
-        self.flush()
-        self.assertEqual(p2.get_received_data(), s1)
-        p1.disconnect()
-        p2.disconnect()
+        exp = b"ok\n"+s1
+        yield a2.waitForBytes(len(exp))
+        self.assertEqual(a2.data, exp)
+        a1.transport.loseConnection()
+        a2.transport.loseConnection()
def count(self):
return sum([len(potentials)
for potentials
in self._transit_server._pending_requests.values()])
def wait(self):
d = defer.Deferred()
reactor.callLater(0.001, d.callback, None)
return d
+    @defer.inlineCallbacks
     def test_ignore_same_side(self):
-        p1 = self.new_protocol()
-        p2 = self.new_protocol()
-        p3 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
+        a2 = yield connectProtocol(ep, Accumulator())
         token1 = b"\x00"*32
         side1 = b"\x01"*8
-        p1.send(handshake(token1, side=side1))
-        self.flush()
-        self.assertEqual(self.count(), 1)
+        a1.transport.write(b"please relay " + hexlify(token1) +
+                           b" for side " + hexlify(side1) + b"\n")
+        # let that arrive
+        while self.count() == 0:
+            yield self.wait()
-        p2.send(handshake(token1, side=side1))
-        self.flush()
+        a2.transport.write(b"please relay " + hexlify(token1) +
+                           b" for side " + hexlify(side1) + b"\n")
+        # let that arrive
+        while self.count() == 1:
+            yield self.wait()
         self.assertEqual(self.count(), 2)  # same-side connections don't match
-        # when the second side arrives, the spare first connection should be
-        # closed
-        side2 = b"\x02"*8
-        p3.send(handshake(token1, side=side2))
-        self.flush()
-        self.assertEqual(self.count(), 0)
-        self.assertEqual(len(self._transit_server.pending_requests._requests), 0)
-        self.assertEqual(len(self._transit_server.active_connections._connections), 2)
-        # That will trigger a disconnect on exactly one of (p1 or p2).
-        # The other connection should still be connected
-        self.assertEqual(sum([int(t.connected) for t in [p1, p2]]), 1)
-        p1.disconnect()
-        p2.disconnect()
-        p3.disconnect()
+        a1.transport.loseConnection()
+        a2.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_bad_handshake_old(self):
-        p1 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
         token1 = b"\x00"*32
-        p1.send(b"please DELAY " + hexlify(token1) + b"\n")
-        self.flush()
+        # the server waits for the exact number of bytes in the expected
+        # handshake message. to trigger "bad handshake", we must match.
+        a1.transport.write(b"please DELAY " + hexlify(token1) + b"\n")
         exp = b"bad handshake\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        p1.disconnect()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        a1.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_bad_handshake_old_slow(self):
-        p1 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
-        p1.send(b"please DELAY ")
-        self.flush()
+        a1.transport.write(b"please DELAY ")
         # As in test_impatience_new_slow, the current state machine has code
         # that can only be reached if we insert a stall here, so dataReceived
         # gets called twice. Hopefully we can delete this test once
         # dataReceived is refactored to remove that state.
+        d = defer.Deferred()
+        reactor.callLater(0.1, d.callback, None)
+        yield d
         token1 = b"\x00"*32
         # the server waits for the exact number of bytes in the expected
         # handshake message. to trigger "bad handshake", we must match.
-        p1.send(hexlify(token1) + b"\n")
-        self.flush()
+        a1.transport.write(hexlify(token1) + b"\n")
         exp = b"bad handshake\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        p1.disconnect()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        a1.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_bad_handshake_new(self):
-        p1 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
         token1 = b"\x00"*32
         side1 = b"\x01"*8
         # the server waits for the exact number of bytes in the expected
         # handshake message. to trigger "bad handshake", we must match.
-        p1.send(b"please DELAY " + hexlify(token1) +
-                b" for side " + hexlify(side1) + b"\n")
-        self.flush()
+        a1.transport.write(b"please DELAY " + hexlify(token1) +
+                           b" for side " + hexlify(side1) + b"\n")
         exp = b"bad handshake\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        p1.disconnect()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        a1.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_binary_handshake(self):
-        p1 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
         binary_bad_handshake = b"\x00\x01\xe0\x0f\n\xff"
         # the embedded \n makes the server trigger early, before the full
@@ -276,46 +302,50 @@ class _Transit:
         # UnicodeDecodeError when it tried to coerce the incoming handshake
         # to unicode, due to the ("\n" in buf) check. This was fixed to use
         # (b"\n" in buf). This exercises the old failure.
-        p1.send(binary_bad_handshake)
-        self.flush()
+        a1.transport.write(binary_bad_handshake)
         exp = b"bad handshake\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        p1.disconnect()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        a1.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_impatience_old(self):
-        p1 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
         token1 = b"\x00"*32
         # sending too many bytes is impatience.
-        p1.send(b"please relay " + hexlify(token1))
-        p1.send(b"\nNOWNOWNOW")
-        self.flush()
+        a1.transport.write(b"please relay " + hexlify(token1) + b"\nNOWNOWNOW")
         exp = b"impatient\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        p1.disconnect()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        a1.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_impatience_new(self):
-        p1 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
         token1 = b"\x00"*32
         side1 = b"\x01"*8
         # sending too many bytes is impatience.
-        p1.send(b"please relay " + hexlify(token1) +
-                b" for side " + hexlify(side1))
-        p1.send(b"\nNOWNOWNOW")
-        self.flush()
+        a1.transport.write(b"please relay " + hexlify(token1) +
+                           b" for side " + hexlify(side1) + b"\nNOWNOWNOW")
         exp = b"impatient\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        p1.disconnect()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        a1.transport.loseConnection()
+    @defer.inlineCallbacks
     def test_impatience_new_slow(self):
-        p1 = self.new_protocol()
+        ep = clientFromString(reactor, self.transit)
+        a1 = yield connectProtocol(ep, Accumulator())
         # For full coverage, we need dataReceived to see a particular framing
         # of these two pieces of data, and ITCPTransport doesn't have flush()
         # (which probably wouldn't work anyways). For now, force a 100ms
@@ -328,360 +358,23 @@ class _Transit:
         token1 = b"\x00"*32
         side1 = b"\x01"*8
         # sending too many bytes is impatience.
-        p1.send(b"please relay " + hexlify(token1) +
-                b" for side " + hexlify(side1) + b"\n")
-        self.flush()
-        p1.send(b"NOWNOWNOW")
-        self.flush()
+        a1.transport.write(b"please relay " + hexlify(token1) +
+                           b" for side " + hexlify(side1) + b"\n")
+        d = defer.Deferred()
+        reactor.callLater(0.1, d.callback, None)
+        yield d
+        a1.transport.write(b"NOWNOWNOW")
         exp = b"impatient\n"
-        self.assertEqual(p1.get_received_data(), exp)
-        p1.disconnect()
+        yield a1.waitForBytes(len(exp))
+        self.assertEqual(a1.data, exp)
+        a1.transport.loseConnection()

-    def test_short_handshake(self):
-        p1 = self.new_protocol()
-        # hang up before sending a complete handshake
-        p1.send(b"short")
-        self.flush()
-        p1.disconnect()

-    def test_empty_handshake(self):
-        p1 = self.new_protocol()
-        # hang up before sending anything
-        p1.disconnect()
 class TransitWithLogs(_Transit, ServerBase, unittest.TestCase):
     log_requests = True

-    def new_protocol(self):
-        return self.new_protocol_tcp()

 class TransitWithoutLogs(_Transit, ServerBase, unittest.TestCase):
     log_requests = False

-    def new_protocol(self):
-        return self.new_protocol_tcp()
def _new_protocol_ws(transit_server, log_requests):
"""
Internal helper for test-suites that need to provide WebSocket
client/server pairs.
:returns: a 2-tuple: (iosim.IOPump, protocol)
"""
ws_factory = WebSocketServerFactory("ws://localhost:4002")
ws_factory.protocol = WebSocketTransitConnection
ws_factory.transit = transit_server
ws_factory.log_requests = log_requests
ws_protocol = ws_factory.buildProtocol(('127.0.0.1', 0))
@implementer(IRelayTestClient)
class TransitWebSocketClientProtocol(WebSocketClientProtocol):
_received = b""
connected = False
def connectionMade(self):
self.connected = True
return super(TransitWebSocketClientProtocol, self).connectionMade()
def connectionLost(self, reason):
self.connected = False
return super(TransitWebSocketClientProtocol, self).connectionLost(reason)
def onMessage(self, data, isBinary):
self._received = self._received + data
def send(self, data):
self.sendMessage(data, True)
def get_received_data(self):
return self._received
def reset_received_data(self):
self._received = b""
def disconnect(self):
self.sendClose(1000, True)
client_factory = WebSocketClientFactory()
client_factory.protocol = TransitWebSocketClientProtocol
client_protocol = client_factory.buildProtocol(('127.0.0.1', 31337))
client_protocol.disconnect = client_protocol.dropConnection
pump = iosim.connect(
ws_protocol,
iosim.makeFakeServer(ws_protocol),
client_protocol,
iosim.makeFakeClient(client_protocol),
)
return pump, client_protocol
class TransitWebSockets(_Transit, ServerBase, unittest.TestCase):
def new_protocol(self):
return self.new_protocol_ws()
def new_protocol_ws(self):
pump, proto = _new_protocol_ws(self._transit_server, self.log_requests)
self._pumps.append(pump)
return proto
def test_websocket_to_tcp(self):
"""
One client is WebSocket and one is TCP
"""
p1 = self.new_protocol_ws()
p2 = self.new_protocol_tcp()
token1 = b"\x00"*32
side1 = b"\x01"*8
side2 = b"\x02"*8
p1.send(handshake(token1, side=side1))
self.flush()
p2.send(handshake(token1, side=side2))
self.flush()
# a correct handshake yields an ack, after which we can send
exp = b"ok\n"
self.assertEqual(p1.get_received_data(), exp)
self.assertEqual(p2.get_received_data(), exp)
p1.reset_received_data()
p2.reset_received_data()
# all data they sent after the handshake should be given to us
s1 = b"data1"
p1.send(s1)
self.flush()
self.assertEqual(p2.get_received_data(), s1)
p1.disconnect()
p2.disconnect()
self.flush()
def test_bad_handshake_old_slow(self):
"""
This test only makes sense for TCP
"""
def test_send_closed_partner(self):
"""
Sending data to a closed partner causes an error that propagates
to the sender.
"""
p1 = self.new_protocol()
p2 = self.new_protocol()
# set up a successful connection
token = b"a" * 32
p1.send(handshake(token))
p2.send(handshake(token))
self.flush()
# p2 loses connection, then p1 sends a message
p2.transport.loseConnection()
self.flush()
# at this point, p1 learns that p2 is disconnected (because it
# tried to relay "a message" but failed)
# try to send more (our partner p2 is gone now though so it
# should be an immediate error)
with self.assertRaises(Disconnected):
p1.send(b"more message")
self.flush()
class Usage(ServerBase, unittest.TestCase):
log_requests = True
def setUp(self):
super(Usage, self).setUp()
self._usage = MemoryUsageRecorder()
self._transit_server.usage.add_backend(self._usage)
def new_protocol(self):
return self.new_protocol_tcp()
def test_empty(self):
p1 = self.new_protocol()
# hang up before sending anything
p1.disconnect()
self.flush()
# that will log the "empty" usage event
self.assertEqual(len(self._usage.events), 1, self._usage)
self.assertEqual(self._usage.events[0]["mood"], "empty", self._usage)
def test_short(self):
# Note: this test only runs on TCP clients because WebSockets
# already does framing (so it's either "a bad handshake" or
# there's no handshake at all yet .. you can't have a "short"
# one).
p1 = self.new_protocol()
# hang up before sending a complete handshake
p1.send(b"short")
p1.disconnect()
self.flush()
# that will log the "empty" usage event
self.assertEqual(len(self._usage.events), 1, self._usage)
self.assertEqual("empty", self._usage.events[0]["mood"])
def test_errory(self):
p1 = self.new_protocol()
p1.send(b"this is a very bad handshake\n")
self.flush()
# that will log the "errory" usage event, then drop the connection
p1.disconnect()
self.assertEqual(len(self._usage.events), 1, self._usage)
self.assertEqual(self._usage.events[0]["mood"], "errory", self._usage)
def test_lonely(self):
p1 = self.new_protocol()
token1 = b"\x00"*32
side1 = b"\x01"*8
p1.send(handshake(token1, side=side1))
self.flush()
# now we disconnect before the peer connects
p1.disconnect()
self.flush()
self.assertEqual(len(self._usage.events), 1, self._usage)
self.assertEqual(self._usage.events[0]["mood"], "lonely", self._usage)
self.assertIdentical(self._usage.events[0]["waiting_time"], None)
def test_one_happy_one_jilted(self):
p1 = self.new_protocol()
p2 = self.new_protocol()
token1 = b"\x00"*32
side1 = b"\x01"*8
side2 = b"\x02"*8
p1.send(handshake(token1, side=side1))
self.flush()
p2.send(handshake(token1, side=side2))
self.flush()
self.assertEqual(self._usage.events, []) # no events yet
p1.send(b"\x00" * 13)
self.flush()
p2.send(b"\xff" * 7)
self.flush()
p1.disconnect()
self.flush()
self.assertEqual(len(self._usage.events), 1, self._usage)
self.assertEqual(self._usage.events[0]["mood"], "happy", self._usage)
self.assertEqual(self._usage.events[0]["total_bytes"], 20)
self.assertNotIdentical(self._usage.events[0]["waiting_time"], None)
def test_redundant(self):
p1a = self.new_protocol()
p1b = self.new_protocol()
p1c = self.new_protocol()
p2 = self.new_protocol()
token1 = b"\x00"*32
side1 = b"\x01"*8
side2 = b"\x02"*8
p1a.send(handshake(token1, side=side1))
self.flush()
p1b.send(handshake(token1, side=side1))
self.flush()
# connect and disconnect a third client (for side1) to exercise the
# code that removes a pending connection without removing the entire
# token
p1c.send(handshake(token1, side=side1))
p1c.disconnect()
self.flush()
self.assertEqual(len(self._usage.events), 1, self._usage)
self.assertEqual(self._usage.events[0]["mood"], "lonely")
p2.send(handshake(token1, side=side2))
self.flush()
self.assertEqual(len(self._transit_server.pending_requests._requests), 0)
self.assertEqual(len(self._usage.events), 2, self._usage)
self.assertEqual(self._usage.events[1]["mood"], "redundant")
# one of these is unnecessary, but probably harmless
p1a.disconnect()
p1b.disconnect()
self.flush()
self.assertEqual(len(self._usage.events), 3, self._usage)
self.assertEqual(self._usage.events[2]["mood"], "happy")
class UsageWebSockets(Usage):
"""
All the tests of 'Usage' except with a WebSocket (instead of TCP)
transport.
This overrides ServerBase.new_protocol to achieve this. It might
be nicer to parametrize these tests in a way that doesn't use
inheritance .. but all the support etc classes are set up that way
already.
"""
def setUp(self):
super(UsageWebSockets, self).setUp()
self._pump = create_pumper()
self._reactor = MemoryReactorClockResolver()
return self._pump.start()
def tearDown(self):
return self._pump.stop()
def new_protocol(self):
return self.new_protocol_ws()
def new_protocol_ws(self):
pump, proto = _new_protocol_ws(self._transit_server, self.log_requests)
self._pumps.append(pump)
return proto
def test_short(self):
"""
This test essentially just tests the framing of the line-oriented
TCP protocol; it doesn't make sense for the WebSockets case
because WS handles framing: you either sent a 'bad handshake'
because it is semantically invalid, or no handshake (yet).
"""
def test_send_non_binary_message(self):
"""
A non-binary WebSocket message is an error
"""
ws_factory = WebSocketServerFactory("ws://localhost:4002")
ws_factory.protocol = WebSocketTransitConnection
ws_protocol = ws_factory.buildProtocol(('127.0.0.1', 0))
with self.assertRaises(ValueError):
ws_protocol.onMessage(u"foo", isBinary=False)
class State(unittest.TestCase):
"""
Tests related to server_state.TransitServerState
"""
def setUp(self):
self.state = TransitServerState(None, None)
def test_empty_token(self):
self.assertEqual(
"-",
self.state.get_token(),
)
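Every test above builds the same line-oriented relay handshake: `b"please relay "` plus a 64-character hex token, optionally followed by `b" for side "` plus a 16-character hex side, then a newline. As a reference, this is roughly what the tests' `handshake()` helper must produce; a sketch reconstructed from the `a1.transport.write(...)` calls in the old-style tests, not the verbatim helper:

```python
from binascii import hexlify

def handshake(token, side=None):
    """Build the transit-relay handshake line used throughout these tests.

    `token` is 32 bytes (hex-encoded to 64 characters); `side` is 8 bytes
    (16 hex characters), or None for the legacy, unsided form.
    """
    hs = b"please relay " + hexlify(token)
    if side is not None:
        hs += b" for side " + hexlify(side)
    return hs + b"\n"
```

The unsided form is exactly 78 bytes and the sided form 104 bytes, which is why the server-side checks below can wait for a fixed byte count before matching.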

View File

@@ -1,9 +1,8 @@
-import re
-import time
+from __future__ import print_function, unicode_literals
+import re, time, json
 from twisted.python import log
-from twisted.protocols.basic import LineReceiver
-from autobahn.twisted.websocket import WebSocketServerProtocol
+from twisted.internet import protocol
+from .database import get_db

 SECONDS = 1.0
 MINUTE = 60*SECONDS
@@ -11,256 +10,329 @@ HOUR = 60*MINUTE
 DAY = 24*HOUR
 MB = 1000*1000

-from wormhole_transit_relay.server_state import (
-    TransitServerState,
-    PendingRequests,
-    ActiveConnections,
-    ITransitClient,
-)
-from zope.interface import implementer
+def round_to(size, coarseness):
+    return int(coarseness*(1+int((size-1)/coarseness)))

+def blur_size(size):
+    if size == 0:
+        return 0
+    if size < 1e6:
+        return round_to(size, 10e3)
+    if size < 1e9:
+        return round_to(size, 1e6)
+    return round_to(size, 100e6)
-@implementer(ITransitClient)
-class TransitConnection(LineReceiver):
-    delimiter = b'\n'
-    # maximum length of a line we will accept before the handshake is complete.
-    # This must be >= to the longest possible handshake message.
-    MAX_LENGTH = 1024
-    started_time = None
+class TransitConnection(protocol.Protocol):
+    def __init__(self):
+        self._got_token = False
+        self._got_side = False
+        self._token_buffer = b""
+        self._sent_ok = False
+        self._buddy = None
+        self._had_buddy = False
+        self._total_sent = 0

-    def send(self, data):
-        """
-        ITransitClient API
-        """
-        self.transport.write(data)

-    def disconnect(self):
-        """
-        ITransitClient API
-        """
-        self.transport.loseConnection()

-    def connect_partner(self, other):
-        """
-        ITransitClient API
-        """
-        self._buddy = other
-        self._buddy._client.transport.registerProducer(self.transport, True)

-    def disconnect_partner(self):
-        """
-        ITransitClient API
-        """
-        assert self._buddy is not None, "internal error: no buddy"
-        if self.factory.log_requests:
-            log.msg("buddy_disconnected {}".format(self._buddy.get_token()))
-        self._buddy._client.disconnect()
-        self._buddy = None

+    def describeToken(self):
+        d = "-"
+        if self._got_token:
+            d = self._got_token[:16].decode("ascii")
+            if self._got_side:
+                d += "-" + self._got_side.decode("ascii")
+            else:
+                d += "-<unsided>"
+        return d
     def connectionMade(self):
-        # ideally more like self._reactor.seconds() ... but Twisted
-        # doesn't have a good way to get the reactor for a protocol
-        # (besides "use the global one")
-        self.started_time = time.time()
-        self._state = TransitServerState(
-            self.factory.transit.pending_requests,
-            self.factory.transit.usage,
-        )
-        self._state.connection_made(self)
-        self.transport.setTcpKeepAlive(True)
-        # uncomment to turn on state-machine tracing
-        # def tracer(oldstate, theinput, newstate):
-        #     print("TRACE: {}: {} --{}--> {}".format(id(self), oldstate, theinput, newstate))
-        # self._state.set_trace_function(tracer)
+        self._started = time.time()
+        self._log_requests = self.factory._log_requests

+    def dataReceived(self, data):
+        if self._sent_ok:
+            # We are an IPushProducer to our buddy's IConsumer, so they'll
+            # throttle us (by calling pauseProducing()) when their outbound
+            # buffer is full (e.g. when their downstream pipe is full). In
+            # practice, this buffers about 10MB per connection, after which
+            # point the sender will only transmit data as fast as the
+            # receiver can handle it.
+            self._total_sent += len(data)
+            self._buddy.transport.write(data)
+            return

+        if self._got_token: # but not yet sent_ok
+            self.transport.write(b"impatient\n")
+            if self._log_requests:
+                log.msg("transit impatience failure")
+            return self.disconnect() # impatience yields failure

+        # else this should be (part of) the token
+        self._token_buffer += data
+        buf = self._token_buffer
-    def lineReceived(self, line):
-        """
-        LineReceiver API
-        """
         # old: "please relay {64}\n"
-        token = None
-        old = re.search(br"^please relay (\w{64})$", line)
-        if old:
-            token = old.group(1)
-            self._state.please_relay(token)
+        (old, handshake_len, token) = self._check_old_handshake(buf)
+        assert old in ("yes", "waiting", "no")
+        if old == "yes":
+            # remember they aren't supposed to send anything past their
+            # handshake until we've said go
+            if len(buf) > handshake_len:
+                self.transport.write(b"impatient\n")
+                if self._log_requests:
+                    log.msg("transit impatience failure")
+                return self.disconnect() # impatience yields failure
+            return self._got_handshake(token, None)
         # new: "please relay {64} for side {16}\n"
-        new = re.search(br"^please relay (\w{64}) for side (\w{16})$", line)
-        if new:
-            token = new.group(1)
-            side = new.group(2)
-            self._state.please_relay_for_side(token, side)
+        (new, handshake_len, token, side) = self._check_new_handshake(buf)
+        assert new in ("yes", "waiting", "no")
+        if new == "yes":
+            if len(buf) > handshake_len:
+                self.transport.write(b"impatient\n")
+                if self._log_requests:
+                    log.msg("transit impatience failure")
+                return self.disconnect() # impatience yields failure
+            return self._got_handshake(token, side)
+        if (old == "no" and new == "no"):
+            self.transport.write(b"bad handshake\n")
+            if self._log_requests:
+                log.msg("transit handshake failure")
+            return self.disconnect() # incorrectness yields failure
+        # else we'll keep waiting

-        if token is None:
-            self._state.bad_token()
-        else:
-            self.setRawMode()
+    def _check_old_handshake(self, buf):
+        # old: "please relay {64}\n"
+        # return ("yes", handshake, token) if buf contains an old-style handshake
+        # return ("waiting", None, None) if it might eventually contain one
+        # return ("no", None, None) if it could never contain one
+        wanted = len("please relay \n")+32*2
+        if len(buf) < wanted-1 and b"\n" in buf:
+            return ("no", None, None)
+        if len(buf) < wanted:
+            return ("waiting", None, None)
+        mo = re.search(br"^please relay (\w{64})\n", buf, re.M)
+        if mo:
+            token = mo.group(1)
+            return ("yes", wanted, token)
+        return ("no", None, None)

-    def rawDataReceived(self, data):
-        """
-        LineReceiver API
-        """
-        # We are an IPushProducer to our buddy's IConsumer, so they'll
-        # throttle us (by calling pauseProducing()) when their outbound
-        # buffer is full (e.g. when their downstream pipe is full). In
-        # practice, this buffers about 10MB per connection, after which
-        # point the sender will only transmit data as fast as the
-        # receiver can handle it.
-        self._state.got_bytes(data)
+    def _check_new_handshake(self, buf):
+        # new: "please relay {64} for side {16}\n"
+        wanted = len("please relay for side \n")+32*2+8*2
+        if len(buf) < wanted-1 and b"\n" in buf:
+            return ("no", None, None, None)
+        if len(buf) < wanted:
+            return ("waiting", None, None, None)
+        mo = re.search(br"^please relay (\w{64}) for side (\w{16})\n", buf, re.M)
+        if mo:
+            token = mo.group(1)
+            side = mo.group(2)
+            return ("yes", wanted, token, side)
+        return ("no", None, None, None)
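The `_check_*_handshake` helpers above form a small three-state parser: a buffer either already holds a complete handshake line ("yes"), may still grow into one ("waiting"), or can never match ("no", e.g. a newline arrived too early). As a standalone sketch of the old-style check (simplified: it folds the `wanted-1` edge case into a plain length test, so it is illustrative rather than byte-for-byte identical):

```python
import re

def check_old_handshake(buf):
    # Simplified sketch of the relay's old-style handshake check: the
    # buffer either holds a complete "please relay {64 hex}\n" line
    # ("yes"), may still grow into one ("waiting"), or never can ("no").
    wanted = len("please relay \n") + 32*2   # 78 bytes in total
    if len(buf) < wanted:
        # a newline before the full length means the line can never match
        if b"\n" in buf:
            return ("no", None)
        return ("waiting", None)
    mo = re.search(rb"^please relay (\w{64})\n", buf, re.M)
    if mo:
        return ("yes", mo.group(1))
    return ("no", None)
```

The "waiting" state is what makes the slow-handshake tests above possible: the server keeps buffering until it has enough bytes to decide.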
+    def _got_handshake(self, token, side):
+        self._got_token = token
+        self._got_side = side
+        self.factory.connection_got_token(token, side, self)

+    def buddy_connected(self, them):
+        self._buddy = them
+        self._had_buddy = True
+        self.transport.write(b"ok\n")
+        self._sent_ok = True
+        # Connect the two as a producer/consumer pair. We use streaming=True,
+        # so this expects the IPushProducer interface, and uses
+        # pauseProducing() to throttle, and resumeProducing() to unthrottle.
+        self._buddy.transport.registerProducer(self.transport, True)
+        # The Transit object calls buddy_connected() on both protocols, so
+        # there will be two producer/consumer pairs.

+    def buddy_disconnected(self):
+        if self._log_requests:
+            log.msg("buddy_disconnected %s" % self.describeToken())
+        self._buddy = None
+        self.transport.loseConnection()

     def connectionLost(self, reason):
-        self._state.connection_lost()
+        if self._buddy:
+            self._buddy.buddy_disconnected()
+        self.factory.transitFinished(self, self._got_token, self._got_side,
+                                     self.describeToken())
+        # Record usage. There are five cases:
+        # * 1: we connected, never had a buddy
+        # * 2: we connected first, we disconnect before the buddy
+        # * 3: we connected first, buddy disconnects first
+        # * 4: buddy connected first, we disconnect before buddy
+        # * 5: buddy connected first, buddy disconnects first
class Transit(object): # whoever disconnects first gets to write the usage record (1,2,4)
"""
I manage pairs of simultaneous connections to a secondary TCP port,
both forwarded to the other. Clients must begin each connection with
"please relay TOKEN for SIDE\n" (or a legacy form without the "for
SIDE"). Two connections match if they use the same TOKEN and have
different SIDEs (the redundant connections are dropped when a match is
made). Legacy connections match any with the same TOKEN, ignoring SIDE
(so two legacy connections will match each other).
    I will send "ok\n" when the matching connection is established, or
    disconnect if no matching connection is made within MAX_WAIT_TIME
    seconds. I will disconnect if you send data before the "ok\n". All data
    you get after the "ok\n" will be from the other side. You will not
    receive "ok\n" until the other side has also connected and submitted a
    matching token (and differing SIDE).

    In addition, the connections will be dropped after MAXLENGTH bytes have
    been sent by either side, or MAXTIME seconds have elapsed after the
    matching connections were established. A future API will reveal these
    limits to clients instead of causing mysterious spontaneous failures.

    These relay connections are not half-closeable (unlike full TCP
    connections, applications will not receive any data after half-closing
    their outgoing side). Applications must negotiate shutdown with their
    peer and not close the connection until all data has finished
    transferring in both directions. Applications which only need to send
    data in one direction can use close() as usual.
    """

        finished = time.time()
        if not self._had_buddy:  # 1
            total_time = finished - self._started
            self.factory.recordUsage(self._started, "lonely", 0,
                                     total_time, None)
        if self._had_buddy and self._buddy:  # 2,4
            total_bytes = self._total_sent + self._buddy._total_sent
            starts = [self._started, self._buddy._started]
            total_time = finished - min(starts)
            waiting_time = max(starts) - min(starts)
            self.factory.recordUsage(self._started, "happy", total_bytes,
                                     total_time, waiting_time)

    def disconnect(self):
        self.transport.loseConnection()
        self.factory.transitFailed(self)
        finished = time.time()
        total_time = finished - self._started
        self.factory.recordUsage(self._started, "errory", 0,
                                 total_time, None)

class Transit(protocol.ServerFactory):
    # I manage pairs of simultaneous connections to a secondary TCP port,
    # both forwarded to the other. Clients must begin each connection with
    # "please relay TOKEN for SIDE\n" (or a legacy form without the "for
    # SIDE"). Two connections match if they use the same TOKEN and have
    # different SIDEs (the redundant connections are dropped when a match is
    # made). Legacy connections match any with the same TOKEN, ignoring SIDE
# (so two legacy connections will match each other).
# I will send "ok\n" when the matching connection is established, or
# disconnect if no matching connection is made within MAX_WAIT_TIME
# seconds. I will disconnect if you send data before the "ok\n". All data
# you get after the "ok\n" will be from the other side. You will not
# receive "ok\n" until the other side has also connected and submitted a
# matching token (and differing SIDE).
# In addition, the connections will be dropped after MAXLENGTH bytes have
# been sent by either side, or MAXTIME seconds have elapsed after the
# matching connections were established. A future API will reveal these
# limits to clients instead of causing mysterious spontaneous failures.
# These relay connections are not half-closeable (unlike full TCP
# connections, applications will not receive any data after half-closing
# their outgoing side). Applications must negotiate shutdown with their
# peer and not close the connection until all data has finished
# transferring in both directions. Applications which only need to send
# data in one direction can use close() as usual.
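As a sketch (not code from this repo), the two handshake forms described above can be composed by a client and checked against the same patterns the server parses; the token/side values are arbitrary placeholders:

```python
import re

# The two accepted handshake lines, per the relay description above.
OLD_HANDSHAKE = re.compile(br"^please relay (\w{64})$")
NEW_HANDSHAKE = re.compile(br"^please relay (\w{64}) for side (\w{16})$")

def build_handshake(token, side=None):
    # token: 64 hex chars; side: 16 hex chars (None selects the legacy form)
    if side is None:
        return "please relay {}\n".format(token).encode("ascii")
    return "please relay {} for side {}\n".format(token, side).encode("ascii")

legacy = build_handshake("a" * 64)
modern = build_handshake("a" * 64, "0" * 16)
assert OLD_HANDSHAKE.search(legacy.rstrip(b"\n"))
assert NEW_HANDSHAKE.search(modern.rstrip(b"\n"))
assert not NEW_HANDSHAKE.search(legacy.rstrip(b"\n"))
```

The trailing newline is stripped before matching, mirroring how the server treats the first line of each connection.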
    # TODO: unused
    MAX_WAIT_TIME = 30*SECONDS
    # TODO: unused
    MAXLENGTH = 10*MB
    # TODO: unused
    MAXTIME = 60*SECONDS
protocol = TransitConnection

    def __init__(self, usage, get_timestamp):
        self.active_connections = ActiveConnections()
        self.pending_requests = PendingRequests(self.active_connections)
        self.usage = usage
        self._timestamp = get_timestamp
        self._rebooted = self._timestamp()

    def __init__(self, blur_usage, log_file, usage_db):
        self._blur_usage = blur_usage
        self._log_requests = blur_usage is None
        if self._blur_usage:
            log.msg("blurring access times to %d seconds" % self._blur_usage)
            log.msg("not logging Transit connections to Twisted log")
        else:
            log.msg("not blurring access times")
        self._debug_log = False
        self._log_file = log_file
        self._db = None
        if usage_db:
            self._db = get_db(usage_db)
        self._rebooted = time.time()
        # we don't track TransitConnections until they submit a token
        self._pending_requests = {} # token -> set((side, TransitConnection))
        self._active_connections = set() # TransitConnection

    def connection_got_token(self, token, new_side, new_tc):
if token not in self._pending_requests:
self._pending_requests[token] = set()
potentials = self._pending_requests[token]
for old in potentials:
(old_side, old_tc) = old
if ((old_side is None)
or (new_side is None)
or (old_side != new_side)):
# we found a match
if self._debug_log:
log.msg("transit relay 2: %s" % new_tc.describeToken())
# drop and stop tracking the rest
potentials.remove(old)
for (_, leftover_tc) in potentials:
leftover_tc.disconnect() # TODO: not "errory"?
self._pending_requests.pop(token)
# glue the two ends together
self._active_connections.add(new_tc)
self._active_connections.add(old_tc)
new_tc.buddy_connected(old_tc)
old_tc.buddy_connected(new_tc)
return
if self._debug_log:
log.msg("transit relay 1: %s" % new_tc.describeToken())
potentials.add((new_side, new_tc))
# TODO: timer
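The pairing rule applied above can be reduced to a small predicate (a sketch, not part of this codebase): a waiting connection and a newcomer pair up when either is a legacy connection (side `None`) or their sides differ.

```python
def sides_match(old_side, new_side):
    # legacy connections (side=None) match anything with the same token;
    # otherwise the two sides must differ
    return (old_side is None) or (new_side is None) or (old_side != new_side)

assert sides_match(None, None)                # two legacy clients pair up
assert sides_match(b"a" * 16, None)           # legacy matches a sided client
assert sides_match(b"a" * 16, b"b" * 16)      # differing sides pair up
assert not sides_match(b"a" * 16, b"a" * 16)  # same side: redundant, no pair
```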
def transitFinished(self, tc, token, side, description):
if token in self._pending_requests:
side_tc = (side, tc)
if side_tc in self._pending_requests[token]:
self._pending_requests[token].remove(side_tc)
if not self._pending_requests[token]: # set is now empty
del self._pending_requests[token]
if self._debug_log:
log.msg("transitFinished %s" % (description,))
self._active_connections.discard(tc)
def transitFailed(self, p):
if self._debug_log:
log.msg("transitFailed %r" % p)
pass
def recordUsage(self, started, result, total_bytes,
total_time, waiting_time):
if self._debug_log:
log.msg(format="Transit.recordUsage {bytes}B", bytes=total_bytes)
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
total_bytes = blur_size(total_bytes)
if self._log_file is not None:
data = {"started": started,
"total_time": total_time,
"waiting_time": waiting_time,
"total_bytes": total_bytes,
"mood": result,
}
self._log_file.write(json.dumps(data)+"\n")
self._log_file.flush()
if self._db:
self._db.execute("INSERT INTO `usage`"
" (`started`, `total_time`, `waiting_time`,"
" `total_bytes`, `result`)"
" VALUES (?,?,?, ?,?)",
(started, total_time, waiting_time,
total_bytes, result))
self._update_stats()
self._db.commit()
def timerUpdateStats(self):
if self._db:
self._update_stats()
self._db.commit()
def _update_stats(self):
# current status: should be zero when idle
rebooted = self._rebooted
updated = time.time()
connected = len(self._active_connections) / 2
        # TODO: when a connection is half-closed, len(active) will be odd. a
        # moment later (hopefully) the other side will disconnect, but
        # _update_stats isn't updated until later.
        waiting = len(self._pending_requests)
        # "waiting" doesn't count multiple parallel connections from the same
        # side
        incomplete_bytes = sum(tc._total_sent
                               for tc in self._active_connections)
        self._db.execute("DELETE FROM `current`")
        self._db.execute("INSERT INTO `current`"
                         " (`rebooted`, `updated`, `connected`, `waiting`,"
                         " `incomplete_bytes`)"
                         " VALUES (?, ?, ?, ?, ?)",
                         (rebooted, updated, connected, waiting,
                          incomplete_bytes))

    def update_stats(self):
        self.usage.update_stats(
            rebooted=self._rebooted,
            updated=self._timestamp(),
            connected=len(self.active_connections._connections),
            waiting=len(self.pending_requests._requests),
            incomplete_bytes=sum(
                tc._total_sent
                for tc in self.active_connections._connections
            ),
        )
@implementer(ITransitClient)
class WebSocketTransitConnection(WebSocketServerProtocol):
started_time = None
def send(self, data):
"""
ITransitClient API
"""
self.sendMessage(data, isBinary=True)
def disconnect(self):
"""
ITransitClient API
"""
self.sendClose(1000, None)
def connect_partner(self, other):
"""
ITransitClient API
"""
self._buddy = other
self._buddy._client.transport.registerProducer(self.transport, True)
def disconnect_partner(self):
"""
ITransitClient API
"""
assert self._buddy is not None, "internal error: no buddy"
if self.factory.log_requests:
log.msg("buddy_disconnected {}".format(self._buddy.get_token()))
self._buddy._client.disconnect()
self._buddy = None
def connectionMade(self):
"""
IProtocol API
"""
super(WebSocketTransitConnection, self).connectionMade()
self.started_time = time.time()
self._first_message = True
self._state = TransitServerState(
self.factory.transit.pending_requests,
self.factory.transit.usage,
)
# uncomment to turn on state-machine tracing
# def tracer(oldstate, theinput, newstate):
# print("WSTRACE: {}: {} --{}--> {}".format(id(self), oldstate, theinput, newstate))
# self._state.set_trace_function(tracer)
def onOpen(self):
self._state.connection_made(self)
def onMessage(self, payload, isBinary):
"""
We may have a 'handshake' on our hands or we may just have some bytes to relay
"""
if not isBinary:
raise ValueError(
"All messages must be binary"
)
if self._first_message:
self._first_message = False
token = None
old = re.search(br"^please relay (\w{64})$", payload)
if old:
token = old.group(1)
self._state.please_relay(token)
# new: "please relay {64} for side {16}\n"
new = re.search(br"^please relay (\w{64}) for side (\w{16})$", payload)
if new:
token = new.group(1)
side = new.group(2)
self._state.please_relay_for_side(token, side)
if token is None:
self._state.bad_token()
else:
self._state.got_bytes(payload)
def onClose(self, wasClean, code, reason):
"""
IWebSocketChannel API
"""
self._state.connection_lost()


@ -1,238 +0,0 @@
import time
import json
from twisted.python import log
from zope.interface import (
implementer,
Interface,
)
def create_usage_tracker(blur_usage, log_file, usage_db):
"""
:param int blur_usage: see UsageTracker
:param log_file: None or a file-like object to write JSON-encoded
lines of usage information to.
:param usage_db: None or an sqlite3 database connection
:returns: a new UsageTracker instance configured with backends.
"""
tracker = UsageTracker(blur_usage)
if usage_db:
tracker.add_backend(DatabaseUsageRecorder(usage_db))
if log_file:
tracker.add_backend(LogFileUsageRecorder(log_file))
return tracker
class IUsageWriter(Interface):
"""
Records actual usage statistics in some way
"""
def record_usage(started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None):
"""
        :param int started: timestamp when this connection began
:param float total_time: total seconds this connection lasted
:param float waiting_time: None or the total seconds one side
waited for the other
:param int total_bytes: the total bytes sent. In case the
connection was concluded successfully, only one side will
record the total bytes (but count both).
:param str mood: the 'mood' of the connection
"""
@implementer(IUsageWriter)
class MemoryUsageRecorder:
"""
    Remembers usage records in memory.
"""
def __init__(self):
self.events = []
def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None):
"""
IUsageWriter.
"""
data = {
"started": started,
"total_time": total_time,
"waiting_time": waiting_time,
"total_bytes": total_bytes,
"mood": mood,
}
self.events.append(data)
@implementer(IUsageWriter)
class LogFileUsageRecorder:
"""
Writes usage records to a file. The records are written in JSON,
one record per line.
"""
def __init__(self, writable_file):
self._file = writable_file
def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None):
"""
IUsageWriter.
"""
data = {
"started": started,
"total_time": total_time,
"waiting_time": waiting_time,
"total_bytes": total_bytes,
"mood": mood,
}
self._file.write(json.dumps(data) + "\n")
self._file.flush()
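For illustration, the same write path can be exercised with an in-memory file object standing in for a real log file (the class is repeated here, slightly condensed, so the sketch is self-contained):

```python
import io
import json

# Stand-in copy mirroring LogFileUsageRecorder above; io.StringIO plays the
# role of the writable log file.
class LogFileUsageRecorder:
    def __init__(self, writable_file):
        self._file = writable_file

    def record_usage(self, started=None, total_time=None, waiting_time=None,
                     total_bytes=None, mood=None):
        data = {
            "started": started,
            "total_time": total_time,
            "waiting_time": waiting_time,
            "total_bytes": total_bytes,
            "mood": mood,
        }
        self._file.write(json.dumps(data) + "\n")
        self._file.flush()

f = io.StringIO()
rec = LogFileUsageRecorder(f)
rec.record_usage(started=1234560, total_time=10.5, waiting_time=0.5,
                 total_bytes=2000000, mood="happy")
record = json.loads(f.getvalue().splitlines()[0])
assert record["mood"] == "happy"
assert record["total_bytes"] == 2000000
```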
@implementer(IUsageWriter)
class DatabaseUsageRecorder:
"""
Write usage records into a database
"""
def __init__(self, db):
self._db = db
def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None):
"""
IUsageWriter.
"""
self._db.execute(
"INSERT INTO `usage`"
" (`started`, `total_time`, `waiting_time`,"
" `total_bytes`, `result`)"
" VALUES (?,?,?,?,?)",
(started, total_time, waiting_time, total_bytes, mood)
)
# original code did "self._update_stats()" here, thus causing
# "global" stats update on every connection update .. should
# we repeat this behavior, or really only record every
# 60-seconds with the timer?
self._db.commit()
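The INSERT above can be exercised standalone with sqlite3 and a hypothetical minimal `usage` schema (the project's real schema ships separately):

```python
import sqlite3

# Guessed minimal schema, limited to the columns named in the INSERT.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE `usage` (`started` INTEGER, `total_time` REAL,"
           " `waiting_time` REAL, `total_bytes` INTEGER, `result` TEXT)")
db.execute("INSERT INTO `usage`"
           " (`started`, `total_time`, `waiting_time`,"
           " `total_bytes`, `result`)"
           " VALUES (?,?,?,?,?)",
           (1234560, 10.5, 0.5, 2000000, "happy"))
db.commit()
row = db.execute("SELECT `result`, `total_bytes` FROM `usage`").fetchone()
assert row == ("happy", 2000000)
```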
class UsageTracker(object):
"""
Tracks usage statistics of connections
"""
def __init__(self, blur_usage):
"""
:param int blur_usage: None or the number of seconds to use as a
window around which to blur time statistics (e.g. "60" means times
will be rounded to 1 minute intervals). When blur_usage is
non-zero, sizes will also be rounded into buckets of "one
megabyte", "one gigabyte" or "lots"
"""
self._backends = set()
self._blur_usage = blur_usage
if blur_usage:
log.msg("blurring access times to %d seconds" % self._blur_usage)
else:
log.msg("not blurring access times")
def add_backend(self, backend):
"""
Add a new backend.
:param IUsageWriter backend: the backend to add
"""
self._backends.add(backend)
def record(self, started, buddy_started, result, bytes_sent, buddy_bytes):
"""
:param int started: timestamp when our connection started
:param int buddy_started: None, or the timestamp when our
partner's connection started (will be None if we don't yet
have a partner).
:param str result: a label for the result of the connection
(one of the "moods").
:param int bytes_sent: number of bytes we sent
:param int buddy_bytes: number of bytes our partner sent
"""
# ideally self._reactor.seconds() or similar, but ..
finished = time.time()
if buddy_started is not None:
starts = [started, buddy_started]
total_time = finished - min(starts)
waiting_time = max(starts) - min(starts)
total_bytes = bytes_sent + buddy_bytes
else:
total_time = finished - started
waiting_time = None
total_bytes = bytes_sent
# note that "bytes_sent" should always be 0 here, but
# we're recording what the state-machine remembered in any
# case
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
total_bytes = blur_size(total_bytes)
# This is "a dict" instead of "kwargs" because we have to make
# it into a dict for the log use-case and in-memory/testing
# use-case anyway so this is less repeats of the names.
self._notify_backends({
"started": started,
"total_time": total_time,
"waiting_time": waiting_time,
"total_bytes": total_bytes,
"mood": result,
})
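With hypothetical timestamps, the timing arithmetic above works out as follows:

```python
# Hypothetical example: side A arrives at t=100, side B at t=104, and the
# paired connection closes at t=110.
started, buddy_started, finished = 100.0, 104.0, 110.0
starts = [started, buddy_started]
total_time = finished - min(starts)       # measured from the first arrival
waiting_time = max(starts) - min(starts)  # how long the first side sat alone
assert total_time == 10.0
assert waiting_time == 4.0
```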
def update_stats(self, rebooted, updated, connected, waiting,
incomplete_bytes):
"""
Update general statistics.
"""
# in original code, this is only recorded in the database
# .. perhaps a better way to do this, but ..
for backend in self._backends:
if isinstance(backend, DatabaseUsageRecorder):
backend._db.execute("DELETE FROM `current`")
backend._db.execute(
"INSERT INTO `current`"
" (`rebooted`, `updated`, `connected`, `waiting`,"
" `incomplete_bytes`)"
" VALUES (?, ?, ?, ?, ?)",
(int(rebooted), int(updated), connected, waiting,
incomplete_bytes)
)
def _notify_backends(self, data):
"""
Internal helper. Tell every backend we have about a new usage record.
"""
for backend in self._backends:
backend.record_usage(**data)
def round_to(size, coarseness):
return int(coarseness*(1+int((size-1)/coarseness)))
def blur_size(size):
if size == 0:
return 0
if size < 1e6:
return round_to(size, 10e3)
if size < 1e9:
return round_to(size, 1e6)
return round_to(size, 100e6)
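A quick check of the bucketing (the two functions are repeated so the example is self-contained): sizes under 1 MB round up in 10 kB steps, under 1 GB in 1 MB steps, and anything larger in 100 MB steps.

```python
def round_to(size, coarseness):
    return int(coarseness * (1 + int((size - 1) / coarseness)))

def blur_size(size):
    if size == 0:
        return 0
    if size < 1e6:
        return round_to(size, 10e3)
    if size < 1e9:
        return round_to(size, 1e6)
    return round_to(size, 100e6)

assert blur_size(0) == 0
assert blur_size(1) == 10000                  # 10 kB buckets below 1 MB
assert blur_size(123456) == 130000
assert blur_size(5_000_000) == 5_000_000      # exact bucket boundary stays put
assert blur_size(5_000_001) == 6_000_000
assert blur_size(2_500_000_000) == 2_500_000_000
```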


@ -4,7 +4,7 @@
 # and then run "tox" from this directory.
 [tox]
-envlist = {py37,py38,py39,py310,pypy}
+envlist = {py27,py34,py35,py36,pypy}
 skip_missing_interpreters = True
 minversion = 2.4.0


@ -1,82 +0,0 @@
"""
This is a test-client for the transit-relay that uses WebSockets.
If an additional command-line argument (anything) is added, it will
send 5 messages upon connection. Otherwise, it just prints out what is
received. Uses a fixed token of 64 'a' characters. Always connects on
localhost:4002
"""
import sys
from twisted.internet import endpoints
from twisted.internet.defer import (
Deferred,
inlineCallbacks,
)
from twisted.internet.task import react, deferLater
from autobahn.twisted.websocket import (
WebSocketClientProtocol,
WebSocketClientFactory,
)
class RelayEchoClient(WebSocketClientProtocol):
def onOpen(self):
self._received = b""
self.sendMessage(
u"please relay {} for side {}".format(
self.factory.token,
self.factory.side,
).encode("ascii"),
True,
)
def onMessage(self, data, isBinary):
print(">onMessage: {} bytes".format(len(data)))
print(data, isBinary)
if data == b"ok\n":
self.factory.ready.callback(None)
else:
self._received += data
if False:
# test abrupt hangup from receiving side
self.transport.loseConnection()
def onClose(self, wasClean, code, reason):
print(">onClose", wasClean, code, reason)
self.factory.done.callback(reason)
if not self.factory.ready.called:
self.factory.ready.errback(RuntimeError(reason))
@react
@inlineCallbacks
def main(reactor):
will_send_message = len(sys.argv) > 1
ep = endpoints.clientFromString(reactor, "tcp:localhost:4002")
f = WebSocketClientFactory("ws://127.0.0.1:4002/")
f.reactor = reactor
f.protocol = RelayEchoClient
f.token = "a" * 64
f.side = "0" * 16 if will_send_message else "1" * 16
f.done = Deferred()
f.ready = Deferred()
proto = yield ep.connect(f)
print("proto", proto)
yield f.ready
print("ready")
if will_send_message:
for _ in range(5):
print("sending message")
proto.sendMessage(b"it's a message", True)
yield deferLater(reactor, 0.2)
yield proto.sendClose()
print("closing")
yield f.done
print("relayed {} bytes:".format(len(proto._received)))
print(proto._received.decode("utf8"))