
At Wallix we use Twisted as the framework for some network components in our products, Wallix AdminBastion and Wallix LogBox. In this post we will analyze a few short snippets of code to help you get started with Twisted.
So what is Twisted?
Twisted is an event-driven network framework written in Python. It supports a large number of protocols, from transports such as UDP, TCP and TLS to many higher-level protocols like HTTP, SMTP, NNTP, IRC, XMPP/Jabber… An interesting point about Twisted is that these high-level protocols are fully implemented, which makes it easy for developers to experiment with them. In fact, it is pretty easy to modify Twisted's SSH server implementation for one's own needs. Most of the time, all a developer has to do is override methods in the Twisted classes that represent a protocol.
A Twisted program is built around a main loop, called the reactor, and a system of callbacks: when an event occurs, such as an incoming connection, a callback or a chain of callbacks is executed to handle it.
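To make the idea concrete, here is a deliberately simplified, hypothetical model of a reactor in plain Python. It is not how Twisted's reactor is implemented (the real one waits on sockets, timers and processes through mechanisms such as select or epoll), and the names ToyReactor, on, emit and run are invented for this illustration. Events are queued by hand, and the loop dispatches each one to the callbacks registered for it.

```python
from collections import deque


class ToyReactor:
    """Hypothetical, simplified event loop: not Twisted's reactor."""

    def __init__(self):
        self.handlers = {}      # event name -> list of callbacks
        self.pending = deque()  # queued (event name, payload) pairs

    def on(self, event, callback):
        """Register a callback to run whenever `event` is dispatched."""
        self.handlers.setdefault(event, []).append(callback)

    def emit(self, event, payload=None):
        """Queue an event; it is handled later by run(), not immediately."""
        self.pending.append((event, payload))

    def run(self):
        """Dispatch queued events, one at a time, until none are left."""
        while self.pending:
            event, payload = self.pending.popleft()
            for callback in self.handlers.get(event, []):
                callback(payload)


# Usage: one event runs two callbacks, the second of which queues a follow-up event.
seen = []
loop = ToyReactor()
loop.on("connection", lambda peer: seen.append("connected: %s" % peer))
loop.on("connection", lambda peer: loop.emit("line", "hello from %s" % peer))
loop.on("line", seen.append)
loop.emit("connection", "127.0.0.1")
loop.run()
print(seen)  # ['connected: 127.0.0.1', 'hello from 127.0.0.1']
```

Because run dispatches events one at a time, a callback that takes a long time delays every event queued behind it, which is why Twisted code must never block the reactor.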
Writing a basic TCP server with Twisted
The sample code below is a simple TCP server that logs data sent by a client.
```python
# code1.py
# Written for Python 3 and a current Twisted release:
# LineReceiver and transport.write() work with bytes.
import sys

from twisted.internet import reactor
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.python import log


class CmdProtocol(LineReceiver):
    # LineReceiver's default delimiter is b'\r\n'; netcat sends b'\n'
    delimiter = b'\n'

    def connectionMade(self):
        # getPeer() returns an address object; .host is the client IP
        self.client_ip = self.transport.getPeer().host
        log.msg("Client connection from %s" % self.client_ip)
        if len(self.factory.clients) >= self.factory.clients_max:
            log.msg("Too many connections. bye !")
            self.client_ip = None
            self.transport.loseConnection()
        else:
            self.factory.clients.append(self.client_ip)

    def connectionLost(self, reason):
        log.msg('Lost client connection. Reason: %s' % reason)
        # client_ip is None for connections refused in connectionMade
        if self.client_ip:
            self.factory.clients.remove(self.client_ip)

    def lineReceived(self, line):
        line = line.decode('utf-8', 'replace')
        log.msg('Cmd received from %s : %s' % (self.client_ip, line))


class MyFactory(ServerFactory):
    # The factory builds one CmdProtocol instance per incoming connection
    protocol = CmdProtocol

    def __init__(self, clients_max=10):
        self.clients_max = clients_max
        self.clients = []


log.startLogging(sys.stdout)
reactor.listenTCP(9999, MyFactory(2))
reactor.run()
```
The most important piece of code here is:
```python
from twisted.internet import reactor
reactor.run()
```
On its own, this starts the Twisted reactor, which then blocks until it is stopped, but the main loop has nothing to react to because we have not attached anything to it.
In the code sample above, we create “MyFactory”, a subclass of “ServerFactory” that is in charge of instantiating the “CmdProtocol” class. Each incoming connection is handled by its own “CmdProtocol” instance. Twisted’s reactor calls the instance methods automatically according to the event that occurred on the TCP connection; as you probably noticed, the protocol’s methods are named after the events they handle.
When a client connects to this server, the reactor calls “connectionMade”; this is where you might want to handle connection authorization, for example. Here we have implemented a limit on the number of clients that can be connected at the same time. Each protocol instance has a back reference to its factory, “self.factory”, which lets us keep track of all clients.
Our “CmdProtocol” is a subclass of “twisted.protocols.basic.LineReceiver”, which splits the data received from a client into lines according to the defined delimiter. LineReceiver's default delimiter is \r\n, as used by many protocols, so we set it to \n to match what netcat sends. The method lineReceived is called for each received line. Later we can improve lineReceived to parse commands and act accordingly.
Twisted implements a logging system, which we configure to log to stdout.
Finally, reactor.listenTCP binds our factory “MyFactory” to TCP port 9999.
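Because TCP delivers a byte stream rather than messages, one line can arrive split over several chunks, or several lines can arrive in a single chunk; LineReceiver hides this from us. The sketch below is a hypothetical, stripped-down illustration of that buffering (it is not LineReceiver's code and, unlike LineReceiver, it enforces no maximum line length), followed by an equally hypothetical parse_command helper showing one way lineReceived could split a line into a command and its argument.

```python
class LineBuffer:
    """Hypothetical helper: reassembles arbitrary byte chunks into complete lines."""

    def __init__(self, delimiter=b"\n"):
        self.delimiter = delimiter
        self._buffer = b""  # bytes received after the last delimiter

    def feed(self, data):
        """Add a chunk and return the complete lines it finished (without delimiter)."""
        self._buffer += data
        # The last element is an incomplete line (possibly empty): keep it for later
        *lines, self._buffer = self._buffer.split(self.delimiter)
        return lines


def parse_command(line):
    """Hypothetical helper: 'comphash /my path' -> ('comphash', '/my path')."""
    parts = line.strip().split(None, 1)  # maxsplit=1 keeps spaces in the argument
    if not parts:
        return "", ""
    return parts[0], (parts[1] if len(parts) > 1 else "")


# Usage: the second command arrives split across two chunks.
buffer = LineBuffer()
for chunk in [b"lastl", b"og\ncomphash /etc/ho", b"sts\n"]:
    for raw in buffer.feed(chunk):
        print(parse_command(raw.decode("utf-8")))
# ('lastlog', '')
# ('comphash', '/etc/hosts')
```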
Server side:
```
user@lab:~/TMP$ python code1.py
2011-08-29 13:32:32+0200 [-] Log opened.
2011-08-29 13:32:32+0200 [-] __main__.MyFactory starting on 9999
2011-08-29 13:32:32+0200 [-] Starting factory <__main__.MyFactory instance at 0x227e320>
2011-08-29 13:32:35+0200 [__main__.MyFactory] Client connection from 127.0.0.1
2011-08-29 13:32:38+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : hello server
```
Client side:
```
user@lab:~$ netcat 127.0.0.1 9999
hello server
```
Calling an external process with Twisted
We’ll add a ‘lastlog’ command to our previous server that lets the client read the last ten lines of /var/log/syslog on the server, plus an ‘exit’ command that closes the connection.
```python
# code2.py
import sys

from twisted.internet import reactor
from twisted.internet.protocol import ServerFactory, ProcessProtocol
from twisted.protocols.basic import LineReceiver
from twisted.python import log


class TailProtocol(ProcessProtocol):
    def __init__(self, write_callback):
        # write_callback is the client's transport.write, which takes bytes
        self.write = write_callback

    def outReceived(self, data):
        # Called with the process's stdout (bytes); large output may arrive in several chunks
        self.write(b"Begin lastlog\n")
        for line in data.splitlines():
            # tail prints '==> file <==' headers when given several files
            if not line.startswith(b'=='):
                self.write(line + b'\n')
        self.write(b"End lastlog\n")

    def processEnded(self, reason):
        # reason.value (ProcessDone or ProcessTerminated) carries the exit code
        if reason.value.exitCode != 0:
            log.msg(reason)


class CmdProtocol(LineReceiver):
    delimiter = b'\n'

    def processCmd(self, line):
        if line.startswith('lastlog'):
            tailProtocol = TailProtocol(self.transport.write)
            reactor.spawnProcess(tailProtocol, '/usr/bin/tail',
                                 args=['/usr/bin/tail', '-10', '/var/log/syslog'])
        elif line.startswith('exit'):
            self.transport.loseConnection()
        else:
            self.transport.write(b'Command not found.\n')

    def connectionMade(self):
        self.client_ip = self.transport.getPeer().host
        log.msg("Client connection from %s" % self.client_ip)
        if len(self.factory.clients) >= self.factory.clients_max:
            log.msg("Too many connections. bye !")
            self.client_ip = None
            self.transport.loseConnection()
        else:
            self.factory.clients.append(self.client_ip)

    def connectionLost(self, reason):
        log.msg('Lost client connection. Reason: %s' % reason)
        if self.client_ip:
            self.factory.clients.remove(self.client_ip)

    def lineReceived(self, line):
        line = line.decode('utf-8', 'replace')
        log.msg('Cmd received from %s : %s' % (self.client_ip, line))
        self.processCmd(line)


class MyFactory(ServerFactory):
    protocol = CmdProtocol

    def __init__(self, clients_max=10):
        self.clients_max = clients_max
        self.clients = []


log.startLogging(sys.stdout)
reactor.listenTCP(9999, MyFactory(2))
reactor.run()
```
The “processCmd” method is called each time a command is received from a client. If the client sends the ‘exit’ command, the server ends the connection. If ‘lastlog’ is sent, we start the tail command and return its output to the client. To start a process we use reactor.spawnProcess and give it a protocol to communicate with the process. TailProtocol implements two methods of its parent class ProcessProtocol, outReceived and processEnded, and both are called automatically by the Twisted reactor. In outReceived, the tail output (stdout) is sent to the client. processEnded is the last method to be called; its reason argument carries the process exit status, which we log when it is not 0. As you can see, dealing with processes is very similar to dealing with network protocols.
Server side:
```
user@lab:~/TMP$ python code2.py
2011-08-29 15:13:38+0200 [-] Log opened.
2011-08-29 15:13:38+0200 [-] __main__.MyFactory starting on 9999
2011-08-29 15:13:38+0200 [-] Starting factory <__main__.MyFactory instance at 0x1a5a3f8>
2011-08-29 15:13:47+0200 [__main__.MyFactory] Client connection from 127.0.0.1
2011-08-29 15:13:58+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : test
2011-08-29 15:14:02+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : lastlog
2011-08-29 15:14:05+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : exit
2011-08-29 15:14:05+0200 [CmdProtocol,0,127.0.0.1] Lost client connection. Reason: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly.
]
```
Client side:
```
user@lab:~$ netcat 127.0.0.1 9999
test
Command not found.
lastlog
Begin lastlog
Aug 29 15:02:03 lab sSMTP[5919]: Unable to locate mail
Aug 29 15:02:03 lab sSMTP[5919]: Cannot open mail:25
Aug 29 15:02:03 lab CRON[4945]: (CRON) error (grandchild #4947 failed with exit status 1)
Aug 29 15:02:03 lab sSMTP[5922]: Unable to locate mail
Aug 29 15:02:03 lab sSMTP[5922]: Cannot open mail:25
Aug 29 15:02:03 lab CRON[4945]: (logcheck) MAIL (mailed 1 byte of output; but got status 0x0001, #012)
Aug 29 15:05:01 lab CRON[5925]: (root) CMD (command -v debian-sa1 > /dev/null && debian-sa1 1 1)
Aug 29 15:10:01 lab CRON[5930]: (root) CMD (test -x /usr/lib/atsar/atsa1 && /usr/lib/atsar/atsa1)
Aug 29 15:10:01 lab CRON[5928]: (CRON) error (grandchild #5930 failed with exit status 1)
Aug 29 15:13:21 lab pulseaudio[3361]: ratelimit.c: 387 events suppressed
End lastlog
exit
```
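For comparison only, Python's standard asyncio library (Python 3.7+ for asyncio.run and get_running_loop) follows the same design as TailProtocol. In the sketch below, asyncio.SubprocessProtocol's pipe_data_received plays the role of outReceived, and its connection_lost, which asyncio calls once the process has exited and its pipes are closed, plays the role of processEnded. This is an asyncio sketch, not Twisted code; CollectOutputProtocol and run_process are hypothetical names, and sys.executable stands in for /usr/bin/tail so that the example runs anywhere.

```python
import asyncio
import sys


class CollectOutputProtocol(asyncio.SubprocessProtocol):
    """Hypothetical asyncio counterpart of TailProtocol."""

    def __init__(self, done):
        self.done = done        # future resolved with (exit_code, stdout bytes)
        self.chunks = []
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def pipe_data_received(self, fd, data):
        # Like outReceived: fd 1 is stdout, and output may arrive in several chunks
        if fd == 1:
            self.chunks.append(data)

    def connection_lost(self, exc):
        # Like processEnded: the process has exited and all pipes are closed
        self.done.set_result((self.transport.get_returncode(), b"".join(self.chunks)))


async def run_process(*args):
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    # Only stdout is piped; stdin is /dev/null and stderr is inherited
    transport, _ = await loop.subprocess_exec(
        lambda: CollectOutputProtocol(done), *args,
        stdin=asyncio.subprocess.DEVNULL, stderr=None)
    try:
        return await done
    finally:
        transport.close()


code, output = asyncio.run(run_process(
    sys.executable, "-c", "print('==> header <=='); print('line 1'); print('line 2')"))
# Drop tail-style '==' header lines, as TailProtocol does
lines = [line for line in output.splitlines() if not line.startswith(b"==")]
print(code, lines)  # 0 [b'line 1', b'line 2']
```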
Using a Deferred object
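The reactor is a loop waiting for events, and it must never be blocked by any piece of code, e.g. read/write accesses to a database, long calculations, or blocking third-party libraries. Blocking the main loop (the reactor loop) stops event processing altogether: for example, no new incoming connections are accepted. Code that may take time should therefore return a Deferred object as soon as possible. A Deferred is a placeholder for a result that is not available yet; when it is fired with that result, it automatically calls the callback(s) attached to it. Note that a Deferred does not make code non-blocking by itself: the slow work still has to be done elsewhere, for example in a thread.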
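To illustrate the idea, here is a deliberately minimal, hypothetical model of a Deferred's callback chain in plain Python. It is not Twisted's implementation: the real class has many more features, and its errbacks receive a twisted.python.failure.Failure rather than a bare exception. It does show the key behaviour: each callback receives the previous callback's return value, an exception switches the chain to the errback path, and an errback that returns normally switches it back.

```python
class MiniDeferred:
    """Hypothetical, minimal model of a Deferred's callback chain."""

    def __init__(self):
        self._chain = []        # pending (callback, errback) pairs, in order
        self._fired = False
        self._result = None
        self._is_error = False  # True while the current result is an exception

    def addCallbacks(self, callback, errback=None):
        self._chain.append((callback, errback))
        if self._fired:
            self._run()         # already fired: run the new handler right away
        return self             # allows chaining, as with Twisted's Deferred

    def addCallback(self, callback):
        return self.addCallbacks(callback, None)

    def addErrback(self, errback):
        return self.addCallbacks(None, errback)

    def callback(self, result):
        self._fire(result, is_error=False)

    def errback(self, exc):
        self._fire(exc, is_error=True)

    def _fire(self, result, is_error):
        if self._fired:
            raise RuntimeError("a Deferred can only be fired once")
        self._fired, self._result, self._is_error = True, result, is_error
        self._run()

    def _run(self):
        while self._chain:
            callback, errback = self._chain.pop(0)
            handler = errback if self._is_error else callback
            if handler is None:
                continue        # no handler for this path: pass the result on
            try:
                self._result = handler(self._result)
                self._is_error = False
            except Exception as exc:
                self._result = exc
                self._is_error = True


results = []
d = MiniDeferred()
d.addCallback(len).addCallback(lambda n: n * 2).addErrback(lambda exc: -1)
d.addCallback(results.append)
d.callback(b"abc")  # fired later, once the result is available
print(results)      # [6]
```

Here the errback is skipped because no exception was raised; had len failed, the errback would have replaced the error with -1 and the last callback would have received that value.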
Now we want our server to be able to compute the SHA1 hash of a requested file. This task can take a long time, so we will use a Deferred.
```python
import hashlib
import os
import sys

from twisted.internet import reactor, threads
from twisted.internet.protocol import ServerFactory, ProcessProtocol
from twisted.protocols.basic import LineReceiver
from twisted.python import log


class TailProtocol(ProcessProtocol):
    def __init__(self, write_callback):
        self.write = write_callback

    def outReceived(self, data):
        self.write(b"Begin lastlog\n")
        for line in data.splitlines():
            if not line.startswith(b'=='):
                self.write(line + b'\n')
        self.write(b"End lastlog\n")

    def processEnded(self, reason):
        if reason.value.exitCode != 0:
            log.msg(reason)


class HashCompute(object):
    def __init__(self, path, write_callback):
        self.path = path
        self.write = write_callback

    def blockingMethod(self):
        # Runs in a worker thread, so it may block without stalling the reactor
        if not os.path.isfile(self.path):
            raise IOError("%s is not a regular file" % self.path)
        with open(self.path, 'rb') as f:
            data = f.read()
        # uncomment to add more delay
        # import time
        # time.sleep(10)
        return hashlib.sha1(data).hexdigest()

    def compute(self):
        d = threads.deferToThread(self.blockingMethod)
        d.addCallback(self.ret)
        d.addErrback(self.err)
        return d

    def ret(self, hdata):
        # hdata is the value returned by blockingMethod
        self.write(("File hash is : %s\n" % hdata).encode('utf-8'))

    def err(self, failure):
        self.write(("An error occurred : %s\n" % failure.getErrorMessage()).encode('utf-8'))


class CmdProtocol(LineReceiver):
    delimiter = b'\n'

    def processCmd(self, line):
        if line.startswith('lastlog'):
            tailProtocol = TailProtocol(self.transport.write)
            reactor.spawnProcess(tailProtocol, '/usr/bin/tail',
                                 args=['/usr/bin/tail', '-10', '/var/log/syslog'])
        elif line.startswith('comphash'):
            try:
                # maxsplit=1 keeps paths that contain spaces intact
                _, path = line.split(' ', 1)
            except ValueError:
                self.transport.write(b'Please provide a path.\n')
                return
            hc = HashCompute(path, self.transport.write)
            hc.compute()
        elif line.startswith('exit'):
            self.transport.loseConnection()
        else:
            self.transport.write(b'Command not found.\n')

    def connectionMade(self):
        self.client_ip = self.transport.getPeer().host
        log.msg("Client connection from %s" % self.client_ip)
        if len(self.factory.clients) >= self.factory.clients_max:
            log.msg("Too many connections. bye !")
            self.client_ip = None
            self.transport.loseConnection()
        else:
            self.factory.clients.append(self.client_ip)

    def connectionLost(self, reason):
        log.msg('Lost client connection. Reason: %s' % reason)
        if self.client_ip:
            self.factory.clients.remove(self.client_ip)

    def lineReceived(self, line):
        line = line.decode('utf-8', 'replace')
        log.msg('Cmd received from %s : %s' % (self.client_ip, line))
        self.processCmd(line)


class MyFactory(ServerFactory):
    protocol = CmdProtocol

    def __init__(self, clients_max=10):
        self.clients_max = clients_max
        self.clients = []


log.startLogging(sys.stdout)
reactor.listenTCP(9999, MyFactory(2))
reactor.run()
```
“blockingMethod” reads a file from the filesystem and then computes its hash. We use a Twisted facility, deferToThread, which runs blockingMethod in a thread pool and returns a Deferred object. The “compute” method therefore returns a Deferred immediately and lets the main loop process other events. The Deferred fires its callbacks as soon as the method passed to deferToThread finishes and returns a value. In the “compute” method we attach a callback and an error callback (errback) to the Deferred. The errback is fired if blockingMethod raises an exception, for instance when the file cannot be read. The callback is fired when the hash calculation is done and sends the hash to the client. The hdata argument of “self.ret” is the value returned by “blockingMethod”.
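The same pattern can be sketched with only the standard library, using concurrent.futures instead of Twisted: the blocking work runs in a worker thread, and a completion callback reports the result or the error, much like ret and err. This is an analogy, not how deferToThread is implemented; one notable difference is that add_done_callback usually runs the callback in the worker thread, whereas Twisted runs Deferred callbacks in the reactor thread. The sketch also reads the file in fixed-size chunks, so a large file is never loaded into memory at once. sha1_of_file and compute_in_thread are hypothetical names.

```python
import hashlib
import os
from concurrent.futures import ThreadPoolExecutor


def sha1_of_file(path, chunk_size=64 * 1024):
    """Blocking work: hash a file in fixed-size chunks."""
    if not os.path.isfile(path):
        raise ValueError("%s is not a regular file" % path)
    digest = hashlib.sha1()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def compute_in_thread(pool, path, write):
    """Run sha1_of_file in a worker thread and report through a callback."""
    future = pool.submit(sha1_of_file, path)

    def on_done(fut):
        # Plays the role of ret()/err() in HashCompute
        try:
            write("File hash is : %s\n" % fut.result())
        except Exception as exc:
            write("An error occurred : %s\n" % exc)

    future.add_done_callback(on_done)
    return future  # returned immediately, like the Deferred from compute()
```

For example, inside `with ThreadPoolExecutor() as pool:`, calling `compute_in_thread(pool, "/etc/hosts", sys.stdout.write)` returns at once and the hash is written when the worker finishes.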
More information
Have a look at the related Twisted documentation:
- http://twistedmatrix.com/documents/current/core/howto/defer.html
- http://twistedmatrix.com/documents/current/core/howto/process.html
- http://twistedmatrix.com/documents/current/core/howto/servers.html
The API documentation is very useful too:
http://twistedmatrix.com/documents/current/api/twisted.html
Article contributed by Fabien Boucher, Technical leader at Wallix.
I’ve never really used twisted, but after reading this article I think I’ll check it out. Thanks!
Great article! Good introductory articles, and indeed documentation in general, is something Twisted still sorely needs.
One thing: The file I/O should be accomplished using Twisted’s FilePath file I/O wrapper, not plain open/read/etc.
Excellent! Hope to see more articles like this. Thanks a lot.