Compare commits: master...no-pypiwin (2 commits)

Author | SHA1 | Date
---|---|---
 | 32d265c10a |
 | 99ea7b2488 |

The hunks below are read with master as the base and no-pypiwin as the head: "-" lines are what master has, "+" lines are what no-pypiwin has. Files that exist only on master are shown once as plain content. Where this view lost a filename, the file is introduced by a short description instead.
CI environment matrix (filename not shown in this view):

@@ -14,6 +14,7 @@ environment:
     - PYTHON: "C:\\Python27"
     - PYTHON: "C:\\Python27-x64"
       DISTUTILS_USE_SDK: "1"
+    - PYTHON: "C:\\Python34"
     - PYTHON: "C:\\Python35"
     - PYTHON: "C:\\Python36"
     - PYTHON: "C:\\Python36-x64"
.github/workflows/test.yml (vendored, 35 lines): present on master, absent on no-pypiwin.

@@ -1,35 +0,0 @@
name: Tests

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

jobs:
  testing:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [3.7, 3.8, 3.9]

    steps:
      - uses: actions/checkout@v2

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip tox codecov
          tox --notest -e coverage

      - name: Test
        run: |
          python --version
          tox -e coverage

      - name: Upload Coverage
        run: codecov
.travis.yml (28 changed lines)

@@ -1,17 +1,18 @@
-arch:
-  - amd64
-  - ppc64le
-language: python
-# defaults: the py3.7 environment overrides these
-dist: trusty
 sudo: false
+language: python
 cache: pip
 before_cache:
   - rm -f $HOME/.cache/pip/log/debug.log
 branches:
   except:
     - /^WIP-.*$/
+python:
+  - "2.7"
+  - "3.3"
+  - "3.4"
+  - "3.5"
+  - "3.6"
+  - "nightly"
 install:
   - pip install -U pip tox virtualenv codecov
 before_script:

@@ -24,15 +25,6 @@ script:
 after_success:
   - codecov
 matrix:
-  include:
-    - python: 2.7
-    - python: 3.5
-    - python: 3.6
-    - python: 3.7
-      # we don't actually need sudo, but that kicks us onto GCE, which lets
-      # us get xenial
-      sudo: true
-      dist: xenial
-    - python: nightly
   allow_failures:
-    - python: nightly
+    - python: "3.3"
+    - python: "nightly"
NEWS.md (17 changed lines)

@@ -1,22 +1,5 @@
 User-visible changes in "magic-wormhole-transit-relay":
 
-## unreleased
-
-* drop Python 2, Python 3.5 and 3.6 support
-
-## Release 0.2.1 (11-Sep-2019)
-
-* listen on IPv4+IPv6 properly (#12)
-
-
-## Release 0.2.0 (10-Sep-2019)
-
-* listen on IPv4+IPv6 socket by default (#12)
-* enable SO_KEEPALIVE on all connections (#9)
-* drop support for py3.3 and py3.4
-* improve munin plugins
-
-
 ## Release 0.1.2 (19-Mar-2018)
 
 * Allow more simultaneous connections, by increasing the rlimits() ceiling at
Project README (filename not shown in this view):

@@ -1,8 +1,9 @@
 # magic-wormhole-transit-relay
 
 [![PyPI](http://img.shields.io/pypi/v/magic-wormhole-transit-relay.svg)](https://pypi.python.org/pypi/magic-wormhole-transit-relay)
-![Tests](https://github.com/magic-wormhole/magic-wormhole-transit-relay/workflows/Tests/badge.svg)
-[![codecov.io](https://codecov.io/github/magic-wormhole/magic-wormhole-transit-relay/coverage.svg?branch=master)](https://codecov.io/github/magic-wormhole/magic-wormhole-transit-relay?branch=master)
+[![Build Status](https://travis-ci.org/warner/magic-wormhole-transit-relay.svg?branch=master)](https://travis-ci.org/warner/magic-wormhole-transit-relay)
+[![Windows Build Status](https://ci.appveyor.com/api/projects/status/61kgarqikolbvj1m/branch/master?svg=true)](https://ci.appveyor.com/project/warner/magic-wormhole-transit-relay)
+[![codecov.io](https://codecov.io/github/warner/magic-wormhole-transit-relay/coverage.svg?branch=master)](https://codecov.io/github/warner/magic-wormhole-transit-relay?branch=master)
 
 Transit Relay server for Magic-Wormhole
client.py (54 lines): present on master, absent on no-pypiwin.

@@ -1,54 +0,0 @@
"""
This is a test-client for the transit-relay that uses TCP. It
doesn't send any data, only prints out data that is received. Uses a
fixed token of 64 'a' characters. Always connects on localhost:4001
"""

from twisted.internet import endpoints
from twisted.internet.defer import (
    Deferred,
)
from twisted.internet.task import react
from twisted.internet.error import (
    ConnectionDone,
)
from twisted.internet.protocol import (
    Protocol,
    Factory,
)


class RelayEchoClient(Protocol):
    """
    Speaks the version1 magic wormhole transit relay protocol (as a client)
    """

    def connectionMade(self):
        print(">CONNECT")
        self.data = b""
        self.transport.write(u"please relay {}\n".format(self.factory.token).encode("ascii"))

    def dataReceived(self, data):
        print(">RECV {} bytes".format(len(data)))
        print(data.decode("ascii"))
        self.data += data
        if data == "ok\n":
            self.transport.write("ding\n")

    def connectionLost(self, reason):
        if isinstance(reason.value, ConnectionDone):
            self.factory.done.callback(None)
        else:
            print(">DISCONNCT: {}".format(reason))
            self.factory.done.callback(reason)


@react
def main(reactor):
    ep = endpoints.clientFromString(reactor, "tcp:localhost:4001")
    f = Factory.forProtocol(RelayEchoClient)
    f.token = "a" * 64
    f.done = Deferred()
    ep.connect(f)
    return f.done
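The deleted client only speaks the version-1 handshake. As a rough illustration of the exchange it expects, based only on the strings visible in this diff (the relay address and token are the hard-coded test values from client.py, not protocol documentation):

# Sketch of the v1 exchange performed by the test client above.
# Assumption: a relay is listening on localhost:4001, as hard-coded in client.py.
token = "a" * 64                               # fixed test token
handshake = "please relay {}\n".format(token)  # what the client writes first
# The relay answers b"ok\n" once a second client offers the same token;
# after that, every byte written by one side is delivered to the other.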
Server documentation (filename not shown in this view):

@@ -48,43 +48,13 @@ The relevant arguments are:
 * ``--port=``: the endpoint to listen on, like ``tcp:4001``
 * ``--log-fd=``: writes JSON lines to the given file descriptor for each connection
 * ``--usage-db=``: maintains a SQLite database with current and historical usage data
-* ``--blur-usage=``: round logged timestamps and data sizes
-
-For WebSockets support, two additional arguments:
-
-* ``--websocket``: the endpoint to listen for websocket connections
-  on, like ``tcp:4002``
-* ``--websocket-url``: the URL of the WebSocket connection. This may
-  be different from the listening endpoint because of port-forwarding
-  and so forth. By default it will be ``ws://localhost:<port>`` if not
-  provided
+* ``--blur-usage=``: if provided, logs are rounded to the given number of
+  seconds, and data sizes are rounded too
 
 When you use ``twist``, the relay runs in the foreground, so it will
 generally exit as soon as the controlling terminal exits. For persistent
 environments, you should daemonize the server.
 
-## Minimizing Log Data
-
-The server code attempts to strike a balance between minimizing data
-collected about users, and recording enough information to manage the server
-and monitor its operation. The standard `twistd.log` file does not record IP
-addresses unless an error occurs. The optional `--log-fd=` file (and the
-SQLite database generated if `--usage-db=` is enabled) record the time at
-which the first side connected, the time until the second side connected, the
-total transfer time, the total number of bytes transferred, and the
-success/failure status (the "mood").
-
-If `--blur-usage=` is provided, these recorded file sizes are rounded down:
-sizes less than 1kB are recorded as 0, sizes up to 1MB are rounded to the
-nearest kB, sizes up to 1GB are rounded to the nearest MB, and sizes above
-1GB are rounded to the nearest 100MB.
-
-The argument to `--blur-usage=` is treated as a number of seconds, and the
-"first side connects" timestamp is rounded to a multiple of this. For
-example, `--blur-usage=3600` means all timestamps are rounded down to the
-nearest hour. The waiting time and total time deltas are recorded without
-rounding.
-
 ## Daemonization
 
 A production installation will want to daemonize the server somehow. One

@@ -131,10 +101,6 @@ management can use the ``--log-fd=`` option to emit logs, then route those
 logs into a suitable analysis tool. Other environments might be content to
 use ``--usage-db=`` and run the included Munin plugins to monitor usage.
 
-There is also a
-[Dockerfile](https://github.com/ggeorgovassilis/magic-wormhole-transit-relay-docker),
-written by George Georgovassilis, which you might find useful.
-
 ## Configuring Clients
 
 The transit relay will listen on an "endpoint" (usually a TCP port, but it
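The removed "Minimizing Log Data" prose above describes the blurring rules in words. A minimal sketch of that rounding, written only from the prose (this is an illustration, not code from the repository):

# Illustration of the --blur-usage= rules described in the deleted docs above.
def blur_size(num_bytes):
    # sizes under 1kB are recorded as 0, up to 1MB rounded to the nearest kB,
    # up to 1GB to the nearest MB, above 1GB to the nearest 100MB
    if num_bytes < 1000:
        return 0
    if num_bytes < 1000 * 1000:
        return round(num_bytes / 1000.0) * 1000
    if num_bytes < 1000 * 1000 * 1000:
        return round(num_bytes / 1e6) * 1000 * 1000
    return round(num_bytes / 1e8) * 100 * 1000 * 1000

def blur_timestamp(started, blur_usage=3600):
    # --blur-usage=3600 rounds the "first side connects" time down to the hour;
    # waiting-time and total-time deltas are recorded without rounding
    return int(started / blur_usage) * blur_usage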
Usage-database migration helper (filename not shown in this view):

@@ -18,6 +18,7 @@ The resuting "usage.sqlite" should be passed into --usage-db=, e.g. "twist
 transitrelay --usage=.../PATH/TO/usage.sqlite".
 """
 
+from __future__ import unicode_literals, print_function
 import sys
 from wormhole_transit_relay.database import open_existing_db, create_db
 
Munin plugin: active channels (filename not shown in this view):

@@ -7,12 +7,13 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
 env.usagedb /path/to/your/wormhole/server/usage.sqlite
 """
 
+from __future__ import print_function
 import os, sys, time, sqlite3
 
 CONFIG = """\
 graph_title Magic-Wormhole Transit Active Channels
 graph_vlabel Channels
-graph_category wormhole
+graph_category network
 waiting.label Transit Waiting
 waiting.draw LINE1
 waiting.type GAUGE
Munin plugin: bytes since reboot (filename not shown in this view):

@@ -7,12 +7,13 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
 env.usagedb /path/to/your/wormhole/server/usage.sqlite
 """
 
+from __future__ import print_function
 import os, sys, time, sqlite3
 
 CONFIG = """\
 graph_title Magic-Wormhole Transit Usage (since reboot)
 graph_vlabel Bytes Since Reboot
-graph_category wormhole
+graph_category network
 bytes.label Transit Bytes (complete)
 bytes.draw LINE1
 bytes.type GAUGE
Munin plugin: bytes, all time (filename not shown in this view):

@@ -7,12 +7,13 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
 env.usagedb /path/to/your/wormhole/server/usage.sqlite
 """
 
+from __future__ import print_function
 import os, sys, time, sqlite3
 
 CONFIG = """\
 graph_title Magic-Wormhole Transit Usage (all time)
 graph_vlabel Bytes Since DB Creation
-graph_category wormhole
+graph_category network
 bytes.label Transit Bytes (complete)
 bytes.draw LINE1
 bytes.type GAUGE
Munin plugin: events since reboot (filename not shown in this view):

@@ -7,12 +7,13 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
 env.usagedb /path/to/your/wormhole/server/usage.sqlite
 """
 
+from __future__ import print_function
 import os, sys, time, sqlite3
 
 CONFIG = """\
 graph_title Magic-Wormhole Transit Server Events (since reboot)
 graph_vlabel Events Since Reboot
-graph_category wormhole
+graph_category network
 happy.label Happy
 happy.draw LINE1
 happy.type GAUGE
misc/munin/wormhole_transit_events_alltime (3 changed lines, Executable file → Normal file):

@@ -7,12 +7,13 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
 env.usagedb /path/to/your/wormhole/server/usage.sqlite
 """
 
+from __future__ import print_function
 import os, sys, time, sqlite3
 
 CONFIG = """\
 graph_title Magic-Wormhole Transit Server Events (all time)
 graph_vlabel Events
-graph_category wormhole
+graph_category network
 happy.label Happy
 happy.draw LINE1
 happy.type GAUGE
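All five plugins share the same shape: a CONFIG block printed when munin invokes the plugin with "config", and values read from the SQLite file named by env.usagedb. The plugin bodies are outside these hunks, so the following is only a sketch of that general pattern; the SQL query and column name are assumptions taken from the `current` table that appears later in this comparison:

# Sketch of the munin plugin pattern; not the repository's actual plugin body.
from __future__ import print_function
import os, sys, sqlite3

CONFIG = """\
graph_title Magic-Wormhole Transit Active Channels
graph_vlabel Channels
graph_category network
waiting.label Transit Waiting
waiting.draw LINE1
waiting.type GAUGE
"""

if len(sys.argv) > 1 and sys.argv[1] == "config":
    print(CONFIG.rstrip())
    sys.exit(0)

# "env.usagedb" in /etc/munin/plugin-conf.d/wormhole becomes the "usagedb"
# environment variable when munin runs the plugin.
db = sqlite3.connect(os.environ["usagedb"])
waiting = db.execute("SELECT `waiting` FROM `current`").fetchone()[0]
print("waiting.value", waiting)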
setup.py (5 changed lines)

@@ -18,11 +18,10 @@ setup(name="magic-wormhole-transit-relay",
     ],
     package_data={"wormhole_transit_relay": ["db-schemas/*.sql"]},
     install_requires=[
-        "twisted >= 21.2.0",
-        "autobahn >= 21.3.1",
+        "twisted >= 17.5.0",
     ],
     extras_require={
-        ':sys_platform=="win32"': ["pypiwin32"],
+        ':sys_platform=="win32"': ["twisted[windows]"],
        "dev": ["mock", "tox", "pyflakes"],
     },
     test_suite="wormhole_transit_relay.test",
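The win32 key on both sides is a pip environment marker: the extra dependency is only installed on Windows. A hedged sketch of how the no-pypiwin side expresses this, using only names that appear in the hunk above (the surrounding setup() arguments are omitted):

# Fragment-style sketch of the no-pypiwin dependency declaration.
setup_kwargs = dict(
    install_requires=[
        "twisted >= 17.5.0",
    ],
    extras_require={
        # environment marker: pulled in only when installing on Windows;
        # "twisted[windows]" stands in for the direct pypiwin32 pin
        ':sys_platform=="win32"': ["twisted[windows]"],
        "dev": ["mock", "tox", "pyflakes"],
    },
)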
Database module imports (filename not shown in this view):

@@ -1,3 +1,4 @@
+from __future__ import unicode_literals
 import os
 import sqlite3
 import tempfile
The following 477-line module is present on master and absent on no-pypiwin (its filename header was lost in this view); it defines ITransitClient, ActiveConnections, PendingRequests and the TransitServerState state machine.

@@ -1,477 +0,0 @@
from collections import defaultdict

import automat
from twisted.python import log
from zope.interface import (
    Interface,
    Attribute,
)


class ITransitClient(Interface):
    """
    Represents the client side of a connection to this transit
    relay. This is used by TransitServerState instances.
    """

    started_time = Attribute("timestamp when the connection was established")

    def send(data):
        """
        Send some byets to the client
        """

    def disconnect():
        """
        Disconnect the client transport
        """

    def connect_partner(other):
        """
        Hook up to our partner.
        :param ITransitClient other: our partner
        """

    def disconnect_partner():
        """
        Disconnect our partner's transport
        """


class ActiveConnections(object):
    """
    Tracks active connections.

    A connection is 'active' when both sides have shown up and they
    are glued together (and thus could be passing data back and forth
    if any is flowing).
    """
    def __init__(self):
        self._connections = set()

    def register(self, side0, side1):
        """
        A connection has become active so register both its sides

        :param TransitConnection side0: one side of the connection
        :param TransitConnection side1: one side of the connection
        """
        self._connections.add(side0)
        self._connections.add(side1)

    def unregister(self, side):
        """
        One side of a connection has become inactive.

        :param TransitConnection side: an inactive side of a connection
        """
        self._connections.discard(side)


class PendingRequests(object):
    """
    Tracks outstanding (non-"active") requests.

    We register client connections against the tokens we have
    received. When the other side shows up we can thus match it to the
    correct partner connection. At this point, the connection becomes
    "active" is and is thus no longer "pending" and so will no longer
    be in this collection.
    """

    def __init__(self, active_connections):
        """
        :param active_connections: an instance of ActiveConnections where
            connections are put when both sides arrive.
        """
        self._requests = defaultdict(set)  # token -> set((side, TransitConnection))
        self._active = active_connections

    def unregister(self, token, side, tc):
        """
        We no longer care about a particular client (e.g. it has
        disconnected).
        """
        if token in self._requests:
            self._requests[token].discard((side, tc))
            if not self._requests[token]:
                # no more sides; token is dead
                del self._requests[token]
        self._active.unregister(tc)

    def register(self, token, new_side, new_tc):
        """
        A client has connected and successfully offered a token (and
        optional 'side' token). If this is the first one for this
        token, we merely remember it. If it is the second side for
        this token we connect them together.

        :param bytes token: the token for this connection.

        :param bytes new_side: None or the side token for this connection

        :param TransitServerState new_tc: the state-machine of the connection

        :returns bool: True if we are the first side to register this
            token
        """
        potentials = self._requests[token]
        for old in potentials:
            (old_side, old_tc) = old
            if ((old_side is None)
                    or (new_side is None)
                    or (old_side != new_side)):
                # we found a match

                # drop and stop tracking the rest
                potentials.remove(old)
                for (_, leftover_tc) in potentials.copy():
                    # Don't record this as errory. It's just a spare connection
                    # from the same side as a connection that got used. This
                    # can happen if the connection hint contains multiple
                    # addresses (we don't currently support those, but it'd
                    # probably be useful in the future).
                    leftover_tc.partner_connection_lost()
                self._requests.pop(token, None)

                # glue the two ends together
                self._active.register(new_tc, old_tc)
                new_tc.got_partner(old_tc)
                old_tc.got_partner(new_tc)
                return False

        potentials.add((new_side, new_tc))
        return True
        # TODO: timer


class TransitServerState(object):
    """
    Encapsulates the state-machine of the server side of a transit
    relay connection.

    Once the protocol has been told to relay (or to relay for a side)
    it starts passing all received bytes to the other side until it
    closes.
    """

    _machine = automat.MethodicalMachine()
    _client = None
    _buddy = None
    _token = None
    _side = None
    _first = None
    _mood = "empty"
    _total_sent = 0

    def __init__(self, pending_requests, usage_recorder):
        self._pending_requests = pending_requests
        self._usage = usage_recorder

    def get_token(self):
        """
        :returns str: a string describing our token. This will be "-" if
            we have no token yet, or "{16 chars}-<unsided>" if we have
            just a token or "{16 chars}-{16 chars}" if we have a token and
            a side.
        """
        d = "-"
        if self._token is not None:
            d = self._token[:16].decode("ascii")

            if self._side is not None:
                d += "-" + self._side.decode("ascii")
            else:
                d += "-<unsided>"
        return d

    @_machine.input()
    def connection_made(self, client):
        """
        A client has connected. May only be called once.

        :param ITransitClient client: our client.
        """
        # NB: the "only called once" is enforced by the state-machine;
        # this input is only valid for the "listening" state, to which
        # we never return.

    @_machine.input()
    def please_relay(self, token):
        """
        A 'please relay X' message has been received (the original version
        of the protocol).
        """

    @_machine.input()
    def please_relay_for_side(self, token, side):
        """
        A 'please relay X for side Y' message has been received (the
        second version of the protocol).
        """

    @_machine.input()
    def bad_token(self):
        """
        A bad token / relay line was received (e.g. couldn't be parsed)
        """

    @_machine.input()
    def got_partner(self, client):
        """
        The partner for this relay session has been found
        """

    @_machine.input()
    def connection_lost(self):
        """
        Our transport has failed.
        """

    @_machine.input()
    def partner_connection_lost(self):
        """
        Our partner's transport has failed.
        """

    @_machine.input()
    def got_bytes(self, data):
        """
        Some bytes have arrived (that aren't part of the handshake)
        """

    @_machine.output()
    def _remember_client(self, client):
        self._client = client

    # note that there is no corresponding "_forget_client" because we
    # may still want to access it after it is gone .. for example, to
    # get the .started_time for logging purposes

    @_machine.output()
    def _register_token(self, token):
        return self._real_register_token_for_side(token, None)

    @_machine.output()
    def _register_token_for_side(self, token, side):
        return self._real_register_token_for_side(token, side)

    @_machine.output()
    def _unregister(self):
        """
        remove us from the thing that remembers tokens and sides
        """
        return self._pending_requests.unregister(self._token, self._side, self)

    @_machine.output()
    def _send_bad(self):
        self._mood = "errory"
        self._client.send(b"bad handshake\n")
        if self._client.factory.log_requests:
            log.msg("transit handshake failure")

    @_machine.output()
    def _send_ok(self):
        self._client.send(b"ok\n")

    @_machine.output()
    def _send_impatient(self):
        self._client.send(b"impatient\n")
        if self._client.factory.log_requests:
            log.msg("transit impatience failure")

    @_machine.output()
    def _count_bytes(self, data):
        self._total_sent += len(data)

    @_machine.output()
    def _send_to_partner(self, data):
        self._buddy._client.send(data)

    @_machine.output()
    def _connect_partner(self, client):
        self._buddy = client
        self._client.connect_partner(client)

    @_machine.output()
    def _disconnect(self):
        self._client.disconnect()

    @_machine.output()
    def _disconnect_partner(self):
        self._client.disconnect_partner()

    # some outputs to record "usage" information ..
    @_machine.output()
    def _record_usage(self):
        if self._mood == "jilted":
            if self._buddy and self._buddy._mood == "happy":
                return
        self._usage.record(
            started=self._client.started_time,
            buddy_started=self._buddy._client.started_time if self._buddy is not None else None,
            result=self._mood,
            bytes_sent=self._total_sent,
            buddy_bytes=self._buddy._total_sent if self._buddy is not None else None
        )

    # some outputs to record the "mood" ..
    @_machine.output()
    def _mood_happy(self):
        self._mood = "happy"

    @_machine.output()
    def _mood_lonely(self):
        self._mood = "lonely"

    @_machine.output()
    def _mood_redundant(self):
        self._mood = "redundant"

    @_machine.output()
    def _mood_impatient(self):
        self._mood = "impatient"

    @_machine.output()
    def _mood_errory(self):
        self._mood = "errory"

    @_machine.output()
    def _mood_happy_if_first(self):
        """
        We disconnected first so we're only happy if we also connected
        first.
        """
        if self._first:
            self._mood = "happy"
        else:
            self._mood = "jilted"

    def _real_register_token_for_side(self, token, side):
        """
        A client has connected and sent a valid version 1 or version 2
        handshake. If the former, `side` will be None.

        In either case, we remember the tokens and register
        ourselves. This might result in 'got_partner' notifications to
        two state-machines if this is the second side for a given token.

        :param bytes token: the token
        :param bytes side: The side token (or None)
        """
        self._token = token
        self._side = side
        self._first = self._pending_requests.register(token, side, self)

    @_machine.state(initial=True)
    def listening(self):
        """
        Initial state, awaiting connection.
        """

    @_machine.state()
    def wait_relay(self):
        """
        Waiting for a 'relay' message
        """

    @_machine.state()
    def wait_partner(self):
        """
        Waiting for our partner to connect
        """

    @_machine.state()
    def relaying(self):
        """
        Relaying bytes to our partner
        """

    @_machine.state()
    def done(self):
        """
        Terminal state
        """

    listening.upon(
        connection_made,
        enter=wait_relay,
        outputs=[_remember_client],
    )
    listening.upon(
        connection_lost,
        enter=done,
        outputs=[_mood_errory],
    )

    wait_relay.upon(
        please_relay,
        enter=wait_partner,
        outputs=[_mood_lonely, _register_token],
    )
    wait_relay.upon(
        please_relay_for_side,
        enter=wait_partner,
        outputs=[_mood_lonely, _register_token_for_side],
    )
    wait_relay.upon(
        bad_token,
        enter=done,
        outputs=[_mood_errory, _send_bad, _disconnect, _record_usage],
    )
    wait_relay.upon(
        got_bytes,
        enter=done,
        outputs=[_count_bytes, _mood_errory, _disconnect, _record_usage],
    )
    wait_relay.upon(
        connection_lost,
        enter=done,
        outputs=[_disconnect, _record_usage],
    )

    wait_partner.upon(
        got_partner,
        enter=relaying,
        outputs=[_mood_happy, _send_ok, _connect_partner],
    )
    wait_partner.upon(
        connection_lost,
        enter=done,
        outputs=[_mood_lonely, _unregister, _record_usage],
    )
    wait_partner.upon(
        got_bytes,
        enter=done,
        outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister, _record_usage],
    )
    wait_partner.upon(
        partner_connection_lost,
        enter=done,
        outputs=[_mood_redundant, _disconnect, _record_usage],
    )

    relaying.upon(
        got_bytes,
        enter=relaying,
        outputs=[_count_bytes, _send_to_partner],
    )
    relaying.upon(
        connection_lost,
        enter=done,
        outputs=[_mood_happy_if_first, _disconnect_partner, _unregister, _record_usage],
    )

    done.upon(
        connection_lost,
        enter=done,
        outputs=[],
    )
    done.upon(
        partner_connection_lost,
        enter=done,
        outputs=[],
    )

    # uncomment to turn on state-machine tracing
    # set_trace_function = _machine._setTrace
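To make the state machine concrete, here is a rough, untested sketch of one side of a relay session driven through the classes above. The names come from the deleted module; FakeClient and PrintUsage are my own stand-ins for an ITransitClient and for the usage recorder:

# Rough sketch only; drives one TransitServerState through its happy/lonely path.
class FakeClient(object):
    "Stand-in ITransitClient (illustration only)."
    started_time = 0.0
    def send(self, data): print("to client:", data)
    def disconnect(self): print("disconnected")
    def connect_partner(self, other): pass
    def disconnect_partner(self): pass

class PrintUsage(object):
    def record(self, **kw): print("usage:", kw)   # stands in for the usage tracker

active = ActiveConnections()
pending = PendingRequests(active)

state = TransitServerState(pending, PrintUsage())
state.connection_made(FakeClient())   # listening  -> wait_relay
state.please_relay(b"a" * 64)         # wait_relay -> wait_partner, token registered
# If a second TransitServerState registers the same token, PendingRequests calls
# got_partner() on both sides and each enters "relaying"; from then on got_bytes()
# counts and forwards data. With no partner, dropping the connection records "lonely":
state.connection_lost()               # wait_partner -> done, usage recorded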
server_tap.py (the twistd plugin; filename implied by the tests later in this comparison):

@@ -5,14 +5,8 @@ from twisted.application.service import MultiService
 from twisted.application.internet import (TimerService,
                                           StreamServerEndpointService)
 from twisted.internet import endpoints
-from twisted.internet import protocol
-
-from autobahn.twisted.websocket import WebSocketServerFactory
-
 from . import transit_server
-from .usage import create_usage_tracker
 from .increase_rlimits import increase_rlimits
-from .database import get_db
 
 LONGDESC = """\
 This plugin sets up a 'Transit Relay' server for magic-wormhole. This service

@@ -25,9 +19,7 @@ class Options(usage.Options):
     longdesc = LONGDESC
 
     optParameters = [
-        ("port", "p", "tcp:4001:interface=\:\:", "endpoint to listen on"),
-        ("websocket", "w", None, "endpoint to listen for WebSocket connections"),
-        ("websocket-url", "u", None, "WebSocket URL (derived from endpoint if not provided)"),
+        ("port", "p", "tcp:4001", "endpoint to listen on"),
         ("blur-usage", None, None, "blur timestamps and data sizes in logs"),
         ("log-fd", None, None, "write JSON usage logs to this file descriptor"),
         ("usage-db", None, None, "record usage data (SQLite)"),

@@ -39,45 +31,14 @@
 def makeService(config, reactor=reactor):
     increase_rlimits()
-    tcp_ep = endpoints.serverFromString(reactor, config["port"]) # to listen
-    ws_ep = (
-        endpoints.serverFromString(reactor, config["websocket"])
-        if config["websocket"] is not None
-        else None
-    )
-    log_file = (
-        os.fdopen(int(config["log-fd"]), "w")
-        if config["log-fd"] is not None
-        else None
-    )
-    db = None if config["usage-db"] is None else get_db(config["usage-db"])
-    usage = create_usage_tracker(
-        blur_usage=config["blur-usage"],
-        log_file=log_file,
-        usage_db=db,
-    )
-    transit = transit_server.Transit(usage, reactor.seconds)
-    tcp_factory = protocol.ServerFactory()
-    tcp_factory.protocol = transit_server.TransitConnection
-    tcp_factory.log_requests = False
-
-    if ws_ep is not None:
-        ws_url = config["websocket-url"]
-        if ws_url is None:
-            # we're using a "private" attribute here but I don't see
-            # any useful alternative unless we also want to parse
-            # Twisted endpoint-strings.
-            ws_url = "ws://localhost:{}/".format(ws_ep._port)
-        print("Using WebSocket URL '{}'".format(ws_url))
-        ws_factory = WebSocketServerFactory(ws_url)
-        ws_factory.protocol = transit_server.WebSocketTransitConnection
-        ws_factory.transit = transit
-        ws_factory.log_requests = False
-
-    tcp_factory.transit = transit
+    ep = endpoints.serverFromString(reactor, config["port"]) # to listen
+    log_file = (os.fdopen(int(config["log-fd"]), "w")
+                if config["log-fd"] is not None
+                else None)
+    f = transit_server.Transit(blur_usage=config["blur-usage"],
+                               log_file=log_file,
+                               usage_db=config["usage-db"])
     parent = MultiService()
-    StreamServerEndpointService(tcp_ep, tcp_factory).setServiceParent(parent)
-    if ws_ep is not None:
-        StreamServerEndpointService(ws_ep, ws_factory).setServiceParent(parent)
-    TimerService(5*60.0, transit.update_stats).setServiceParent(parent)
+    StreamServerEndpointService(ep, f).setServiceParent(parent)
+    TimerService(5*60.0, f.timerUpdateStats).setServiceParent(parent)
     return parent
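Both versions of makeService are exercised the same way; a small sketch of building the service tree by hand, the way twistd would (option values and the database path are examples only, not values from the repository):

# Sketch: construct the relay service programmatically instead of via
# "twistd -n transitrelay --port=tcp:4001 ...".
from twisted.internet import reactor
from wormhole_transit_relay import server_tap

config = server_tap.Options()
config.parseOptions([
    "--port=tcp:4001",
    "--usage-db=usage.sqlite",   # example path
])
service = server_tap.makeService(config, reactor=reactor)
# "service" is a MultiService holding the endpoint listener and the stats
# TimerService; a real deployment lets twistd call service.startService()
# and run the reactor.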
Test helper module with ServerBase (filename not shown in this view). Because nearly every line differs, the hunk is shown as full old and new versions rather than interleaved.

@@ -1,144 +1,30 @@

master (old) version:

from twisted.internet.protocol import (
    ClientFactory,
    Protocol,
)
from twisted.test import iosim
from zope.interface import (
    Interface,
    Attribute,
    implementer,
)
from ..transit_server import (
    Transit,
    TransitConnection,
)
from twisted.internet.protocol import ServerFactory
from ..usage import create_usage_tracker


class IRelayTestClient(Interface):
    """
    The client interface used by tests.
    """

    connected = Attribute("True if we are currently connected else False")

    def send(data):
        """
        Send some bytes.
        :param bytes data: the data to send
        """

    def disconnect():
        """
        Terminate the connection.
        """

    def get_received_data():
        """
        :returns: all the bytes received from the server on this
            connection.
        """

    def reset_data():
        """
        Erase any received data to this point.
        """


class ServerBase:
    log_requests = False

    def setUp(self):
        self._pumps = []
        self._lp = None
        if self.log_requests:
            blur_usage = None
        else:
            blur_usage = 60.0
        self._setup_relay(blur_usage=blur_usage)

    def flush(self):
        did_work = False
        for pump in self._pumps:
            did_work = pump.flush() or did_work
        if did_work:
            self.flush()

    def _setup_relay(self, blur_usage=None, log_file=None, usage_db=None):
        usage = create_usage_tracker(
            blur_usage=blur_usage,
            log_file=log_file,
            usage_db=usage_db,
        )
        self._transit_server = Transit(usage, lambda: 123456789.0)

    def new_protocol(self):
        """
        This should be overridden by derived test-case classes to decide
        if they want a TCP or WebSockets protocol.
        """
        raise NotImplementedError()

    def new_protocol_tcp(self):
        """
        Create a new client protocol connected to the server.
        :returns: a IRelayTestClient implementation
        """
        server_factory = ServerFactory()
        server_factory.protocol = TransitConnection
        server_factory.transit = self._transit_server
        server_factory.log_requests = self.log_requests
        server_protocol = server_factory.buildProtocol(('127.0.0.1', 0))

        @implementer(IRelayTestClient)
        class TransitClientProtocolTcp(Protocol):
            """
            Speak the transit client protocol used by the tests over TCP
            """
            _received = b""
            connected = False

            # override Protocol callbacks

            def connectionMade(self):
                self.connected = True
                return Protocol.connectionMade(self)

            def connectionLost(self, reason):
                self.connected = False
                return Protocol.connectionLost(self, reason)

            def dataReceived(self, data):
                self._received = self._received + data

            # IRelayTestClient

            def send(self, data):
                self.transport.write(data)

            def disconnect(self):
                self.transport.loseConnection()

            def reset_received_data(self):
                self._received = b""

            def get_received_data(self):
                return self._received

        client_factory = ClientFactory()
        client_factory.protocol = TransitClientProtocolTcp
        client_protocol = client_factory.buildProtocol(('127.0.0.1', 31337))

        pump = iosim.connect(
            server_protocol,
            iosim.makeFakeServer(server_protocol),
            client_protocol,
            iosim.makeFakeClient(client_protocol),
        )
        pump.flush()
        self._pumps.append(pump)
        return client_protocol

    def tearDown(self):
        if self._lp:

no-pypiwin (new) version:

#from __future__ import unicode_literals
from twisted.internet import reactor, endpoints
from twisted.internet.defer import inlineCallbacks
from ..transit_server import Transit

class ServerBase:
    log_requests = False

    @inlineCallbacks
    def setUp(self):
        self._lp = None
        if self.log_requests:
            blur_usage = None
        else:
            blur_usage = 60.0
        yield self._setup_relay(blur_usage=blur_usage)
        self._transit_server._debug_log = self.log_requests

    @inlineCallbacks
    def _setup_relay(self, blur_usage=None, log_file=None, usage_db=None):
        ep = endpoints.TCP4ServerEndpoint(reactor, 0, interface="127.0.0.1")
        self._transit_server = Transit(blur_usage=blur_usage,
                                       log_file=log_file, usage_db=usage_db)
        self._lp = yield ep.listen(self._transit_server)
        addr = self._lp.getHost()
        # ws://127.0.0.1:%d/wormhole-relay/ws
        self.transit = u"tcp:127.0.0.1:%d" % addr.port

    def tearDown(self):
        if self._lp:
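A sketch of how a test might use the master-side helper above (the in-memory iosim variant). The test body is my own illustration assembled from methods visible in the hunk, assumed to live in the same package as ServerBase:

# Illustration only: exercise the v1 handshake through the in-memory pumps.
from twisted.trial import unittest

class RelaySmokeTest(ServerBase, unittest.TestCase):
    def test_handshake(self):
        a = self.new_protocol_tcp()
        b = self.new_protocol_tcp()
        token = "a" * 64
        a.send("please relay {}\n".format(token).encode("ascii"))
        b.send("please relay {}\n".format(token).encode("ascii"))
        self.flush()   # pump the fake transports until no more work is done
        # once both sides offer the same token, the relay answers ok to each
        self.assertEqual(a.get_received_data(), b"ok\n")
        self.assertEqual(b.get_received_data(), b"ok\n")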
The following 208-line test module (WebSocket backpressure integration tests) is present on master and absent on no-pypiwin; its filename header was lost in this view.

@@ -1,208 +0,0 @@
from io import (
    StringIO,
)
import sys
import shutil

from twisted.trial import unittest
from twisted.internet.interfaces import (
    IPullProducer,
)
from twisted.internet.protocol import (
    ProcessProtocol,
)
from twisted.internet.defer import (
    inlineCallbacks,
    Deferred,
)
from autobahn.twisted.websocket import (
    WebSocketClientProtocol,
    create_client_agent,
)
from zope.interface import implementer


class _CollectOutputProtocol(ProcessProtocol):
    """
    Internal helper. Collects all output (stdout + stderr) into
    self.output, and callback's on done with all of it after the
    process exits (for any reason).
    """
    def __init__(self):
        self.done = Deferred()
        self.running = Deferred()
        self.output = StringIO()

    def processEnded(self, reason):
        if not self.done.called:
            self.done.callback(self.output.getvalue())

    def outReceived(self, data):
        print(data.decode(), end="", flush=True)
        self.output.write(data.decode(sys.getfilesystemencoding()))
        if not self.running.called:
            if "on 8088" in self.output.getvalue():
                self.running.callback(None)

    def errReceived(self, data):
        print("ERR: {}".format(data.decode(sys.getfilesystemencoding())))
        self.output.write(data.decode(sys.getfilesystemencoding()))


def run_transit(reactor, proto, tcp_port=None, websocket_port=None):
    exe = shutil.which("twistd")
    args = [
        exe, "-n", "transitrelay",
    ]
    if tcp_port is not None:
        args.append("--port")
        args.append(tcp_port)
    if websocket_port is not None:
        args.append("--websocket")
        args.append(websocket_port)
    proc = reactor.spawnProcess(proto, exe, args)
    return proc


class Sender(WebSocketClientProtocol):
    """
    """

    def __init__(self, *args, **kw):
        WebSocketClientProtocol.__init__(self, *args, **kw)
        self.done = Deferred()
        self.got_ok = Deferred()

    def onMessage(self, payload, is_binary):
        print("onMessage")
        if not self.got_ok.called:
            if payload == b"ok\n":
                self.got_ok.callback(None)
        print("send: {}".format(payload.decode("utf8")))

    def onClose(self, clean, code, reason):
        print(f"close: {clean} {code} {reason}")
        self.done.callback(None)


class Receiver(WebSocketClientProtocol):
    """
    """

    def __init__(self, *args, **kw):
        WebSocketClientProtocol.__init__(self, *args, **kw)
        self.done = Deferred()
        self.first_message = Deferred()
        self.received = 0

    def onMessage(self, payload, is_binary):
        print("recv: {}".format(len(payload)))
        self.received += len(payload)
        if not self.first_message.called:
            self.first_message.callback(None)

    def onClose(self, clean, code, reason):
        print(f"close: {clean} {code} {reason}")
        self.done.callback(None)


class TransitWebSockets(unittest.TestCase):
    """
    Integration-style tests of the transit WebSocket relay, using the
    real reactor (and running transit as a subprocess).
    """

    @inlineCallbacks
    def test_buffer_fills(self):
        """
        A running transit relay stops accepting incoming data at a
        reasonable amount if the peer isn't reading. This test defines
        that as 'less than 100MiB' although in practice Twisted seems
        to stop before 10MiB.
        """
        from twisted.internet import reactor
        transit_proto = _CollectOutputProtocol()
        transit_proc = run_transit(reactor, transit_proto, websocket_port="tcp:8088")

        def cleanup_process():
            transit_proc.signalProcess("HUP")
            return transit_proto.done
        self.addCleanup(cleanup_process)

        yield transit_proto.running
        print("Transit running")

        agent = create_client_agent(reactor)
        side_a = yield agent.open("ws://localhost:8088", {}, lambda: Sender())
        side_b = yield agent.open("ws://localhost:8088", {}, lambda: Receiver())

        side_a.sendMessage(b"please relay aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa for side aaaaaaaaaaaaaaaa", True)
        side_b.sendMessage(b"please relay aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa for side bbbbbbbbbbbbbbbb", True)

        yield side_a.got_ok
        yield side_b.first_message

        # remove side_b's filedescriptor from the reactor .. this
        # means it will not read any more data
        reactor.removeReader(side_b.transport)

        # attempt to send up to 100MiB through side_a .. we should get
        # backpressure before that works which only manifests itself
        # as this producer not being asked to produce more
        max_data = 1024*1024*100  # 100MiB

        @implementer(IPullProducer)
        class ProduceMessages:
            def __init__(self, ws, on_produce):
                self._ws = ws
                self._sent = 0
                self._max = max_data
                self._on_produce = on_produce

            def resumeProducing(self):
                self._on_produce()
                if self._sent >= self._max:
                    self._ws.sendClose()
                    return
                data = b"a" * 1024*1024
                self._ws.sendMessage(data, True)
                self._sent += len(data)
                print("sent {}, total {}".format(len(data), self._sent))

        # our only signal is, "did our producer get asked to produce
        # more data" which it should do periodically. We want to stop
        # if we haven't seen a new data request for a while -- defined
        # as "more than 5 seconds".

        done = Deferred()
        last_produce = None
        timeout = 2  # seconds

        def asked_for_data():
            nonlocal last_produce
            last_produce = reactor.seconds()

        data = ProduceMessages(side_a, asked_for_data)
        side_a.transport.registerProducer(data, False)
        data.resumeProducing()

        def check_if_done():
            if last_produce is not None:
                if reactor.seconds() - last_produce > timeout:
                    done.callback(None)
                    return
            # recursive call to ourselves to check again soon
            reactor.callLater(.1, check_if_done)
        check_if_done()

        yield done

        mib = 1024*1024.0
        print("Sent {}MiB of {}MiB before backpressure".format(data._sent / mib, max_data / mib))
        self.assertTrue(data._sent < max_data, "Too much data sent")

        side_a.sendClose()
        side_b.sendClose()
        yield side_a.done
        yield side_b.done
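The deleted test leans on Twisted's pull-producer interface: registerProducer(producer, False) means the transport calls resumeProducing() only when its write buffer has drained, so "not being asked to produce" is itself the backpressure signal. A compact, self-contained sketch of that registration pattern outside the WebSocket context (illustration only, not repository code):

from zope.interface import implementer
from twisted.internet.interfaces import IPullProducer

@implementer(IPullProducer)
class OneShotProducer(object):
    """Writes one chunk each time the transport asks for more data."""
    def __init__(self, transport, chunks):
        self.transport = transport
        self.chunks = list(chunks)

    def resumeProducing(self):
        # called by the transport only when its write buffer has drained
        if not self.chunks:
            self.transport.unregisterProducer()
            self.transport.loseConnection()
            return
        self.transport.write(self.chunks.pop(0))

    def stopProducing(self):
        self.chunks = []

# usage, for example inside some Protocol.connectionMade:
#   producer = OneShotProducer(self.transport, [b"x" * 65536] * 16)
#   self.transport.registerProducer(producer, False)  # False => pull producer
#   producer.resumeProducing()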
Configuration tests for server_tap (filename not shown in this view):

@@ -1,36 +1,18 @@
+from __future__ import unicode_literals, print_function
 from twisted.trial import unittest
 from .. import server_tap
 
-PORT = "tcp:4001:interface=\:\:"
-
 class Config(unittest.TestCase):
     def test_defaults(self):
         o = server_tap.Options()
         o.parseOptions([])
         self.assertEqual(o, {"blur-usage": None, "log-fd": None,
-                             "usage-db": None, "port": PORT,
-                             "websocket": None, "websocket-url": None})
+                             "usage-db": None, "port": "tcp:4001"})
 
     def test_blur(self):
         o = server_tap.Options()
         o.parseOptions(["--blur-usage=60"])
         self.assertEqual(o, {"blur-usage": 60, "log-fd": None,
-                             "usage-db": None, "port": PORT,
-                             "websocket": None, "websocket-url": None})
-
-    def test_websocket(self):
-        o = server_tap.Options()
-        o.parseOptions(["--websocket=tcp:4004"])
-        self.assertEqual(o, {"blur-usage": None, "log-fd": None,
-                             "usage-db": None, "port": PORT,
-                             "websocket": "tcp:4004", "websocket-url": None})
-
-    def test_websocket_url(self):
-        o = server_tap.Options()
-        o.parseOptions(["--websocket=tcp:4004", "--websocket-url=ws://example.com/"])
-        self.assertEqual(o, {"blur-usage": None, "log-fd": None,
-                             "usage-db": None, "port": PORT,
-                             "websocket": "tcp:4004",
-                             "websocket-url": "ws://example.com/"})
+                             "usage-db": None, "port": "tcp:4001"})
 
     def test_string(self):
         o = server_tap.Options()
Database tests, imports (filename not shown in this view):

@@ -1,3 +1,4 @@
+from __future__ import print_function, unicode_literals
 import os
 from twisted.python import filepath
 from twisted.trial import unittest
rlimits tests, imports (filename not shown in this view):

@@ -1,4 +1,5 @@
-from unittest import mock
+from __future__ import print_function, unicode_literals
+import mock
 from twisted.trial import unittest
 from ..increase_rlimits import increase_rlimits
 
Service tests for server_tap (filename not shown in this view):

@@ -1,14 +1,14 @@
+from __future__ import unicode_literals, print_function
 from twisted.trial import unittest
-from unittest import mock
+import mock
 from twisted.application.service import MultiService
-from autobahn.twisted.websocket import WebSocketServerFactory
 from .. import server_tap
 
 class Service(unittest.TestCase):
     def test_defaults(self):
         o = server_tap.Options()
         o.parseOptions([])
-        with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t:
+        with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t:
             s = server_tap.makeService(o)
         self.assertEqual(t.mock_calls,
                          [mock.call(blur_usage=None,

@@ -18,7 +18,7 @@ class Service(unittest.TestCase):
     def test_blur(self):
         o = server_tap.Options()
         o.parseOptions(["--blur-usage=60"])
-        with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t:
+        with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t:
             server_tap.makeService(o)
         self.assertEqual(t.mock_calls,
                          [mock.call(blur_usage=60,

@@ -28,7 +28,7 @@
         o = server_tap.Options()
         o.parseOptions(["--log-fd=99"])
         fd = object()
-        with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t:
+        with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t:
             with mock.patch("wormhole_transit_relay.server_tap.os.fdopen",
                             return_value=fd) as f:
                 server_tap.makeService(o)

@@ -37,34 +37,3 @@
                          [mock.call(blur_usage=None,
                                     log_file=fd, usage_db=None)])
 
-    def test_websocket(self):
-        """
-        A websocket factory is created when passing --websocket
-        """
-        o = server_tap.Options()
-        o.parseOptions(["--websocket=tcp:4004"])
-        services = server_tap.makeService(o)
-        self.assertTrue(
-            any(
-                isinstance(s.factory, WebSocketServerFactory)
-                for s in services.services
-            )
-        )
-
-    def test_websocket_explicit_url(self):
-        """
-        A websocket factory is created with --websocket and
-        --websocket-url
-        """
-        o = server_tap.Options()
-        o.parseOptions([
-            "--websocket=tcp:4004",
-            "--websocket-url=ws://example.com:4004",
-        ])
-        services = server_tap.makeService(o)
-        self.assertTrue(
-            any(
-                isinstance(s.factory, WebSocketServerFactory)
-                for s in services.services
-            )
-        )
@ -1,38 +1,28 @@
-import os, io, json
-from unittest import mock
+from __future__ import print_function, unicode_literals
+import os, io, json, sqlite3
+import mock
 from twisted.trial import unittest
 from ..transit_server import Transit
-from ..usage import create_usage_tracker
 from .. import database

 class DB(unittest.TestCase):
+    def open_db(self, dbfile):
+        db = sqlite3.connect(dbfile)
+        database._initialize_db_connection(db)
+        return db

     def test_db(self):
         T = 1519075308.0

-        class Timer:
-            t = T
-            def __call__(self):
-                return self.t
-        get_time = Timer()
-
         d = self.mktemp()
         os.mkdir(d)
         usage_db = os.path.join(d, "usage.sqlite")
-        db = database.get_db(usage_db)
-        t = Transit(
-            create_usage_tracker(blur_usage=None, log_file=None, usage_db=db),
-            get_time,
-        )
-        self.assertEqual(len(t.usage._backends), 1)
-        usage = list(t.usage._backends)[0]
-
-        get_time.t = T + 1
-        usage.record_usage(started=123, mood="happy", total_bytes=100,
-                           total_time=10, waiting_time=2)
-        t.update_stats()
+        with mock.patch("time.time", return_value=T+0):
+            t = Transit(blur_usage=None, log_file=None, usage_db=usage_db)
+        db = self.open_db(usage_db)
+
+        with mock.patch("time.time", return_value=T+1):
+            t.recordUsage(started=123, result="happy", total_bytes=100,
+                          total_time=10, waiting_time=2)
         self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(),
                          [dict(result="happy", started=123,
                                total_bytes=100, total_time=10, waiting_time=2),
@ -42,10 +32,9 @@ class DB(unittest.TestCase):
                               incomplete_bytes=0,
                               waiting=0, connected=0))

-        get_time.t = T + 2
-        usage.record_usage(started=150, mood="errory", total_bytes=200,
-                           total_time=11, waiting_time=3)
-        t.update_stats()
+        with mock.patch("time.time", return_value=T+2):
+            t.recordUsage(started=150, result="errory", total_bytes=200,
+                          total_time=11, waiting_time=3)
         self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(),
                          [dict(result="happy", started=123,
                                total_bytes=100, total_time=10, waiting_time=2),
@ -57,37 +46,27 @@ class DB(unittest.TestCase):
                               incomplete_bytes=0,
                               waiting=0, connected=0))

-        get_time.t = T + 3
-        t.update_stats()
+        with mock.patch("time.time", return_value=T+3):
+            t.timerUpdateStats()
         self.assertEqual(db.execute("SELECT * FROM `current`").fetchone(),
                          dict(rebooted=T+0, updated=T+3,
                               incomplete_bytes=0,
                               waiting=0, connected=0))

     def test_no_db(self):
-        t = Transit(
-            create_usage_tracker(blur_usage=None, log_file=None, usage_db=None),
-            lambda: 0,
-        )
-        self.assertEqual(0, len(t.usage._backends))
+        t = Transit(blur_usage=None, log_file=None, usage_db=None)
+        t.recordUsage(started=123, result="happy", total_bytes=100,
+                      total_time=10, waiting_time=2)
+        t.timerUpdateStats()

 class LogToStdout(unittest.TestCase):
     def test_log(self):
         # emit lines of JSON to log_file, if set
         log_file = io.StringIO()
-        t = Transit(
-            create_usage_tracker(blur_usage=None, log_file=log_file, usage_db=None),
-            lambda: 0,
-        )
-        with mock.patch("time.time", return_value=133):
-            t.usage.record(
-                started=123,
-                buddy_started=125,
-                result="happy",
-                bytes_sent=100,
-                buddy_bytes=0,
-            )
+        t = Transit(blur_usage=None, log_file=log_file, usage_db=None)
+        t.recordUsage(started=123, result="happy", total_bytes=100,
+                      total_time=10, waiting_time=2)
         self.assertEqual(json.loads(log_file.getvalue()),
                          {"started": 123, "total_time": 10,
                           "waiting_time": 2, "total_bytes": 100,
@ -97,34 +76,15 @@ class LogToStdout(unittest.TestCase):
         # if blurring is enabled, timestamps should be rounded to the
         # requested amount, and sizes should be rounded up too
         log_file = io.StringIO()
-        t = Transit(
-            create_usage_tracker(blur_usage=60, log_file=log_file, usage_db=None),
-            lambda: 0,
-        )
-
-        with mock.patch("time.time", return_value=123 + 10):
-            t.usage.record(
-                started=123,
-                buddy_started=125,
-                result="happy",
-                bytes_sent=11999,
-                buddy_bytes=0,
-            )
-        print(log_file.getvalue())
+        t = Transit(blur_usage=60, log_file=log_file, usage_db=None)
+        t.recordUsage(started=123, result="happy", total_bytes=11999,
+                      total_time=10, waiting_time=2)
         self.assertEqual(json.loads(log_file.getvalue()),
                          {"started": 120, "total_time": 10,
                           "waiting_time": 2, "total_bytes": 20000,
                           "mood": "happy"})

     def test_do_not_log(self):
-        t = Transit(
-            create_usage_tracker(blur_usage=60, log_file=None, usage_db=None),
-            lambda: 0,
-        )
-        t.usage.record(
-            started=123,
-            buddy_started=124,
-            result="happy",
-            bytes_sent=11999,
-            buddy_bytes=12,
-        )
+        t = Transit(blur_usage=60, log_file=None, usage_db=None)
+        t.recordUsage(started=123, result="happy", total_bytes=11999,
+                      total_time=10, waiting_time=2)
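The blurred expectations in test_log's second case (started 123 becomes 120, 11999 bytes becomes 20000) follow from simple rounding: timestamps round down to the blur window and sizes round up into coarse buckets. A quick worked check, reusing the round_to/blur_size definitions that appear later in this compare:

    # Worked check of the blurred values asserted above.
    def round_to(size, coarseness):
        return int(coarseness*(1+int((size-1)/coarseness)))

    def blur_size(size):
        if size == 0:
            return 0
        if size < 1e6:
            return round_to(size, 10e3)
        if size < 1e9:
            return round_to(size, 1e6)
        return round_to(size, 100e6)

    blur = 60
    print(blur * (123 // blur))   # 120: timestamps round down to the 60s window
    print(blur_size(11999))       # 20000: small sizes round up to 10 kB buckets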
[File diff suppressed because it is too large]
@ -1,9 +1,9 @@
-import re
-import time
+from __future__ import print_function, unicode_literals
+import re, time, json
+from collections import defaultdict
 from twisted.python import log
-from twisted.protocols.basic import LineReceiver
-from autobahn.twisted.websocket import WebSocketServerProtocol
+from twisted.internet import protocol
+from .database import get_db


 SECONDS = 1.0
 MINUTE = 60*SECONDS
@ -11,256 +11,366 @@ HOUR = 60*MINUTE
 DAY = 24*HOUR
 MB = 1000*1000

-from wormhole_transit_relay.server_state import (
-    TransitServerState,
-    PendingRequests,
-    ActiveConnections,
-    ITransitClient,
-)
-from zope.interface import implementer
+def round_to(size, coarseness):
+    return int(coarseness*(1+int((size-1)/coarseness)))
+
+def blur_size(size):
+    if size == 0:
+        return 0
+    if size < 1e6:
+        return round_to(size, 10e3)
+    if size < 1e9:
+        return round_to(size, 1e6)
+    return round_to(size, 100e6)

-@implementer(ITransitClient)
-class TransitConnection(LineReceiver):
-    delimiter = b'\n'
-    # maximum length of a line we will accept before the handshake is complete.
-    # This must be >= to the longest possible handshake message.
-    MAX_LENGTH = 1024
-    started_time = None
-
-    def send(self, data):
-        """
-        ITransitClient API
-        """
-        self.transport.write(data)
-
-    def disconnect(self):
-        """
-        ITransitClient API
-        """
-        self.transport.loseConnection()
-
-    def connect_partner(self, other):
-        """
-        ITransitClient API
-        """
-        self._buddy = other
-        self._buddy._client.transport.registerProducer(self.transport, True)
-
-    def disconnect_partner(self):
-        """
-        ITransitClient API
-        """
-        assert self._buddy is not None, "internal error: no buddy"
-        if self.factory.log_requests:
-            log.msg("buddy_disconnected {}".format(self._buddy.get_token()))
-        self._buddy._client.disconnect()
+class TransitConnection(protocol.Protocol):
+    def __init__(self):
+        self._got_token = False
+        self._got_side = False
+        self._token_buffer = b""
+        self._sent_ok = False
+        self._mood = None
         self._buddy = None
+        self._total_sent = 0
+
+    def describeToken(self):
+        d = "-"
+        if self._got_token:
+            d = self._got_token[:16].decode("ascii")
+            if self._got_side:
+                d += "-" + self._got_side.decode("ascii")
+            else:
+                d += "-<unsided>"
+        return d

     def connectionMade(self):
-        # ideally more like self._reactor.seconds() ... but Twisted
-        # doesn't have a good way to get the reactor for a protocol
-        # (besides "use the global one")
-        self.started_time = time.time()
-        self._state = TransitServerState(
-            self.factory.transit.pending_requests,
-            self.factory.transit.usage,
-        )
-        self._state.connection_made(self)
-        self.transport.setTcpKeepAlive(True)
-
-        # uncomment to turn on state-machine tracing
-        # def tracer(oldstate, theinput, newstate):
-        #     print("TRACE: {}: {} --{}--> {}".format(id(self), oldstate, theinput, newstate))
-        # self._state.set_trace_function(tracer)
-
-    def lineReceived(self, line):
-        """
-        LineReceiver API
-        """
-        # old: "please relay {64}\n"
-        token = None
-        old = re.search(br"^please relay (\w{64})$", line)
-        if old:
-            token = old.group(1)
-            self._state.please_relay(token)
-
-        # new: "please relay {64} for side {16}\n"
-        new = re.search(br"^please relay (\w{64}) for side (\w{16})$", line)
-        if new:
-            token = new.group(1)
-            side = new.group(2)
-            self._state.please_relay_for_side(token, side)
-
-        if token is None:
-            self._state.bad_token()
-        else:
-            self.setRawMode()
-
-    def rawDataReceived(self, data):
-        """
-        LineReceiver API
-        """
-        # We are an IPushProducer to our buddy's IConsumer, so they'll
-        # throttle us (by calling pauseProducing()) when their outbound
-        # buffer is full (e.g. when their downstream pipe is full). In
-        # practice, this buffers about 10MB per connection, after which
-        # point the sender will only transmit data as fast as the
-        # receiver can handle it.
-        self._state.got_bytes(data)
+        self._started = time.time()
+        self._log_requests = self.factory._log_requests
+
+    def dataReceived(self, data):
+        if self._sent_ok:
+            # We are an IPushProducer to our buddy's IConsumer, so they'll
+            # throttle us (by calling pauseProducing()) when their outbound
+            # buffer is full (e.g. when their downstream pipe is full). In
+            # practice, this buffers about 10MB per connection, after which
+            # point the sender will only transmit data as fast as the
+            # receiver can handle it.
+            self._total_sent += len(data)
+            self._buddy.transport.write(data)
+            return
+
+        if self._got_token: # but not yet sent_ok
+            self.transport.write(b"impatient\n")
+            if self._log_requests:
+                log.msg("transit impatience failure")
+            return self.disconnect_error() # impatience yields failure
+
+        # else this should be (part of) the token
+        self._token_buffer += data
+        buf = self._token_buffer
+
+        # old: "please relay {64}\n"
+        # new: "please relay {64} for side {16}\n"
+        (old, handshake_len, token) = self._check_old_handshake(buf)
+        assert old in ("yes", "waiting", "no")
+        if old == "yes":
+            # remember they aren't supposed to send anything past their
+            # handshake until we've said go
+            if len(buf) > handshake_len:
+                self.transport.write(b"impatient\n")
+                if self._log_requests:
+                    log.msg("transit impatience failure")
+                return self.disconnect_error() # impatience yields failure
+            return self._got_handshake(token, None)
+        (new, handshake_len, token, side) = self._check_new_handshake(buf)
+        assert new in ("yes", "waiting", "no")
+        if new == "yes":
+            if len(buf) > handshake_len:
+                self.transport.write(b"impatient\n")
+                if self._log_requests:
+                    log.msg("transit impatience failure")
+                return self.disconnect_error() # impatience yields failure
+            return self._got_handshake(token, side)
+        if (old == "no" and new == "no"):
+            self.transport.write(b"bad handshake\n")
+            if self._log_requests:
+                log.msg("transit handshake failure")
+            return self.disconnect_error() # incorrectness yields failure
+        # else we'll keep waiting
+
+    def _check_old_handshake(self, buf):
+        # old: "please relay {64}\n"
+        # return ("yes", handshake, token) if buf contains an old-style handshake
+        # return ("waiting", None, None) if it might eventually contain one
+        # return ("no", None, None) if it could never contain one
+        wanted = len("please relay \n")+32*2
+        if len(buf) < wanted-1 and b"\n" in buf:
+            return ("no", None, None)
+        if len(buf) < wanted:
+            return ("waiting", None, None)
+
+        mo = re.search(br"^please relay (\w{64})\n", buf, re.M)
+        if mo:
+            token = mo.group(1)
+            return ("yes", wanted, token)
+        return ("no", None, None)
+
+    def _check_new_handshake(self, buf):
+        # new: "please relay {64} for side {16}\n"
+        wanted = len("please relay  for side \n")+32*2+8*2
+        if len(buf) < wanted-1 and b"\n" in buf:
+            return ("no", None, None, None)
+        if len(buf) < wanted:
+            return ("waiting", None, None, None)
+
+        mo = re.search(br"^please relay (\w{64}) for side (\w{16})\n", buf, re.M)
+        if mo:
+            token = mo.group(1)
+            side = mo.group(2)
+            return ("yes", wanted, token, side)
+        return ("no", None, None, None)
+
+    def _got_handshake(self, token, side):
+        self._got_token = token
+        self._got_side = side
+        self._mood = "lonely" # until buddy connects
+        self.factory.connection_got_token(token, side, self)
+
+    def buddy_connected(self, them):
+        self._buddy = them
+        self._mood = "happy"
+        self.transport.write(b"ok\n")
+        self._sent_ok = True
+        # Connect the two as a producer/consumer pair. We use streaming=True,
+        # so this expects the IPushProducer interface, and uses
+        # pauseProducing() to throttle, and resumeProducing() to unthrottle.
+        self._buddy.transport.registerProducer(self.transport, True)
+        # The Transit object calls buddy_connected() on both protocols, so
+        # there will be two producer/consumer pairs.
+
+    def buddy_disconnected(self):
+        if self._log_requests:
+            log.msg("buddy_disconnected %s" % self.describeToken())
+        self._buddy = None
+        self._mood = "jilted"
+        self.transport.loseConnection()
+
+    def disconnect_error(self):
+        # we haven't finished the handshake, so there are no tokens tracking
+        # us
+        self._mood = "errory"
+        self.transport.loseConnection()
+        if self.factory._debug_log:
+            log.msg("transitFailed %r" % self)
+
+    def disconnect_redundant(self):
+        # this is called if a buddy connected and we were found unnecessary.
+        # Any token-tracking cleanup will have been done before we're called.
+        self._mood = "redundant"
+        self.transport.loseConnection()

     def connectionLost(self, reason):
-        self._state.connection_lost()
-
-
-class Transit(object):
-    """
-    I manage pairs of simultaneous connections to a secondary TCP port,
-    both forwarded to the other. Clients must begin each connection with
-    "please relay TOKEN for SIDE\n" (or a legacy form without the "for
-    SIDE"). Two connections match if they use the same TOKEN and have
-    different SIDEs (the redundant connections are dropped when a match is
-    made). Legacy connections match any with the same TOKEN, ignoring SIDE
-    (so two legacy connections will match each other).
-
-    I will send "ok\n" when the matching connection is established, or
-    disconnect if no matching connection is made within MAX_WAIT_TIME
-    seconds. I will disconnect if you send data before the "ok\n". All data
-    you get after the "ok\n" will be from the other side. You will not
-    receive "ok\n" until the other side has also connected and submitted a
-    matching token (and differing SIDE).
-
-    In addition, the connections will be dropped after MAXLENGTH bytes have
-    been sent by either side, or MAXTIME seconds have elapsed after the
-    matching connections were established. A future API will reveal these
-    limits to clients instead of causing mysterious spontaneous failures.
-
-    These relay connections are not half-closeable (unlike full TCP
-    connections, applications will not receive any data after half-closing
-    their outgoing side). Applications must negotiate shutdown with their
-    peer and not close the connection until all data has finished
-    transferring in both directions. Applications which only need to send
-    data in one direction can use close() as usual.
-    """
-
-    # TODO: unused
+        finished = time.time()
+        total_time = finished - self._started
+
+        # Record usage. There are seven cases:
+        # * n1: the handshake failed, not a real client (errory)
+        # * n2: real client disconnected before any buddy appeared (lonely)
+        # * n3: real client closed as redundant after buddy appears (redundant)
+        # * n4: real client connected first, buddy closes first (jilted)
+        # * n5: real client connected first, buddy close last (happy)
+        # * n6: real client connected last, buddy closes first (jilted)
+        # * n7: real client connected last, buddy closes last (happy)
+
+        # * non-connected clients (1,2,3) always write a usage record
+        # * for connected clients, whoever disconnects first gets to write the
+        #   usage record (5, 7). The last disconnect doesn't write a record.
+
+        if self._mood == "errory": # 1
+            assert not self._buddy
+            self.factory.recordUsage(self._started, "errory", 0,
+                                     total_time, None)
+        elif self._mood == "redundant": # 3
+            assert not self._buddy
+            self.factory.recordUsage(self._started, "redundant", 0,
+                                     total_time, None)
+        elif self._mood == "jilted": # 4 or 6
+            # we were connected, but our buddy hung up on us. They record the
+            # usage event, we do not
+            pass
+        elif self._mood == "lonely": # 2
+            assert not self._buddy
+            self.factory.recordUsage(self._started, "lonely", 0,
+                                     total_time, None)
+        else: # 5 or 7
+            # we were connected, we hung up first. We record the event.
+            assert self._mood == "happy", self._mood
+            assert self._buddy
+            starts = [self._started, self._buddy._started]
+            total_time = finished - min(starts)
+            waiting_time = max(starts) - min(starts)
+            total_bytes = self._total_sent + self._buddy._total_sent
+            self.factory.recordUsage(self._started, "happy", total_bytes,
+                                     total_time, waiting_time)
+
+        if self._buddy:
+            self._buddy.buddy_disconnected()
+        self.factory.transitFinished(self, self._got_token, self._got_side,
+                                     self.describeToken())
+
+
+class Transit(protocol.ServerFactory):
+    # I manage pairs of simultaneous connections to a secondary TCP port,
+    # both forwarded to the other. Clients must begin each connection with
+    # "please relay TOKEN for SIDE\n" (or a legacy form without the "for
+    # SIDE"). Two connections match if they use the same TOKEN and have
+    # different SIDEs (the redundant connections are dropped when a match is
+    # made). Legacy connections match any with the same TOKEN, ignoring SIDE
+    # (so two legacy connections will match each other).
+
+    # I will send "ok\n" when the matching connection is established, or
+    # disconnect if no matching connection is made within MAX_WAIT_TIME
+    # seconds. I will disconnect if you send data before the "ok\n". All data
+    # you get after the "ok\n" will be from the other side. You will not
+    # receive "ok\n" until the other side has also connected and submitted a
+    # matching token (and differing SIDE).
+
+    # In addition, the connections will be dropped after MAXLENGTH bytes have
+    # been sent by either side, or MAXTIME seconds have elapsed after the
+    # matching connections were established. A future API will reveal these
+    # limits to clients instead of causing mysterious spontaneous failures.
+
+    # These relay connections are not half-closeable (unlike full TCP
+    # connections, applications will not receive any data after half-closing
+    # their outgoing side). Applications must negotiate shutdown with their
+    # peer and not close the connection until all data has finished
+    # transferring in both directions. Applications which only need to send
+    # data in one direction can use close() as usual.
+
     MAX_WAIT_TIME = 30*SECONDS
-    # TODO: unused
     MAXLENGTH = 10*MB
-    # TODO: unused
     MAXTIME = 60*SECONDS
+    protocol = TransitConnection

-    def __init__(self, usage, get_timestamp):
-        self.active_connections = ActiveConnections()
-        self.pending_requests = PendingRequests(self.active_connections)
-        self.usage = usage
-        self._timestamp = get_timestamp
-        self._rebooted = self._timestamp()
+    def __init__(self, blur_usage, log_file, usage_db):
+        self._blur_usage = blur_usage
+        self._log_requests = blur_usage is None
+        if self._blur_usage:
+            log.msg("blurring access times to %d seconds" % self._blur_usage)
+            log.msg("not logging Transit connections to Twisted log")
+        else:
+            log.msg("not blurring access times")
+        self._debug_log = False
+        self._log_file = log_file
+        self._db = None
+        if usage_db:
+            self._db = get_db(usage_db)
+        self._rebooted = time.time()
+        # we don't track TransitConnections until they submit a token
+        self._pending_requests = defaultdict(set) # token -> set((side, TransitConnection))
+        self._active_connections = set() # TransitConnection

-    def update_stats(self):
+    def connection_got_token(self, token, new_side, new_tc):
+        potentials = self._pending_requests[token]
+        for old in potentials:
+            (old_side, old_tc) = old
+            if ((old_side is None)
+                or (new_side is None)
+                or (old_side != new_side)):
+                # we found a match
+                if self._debug_log:
+                    log.msg("transit relay 2: %s" % new_tc.describeToken())
+
+                # drop and stop tracking the rest
+                potentials.remove(old)
+                for (_, leftover_tc) in potentials:
+                    # Don't record this as errory. It's just a spare connection
+                    # from the same side as a connection that got used. This
+                    # can happen if the connection hint contains multiple
+                    # addresses (we don't currently support those, but it'd
+                    # probably be useful in the future).
+                    leftover_tc.disconnect_redundant()
+                self._pending_requests.pop(token)
+
+                # glue the two ends together
+                self._active_connections.add(new_tc)
+                self._active_connections.add(old_tc)
+                new_tc.buddy_connected(old_tc)
+                old_tc.buddy_connected(new_tc)
+                return
+        if self._debug_log:
+            log.msg("transit relay 1: %s" % new_tc.describeToken())
+        potentials.add((new_side, new_tc))
+        # TODO: timer
+
+    def transitFinished(self, tc, token, side, description):
+        if token in self._pending_requests:
+            side_tc = (side, tc)
+            self._pending_requests[token].discard(side_tc)
+            if not self._pending_requests[token]: # set is now empty
+                del self._pending_requests[token]
+        if self._debug_log:
+            log.msg("transitFinished %s" % (description,))
+        self._active_connections.discard(tc)
+        # we could update the usage database "current" row immediately, or wait
+        # until the 5-minute timer updates it. If we update it now, just after
+        # losing a connection, we should probably also update it just after
+        # establishing one (at the end of connection_got_token). For now I'm
+        # going to omit these, but maybe someday we'll turn them both on. The
+        # consequence is that a manual execution of the munin scripts ("munin
+        # run wormhole_transit_active") will give the wrong value just after a
+        # connect/disconnect event. Actual munin graphs should accurately
+        # report connections that last longer than the 5-minute sampling
+        # window, which is what we actually care about.
+        #self.timerUpdateStats()
+
+    def recordUsage(self, started, result, total_bytes,
+                    total_time, waiting_time):
+        if self._debug_log:
+            log.msg(format="Transit.recordUsage {bytes}B", bytes=total_bytes)
+        if self._blur_usage:
+            started = self._blur_usage * (started // self._blur_usage)
+            total_bytes = blur_size(total_bytes)
+        if self._log_file is not None:
+            data = {"started": started,
+                    "total_time": total_time,
+                    "waiting_time": waiting_time,
+                    "total_bytes": total_bytes,
+                    "mood": result,
+                    }
+            self._log_file.write(json.dumps(data)+"\n")
+            self._log_file.flush()
+        if self._db:
+            self._db.execute("INSERT INTO `usage`"
+                             " (`started`, `total_time`, `waiting_time`,"
+                             " `total_bytes`, `result`)"
+                             " VALUES (?,?,?, ?,?)",
+                             (started, total_time, waiting_time,
+                              total_bytes, result))
+            self._update_stats()
+            self._db.commit()
+
+    def timerUpdateStats(self):
+        if self._db:
+            self._update_stats()
+            self._db.commit()
+
+    def _update_stats(self):
+        # current status: should be zero when idle
+        rebooted = self._rebooted
+        updated = time.time()
+        connected = len(self._active_connections) / 2
         # TODO: when a connection is half-closed, len(active) will be odd. a
         # moment later (hopefully) the other side will disconnect, but
         # _update_stats isn't updated until later.
+        waiting = len(self._pending_requests)
         # "waiting" doesn't count multiple parallel connections from the same
         # side
-        self.usage.update_stats(
-            rebooted=self._rebooted,
-            updated=self._timestamp(),
-            connected=len(self.active_connections._connections),
-            waiting=len(self.pending_requests._requests),
-            incomplete_bytes=sum(
-                tc._total_sent
-                for tc in self.active_connections._connections
-            ),
-        )
-
-
-@implementer(ITransitClient)
-class WebSocketTransitConnection(WebSocketServerProtocol):
-    started_time = None
-
-    def send(self, data):
-        """
-        ITransitClient API
-        """
-        self.sendMessage(data, isBinary=True)
-
-    def disconnect(self):
-        """
-        ITransitClient API
-        """
-        self.sendClose(1000, None)
-
-    def connect_partner(self, other):
-        """
-        ITransitClient API
-        """
-        self._buddy = other
-        self._buddy._client.transport.registerProducer(self.transport, True)
-
-    def disconnect_partner(self):
-        """
-        ITransitClient API
-        """
-        assert self._buddy is not None, "internal error: no buddy"
-        if self.factory.log_requests:
-            log.msg("buddy_disconnected {}".format(self._buddy.get_token()))
-        self._buddy._client.disconnect()
-        self._buddy = None
-
-    def connectionMade(self):
-        """
-        IProtocol API
-        """
-        super(WebSocketTransitConnection, self).connectionMade()
-        self.started_time = time.time()
-        self._first_message = True
-        self._state = TransitServerState(
-            self.factory.transit.pending_requests,
-            self.factory.transit.usage,
-        )
-
-        # uncomment to turn on state-machine tracing
-        # def tracer(oldstate, theinput, newstate):
-        #     print("WSTRACE: {}: {} --{}--> {}".format(id(self), oldstate, theinput, newstate))
-        # self._state.set_trace_function(tracer)
-
-    def onOpen(self):
-        self._state.connection_made(self)
-
-    def onMessage(self, payload, isBinary):
-        """
-        We may have a 'handshake' on our hands or we may just have some bytes to relay
-        """
-        if not isBinary:
-            raise ValueError(
-                "All messages must be binary"
-            )
-        if self._first_message:
-            self._first_message = False
-            token = None
-            old = re.search(br"^please relay (\w{64})$", payload)
-            if old:
-                token = old.group(1)
-                self._state.please_relay(token)
-
-            # new: "please relay {64} for side {16}\n"
-            new = re.search(br"^please relay (\w{64}) for side (\w{16})$", payload)
-            if new:
-                token = new.group(1)
-                side = new.group(2)
-                self._state.please_relay_for_side(token, side)
-
-            if token is None:
-                self._state.bad_token()
-        else:
-            self._state.got_bytes(payload)
-
-    def onClose(self, wasClean, code, reason):
-        """
-        IWebSocketChannel API
-        """
-        self._state.connection_lost()
+        incomplete_bytes = sum(tc._total_sent
+                               for tc in self._active_connections)
+        self._db.execute("DELETE FROM `current`")
+        self._db.execute("INSERT INTO `current`"
+                         " (`rebooted`, `updated`, `connected`, `waiting`,"
+                         " `incomplete_bytes`)"
+                         " VALUES (?, ?, ?, ?, ?)",
+                         (rebooted, updated, connected, waiting,
+                          incomplete_bytes))
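Both sides of this hunk parse the same wire handshake; the removed side feeds it through a LineReceiver plus the TransitServerState machine, while the added side buffers raw bytes and checks them with _check_old_handshake/_check_new_handshake. A standalone sketch of that framing and of the "wanted" length arithmetic, with made-up token and side values (an illustration, not part of the diff):

    # Sketch of the relay handshake framing parsed by the code above.
    import re

    token = b"a" * 64            # 64 word characters
    side = b"0123456789abcdef"   # 16 word characters

    old_style = b"please relay " + token + b"\n"
    new_style = b"please relay " + token + b" for side " + side + b"\n"

    OLD = re.compile(br"^please relay (\w{64})\n", re.M)
    NEW = re.compile(br"^please relay (\w{64}) for side (\w{16})\n", re.M)

    assert OLD.search(old_style).group(1) == token
    assert NEW.search(new_style).group(2) == side
    # The expected lengths match the "wanted" arithmetic in _check_*_handshake:
    assert len(old_style) == len("please relay \n") + 32*2
    assert len(new_style) == len("please relay  for side \n") + 32*2 + 8*2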
@ -1,238 +0,0 @@
-import time
-import json
-
-from twisted.python import log
-from zope.interface import (
-    implementer,
-    Interface,
-)
-
-
-def create_usage_tracker(blur_usage, log_file, usage_db):
-    """
-    :param int blur_usage: see UsageTracker
-
-    :param log_file: None or a file-like object to write JSON-encoded
-        lines of usage information to.
-
-    :param usage_db: None or an sqlite3 database connection
-
-    :returns: a new UsageTracker instance configured with backends.
-    """
-    tracker = UsageTracker(blur_usage)
-    if usage_db:
-        tracker.add_backend(DatabaseUsageRecorder(usage_db))
-    if log_file:
-        tracker.add_backend(LogFileUsageRecorder(log_file))
-    return tracker
-
-
-class IUsageWriter(Interface):
-    """
-    Records actual usage statistics in some way
-    """
-
-    def record_usage(started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None):
-        """
-        :param int started: timestemp when this connection began
-
-        :param float total_time: total seconds this connection lasted
-
-        :param float waiting_time: None or the total seconds one side
-            waited for the other
-
-        :param int total_bytes: the total bytes sent. In case the
-            connection was concluded successfully, only one side will
-            record the total bytes (but count both).
-
-        :param str mood: the 'mood' of the connection
-        """
-
-
-@implementer(IUsageWriter)
-class MemoryUsageRecorder:
-    """
-    Remebers usage records in memory.
-    """
-
-    def __init__(self):
-        self.events = []
-
-    def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None):
-        """
-        IUsageWriter.
-        """
-        data = {
-            "started": started,
-            "total_time": total_time,
-            "waiting_time": waiting_time,
-            "total_bytes": total_bytes,
-            "mood": mood,
-        }
-        self.events.append(data)
-
-
-@implementer(IUsageWriter)
-class LogFileUsageRecorder:
-    """
-    Writes usage records to a file. The records are written in JSON,
-    one record per line.
-    """
-
-    def __init__(self, writable_file):
-        self._file = writable_file
-
-    def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None):
-        """
-        IUsageWriter.
-        """
-        data = {
-            "started": started,
-            "total_time": total_time,
-            "waiting_time": waiting_time,
-            "total_bytes": total_bytes,
-            "mood": mood,
-        }
-        self._file.write(json.dumps(data) + "\n")
-        self._file.flush()
-
-
-@implementer(IUsageWriter)
-class DatabaseUsageRecorder:
-    """
-    Write usage records into a database
-    """
-
-    def __init__(self, db):
-        self._db = db
-
-    def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None):
-        """
-        IUsageWriter.
-        """
-        self._db.execute(
-            "INSERT INTO `usage`"
-            " (`started`, `total_time`, `waiting_time`,"
-            " `total_bytes`, `result`)"
-            " VALUES (?,?,?,?,?)",
-            (started, total_time, waiting_time, total_bytes, mood)
-        )
-        # original code did "self._update_stats()" here, thus causing
-        # "global" stats update on every connection update .. should
-        # we repeat this behavior, or really only record every
-        # 60-seconds with the timer?
-        self._db.commit()
-
-
-class UsageTracker(object):
-    """
-    Tracks usage statistics of connections
-    """
-
-    def __init__(self, blur_usage):
-        """
-        :param int blur_usage: None or the number of seconds to use as a
-            window around which to blur time statistics (e.g. "60" means times
-            will be rounded to 1 minute intervals). When blur_usage is
-            non-zero, sizes will also be rounded into buckets of "one
-            megabyte", "one gigabyte" or "lots"
-        """
-        self._backends = set()
-        self._blur_usage = blur_usage
-        if blur_usage:
-            log.msg("blurring access times to %d seconds" % self._blur_usage)
-        else:
-            log.msg("not blurring access times")
-
-    def add_backend(self, backend):
-        """
-        Add a new backend.
-
-        :param IUsageWriter backend: the backend to add
-        """
-        self._backends.add(backend)
-
-    def record(self, started, buddy_started, result, bytes_sent, buddy_bytes):
-        """
-        :param int started: timestamp when our connection started
-
-        :param int buddy_started: None, or the timestamp when our
-            partner's connection started (will be None if we don't yet
-            have a partner).
-
-        :param str result: a label for the result of the connection
-            (one of the "moods").
-
-        :param int bytes_sent: number of bytes we sent
-
-        :param int buddy_bytes: number of bytes our partner sent
-        """
-        # ideally self._reactor.seconds() or similar, but ..
-        finished = time.time()
-        if buddy_started is not None:
-            starts = [started, buddy_started]
-            total_time = finished - min(starts)
-            waiting_time = max(starts) - min(starts)
-            total_bytes = bytes_sent + buddy_bytes
-        else:
-            total_time = finished - started
-            waiting_time = None
-            total_bytes = bytes_sent
-            # note that "bytes_sent" should always be 0 here, but
-            # we're recording what the state-machine remembered in any
-            # case
-
-        if self._blur_usage:
-            started = self._blur_usage * (started // self._blur_usage)
-            total_bytes = blur_size(total_bytes)
-
-        # This is "a dict" instead of "kwargs" because we have to make
-        # it into a dict for the log use-case and in-memory/testing
-        # use-case anyway so this is less repeats of the names.
-        self._notify_backends({
-            "started": started,
-            "total_time": total_time,
-            "waiting_time": waiting_time,
-            "total_bytes": total_bytes,
-            "mood": result,
-        })
-
-    def update_stats(self, rebooted, updated, connected, waiting,
-                     incomplete_bytes):
-        """
-        Update general statistics.
-        """
-        # in original code, this is only recorded in the database
-        # .. perhaps a better way to do this, but ..
-        for backend in self._backends:
-            if isinstance(backend, DatabaseUsageRecorder):
-                backend._db.execute("DELETE FROM `current`")
-                backend._db.execute(
-                    "INSERT INTO `current`"
-                    " (`rebooted`, `updated`, `connected`, `waiting`,"
-                    " `incomplete_bytes`)"
-                    " VALUES (?, ?, ?, ?, ?)",
-                    (int(rebooted), int(updated), connected, waiting,
-                     incomplete_bytes)
-                )
-
-    def _notify_backends(self, data):
-        """
-        Internal helper. Tell every backend we have about a new usage record.
-        """
-        for backend in self._backends:
-            backend.record_usage(**data)
-
-
-def round_to(size, coarseness):
-    return int(coarseness*(1+int((size-1)/coarseness)))
-
-
-def blur_size(size):
-    if size == 0:
-        return 0
-    if size < 1e6:
-        return round_to(size, 10e3)
-    if size < 1e9:
-        return round_to(size, 1e6)
-    return round_to(size, 100e6)
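The hunk above removes the usage-tracker module that master's Transit depends on: create_usage_tracker wires zero or more backends (database, log file, in-memory) into a single UsageTracker, and record() fans each event out to all of them. A small sketch of that composition, assuming the removed module is importable as wormhole_transit_relay.usage (the path implied by the test import "from ..usage import create_usage_tracker"); on the branch being compared here, this module no longer exists:

    # Sketch of the backend fan-out implemented by the removed UsageTracker.
    from wormhole_transit_relay.usage import MemoryUsageRecorder, create_usage_tracker

    tracker = create_usage_tracker(blur_usage=None, log_file=None, usage_db=None)
    memory = MemoryUsageRecorder()
    tracker.add_backend(memory)

    # One finished connection: our side started at t=100, the buddy at t=105.
    tracker.record(started=100, buddy_started=105, result="happy",
                   bytes_sent=2000, buddy_bytes=3000)

    event = memory.events[0]
    print(event["mood"], event["total_bytes"], event["waiting_time"])
    # -> happy 5000 5   (total_time depends on the wall clock at record() time)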
tox.ini (2 lines changed)

@ -4,7 +4,7 @@
 # and then run "tox" from this directory.

 [tox]
-envlist = {py37,py38,py39,py310,pypy}
+envlist = {py27,py34,py35,py36,pypy}
 skip_missing_interpreters = True
 minversion = 2.4.0
ws_client.py (82 lines removed)

@ -1,82 +0,0 @@
-"""
-This is a test-client for the transit-relay that uses WebSockets.
-
-If an additional command-line argument (anything) is added, it will
-send 5 messages upon connection. Otherwise, it just prints out what is
-received. Uses a fixed token of 64 'a' characters. Always connects on
-localhost:4002
-"""
-
-import sys
-
-from twisted.internet import endpoints
-from twisted.internet.defer import (
-    Deferred,
-    inlineCallbacks,
-)
-from twisted.internet.task import react, deferLater
-
-from autobahn.twisted.websocket import (
-    WebSocketClientProtocol,
-    WebSocketClientFactory,
-)
-
-
-class RelayEchoClient(WebSocketClientProtocol):
-
-    def onOpen(self):
-        self._received = b""
-        self.sendMessage(
-            u"please relay {} for side {}".format(
-                self.factory.token,
-                self.factory.side,
-            ).encode("ascii"),
-            True,
-        )
-
-    def onMessage(self, data, isBinary):
-        print(">onMessage: {} bytes".format(len(data)))
-        print(data, isBinary)
-        if data == b"ok\n":
-            self.factory.ready.callback(None)
-        else:
-            self._received += data
-            if False:
-                # test abrupt hangup from receiving side
-                self.transport.loseConnection()
-
-    def onClose(self, wasClean, code, reason):
-        print(">onClose", wasClean, code, reason)
-        self.factory.done.callback(reason)
-        if not self.factory.ready.called:
-            self.factory.ready.errback(RuntimeError(reason))
-
-
-@react
-@inlineCallbacks
-def main(reactor):
-    will_send_message = len(sys.argv) > 1
-    ep = endpoints.clientFromString(reactor, "tcp:localhost:4002")
-    f = WebSocketClientFactory("ws://127.0.0.1:4002/")
-    f.reactor = reactor
-    f.protocol = RelayEchoClient
-    f.token = "a" * 64
-    f.side = "0" * 16 if will_send_message else "1" * 16
-    f.done = Deferred()
-    f.ready = Deferred()
-
-    proto = yield ep.connect(f)
-    print("proto", proto)
-    yield f.ready
-
-    print("ready")
-    if will_send_message:
-        for _ in range(5):
-            print("sending message")
-            proto.sendMessage(b"it's a message", True)
-            yield deferLater(reactor, 0.2)
-    yield proto.sendClose()
-    print("closing")
-    yield f.done
-    print("relayed {} bytes:".format(len(proto._received)))
-    print(proto._received.decode("utf8"))
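One detail worth noting in the removed client: the WebSocket path sends the handshake as a single binary message with no trailing newline, which is why the removed WebSocketTransitConnection.onMessage regexes end in "$" while the TCP-side checks look for "\n". A quick check, with made-up token and side values:

    # The WebSocket handshake message carries no trailing newline.
    import re
    msg = "please relay {} for side {}".format("a" * 64, "1" * 16).encode("ascii")
    assert re.search(br"^please relay (\w{64}) for side (\w{16})$", msg)
    assert not re.search(br"^please relay (\w{64}) for side (\w{16})\n", msg)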