Getting started with Twisted

At Wallix we use Twisted as the framework for some network components in our products Wallix AdminBastion and Wallix LogBox. In this post we will analyze a short snippet of code to help you get started with Twisted.

So what is Twisted?

Twisted is an event-driven network framework written in Python. It supports transports such as UDP, TCP and TLS, as well as many higher-level protocols like HTTP, SMTP, NNTP, IRC, XMPP/Jabber… An interesting point about Twisted is that these high-level protocols are fully implemented, which lets the developer experiment with them easily. In fact, it is pretty easy to adapt the Twisted SSH server implementation to one’s own needs. Most of the time, all a developer has to do is override methods in the Twisted classes that represent a protocol.

A Twisted program is composed of a main loop, called the reactor, and a system of callbacks. When an event occurs, such as an incoming connection, a callback or a chain of callbacks is executed to deal with it.

Writing a basic TCP server with Twisted

The sample code below is a simple TCP server that logs data sent by a client.

==== code1.py ====
import sys
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.python import log
from twisted.internet import reactor
 
class CmdProtocol(LineReceiver):
 
    # LineReceiver splits a byte stream, so the delimiter is a byte string.
    delimiter = b'\n'
 
    def connectionMade(self):
        self.client_ip = self.transport.getPeer().host
        log.msg("Client connection from %s" % self.client_ip)
        if len(self.factory.clients) >= self.factory.clients_max:
            log.msg("Too many connections. bye !")
            self.client_ip = None
            self.transport.loseConnection()
        else:
            self.factory.clients.append(self.client_ip)
 
    def connectionLost(self, reason):
        log.msg('Lost client connection.  Reason: %s' % reason)
        if self.client_ip:
            self.factory.clients.remove(self.client_ip)
 
    def lineReceived(self, line):
        # line is a byte string; decode it for the log message.
        log.msg('Cmd received from %s : %s' % (self.client_ip, line.decode('utf-8', 'replace')))
 
class MyFactory(ServerFactory):
 
    protocol = CmdProtocol
 
    def __init__(self, clients_max=10):
        self.clients_max = clients_max
        self.clients = []
 
log.startLogging(sys.stdout)
reactor.listenTCP(9999, MyFactory(2))
reactor.run()
=====

The most important piece of code here is:

from twisted.internet import reactor
reactor.run()

On their own, these two lines start the Twisted reactor, but the main loop will not react to any incoming event because nothing has been attached to it yet.

In the code sample above we create a “ServerFactory” that is in charge of instantiating the “CmdProtocol” class. Each incoming connection will be handled by a “CmdProtocol” instance. Twisted’s reactor will call instance methods automatically according to the event that occurred on the TCP connection; as you probably noticed the protocol’s methods are named according to the events they refer to.

When a client connects to this server, “connectionMade” is called by the reactor; this is where you might want to deal with connection authorization, for example. Here we have implemented a limit on the number of clients that can be connected at the same time. Each protocol instance has a back reference to its factory, “self.factory”, which allows us to keep track of all clients.

Our “CmdProtocol” is a subclass of “twisted.protocols.basic.LineReceiver”, which splits the data received from a client into lines according to the defined delimiter (some protocols use \r\n as the line delimiter, for example). The method lineReceived is called for each received line. Later we can improve lineReceived to parse commands and act accordingly.

Twisted implements a logging system, which we configure to log to stdout.

Now with reactor.listenTCP we bind our factory “MyFactory” on TCP port 9999.

user@lab:~/TMP$ python code1.py
2011-08-29 13:32:32+0200 [-] Log opened.
2011-08-29 13:32:32+0200 [-] __main__.MyFactory starting on 9999
2011-08-29 13:32:32+0200 [-] Starting factory <__main__.MyFactory instance at 0x227e320
2011-08-29 13:32:35+0200 [__main__.MyFactory] Client connection from 127.0.0.1
2011-08-29 13:32:38+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : hello server
 
user@lab:~$ netcat 127.0.0.1 9999
hello server

Calling an external process with Twisted

We’ll add one command to our previous server that lets the client read the last log entries from /var/log/syslog on the server.

import sys
import os
 
from twisted.internet.protocol import ServerFactory, ProcessProtocol
from twisted.protocols.basic import LineReceiver
from twisted.python import log
from twisted.internet import reactor
 
class TailProtocol(ProcessProtocol):
    def __init__(self, write_callback):
        self.write = write_callback
 
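    def outReceived(self, data):
        # data is a chunk of tail's stdout as bytes; this method may be
        # called more than once if the output arrives in several chunks.
        self.write(b"Begin lastlog\n")
        lines = [line for line in data.split(b'\n') if not line.startswith(b'==')]
        for line in lines:
            self.write(line + b'\n')
        self.write(b"End lastlog\n")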
 
    def processEnded(self, reason):
        if reason.value.exitCode != 0:
            log.msg(reason)
 
class CmdProtocol(LineReceiver):
 
    # LineReceiver splits a byte stream, so the delimiter is a byte string.
    delimiter = b'\n'

    def processCmd(self, line):
        # line is a byte string, so commands are compared against bytes.
        if line.startswith(b'lastlog'):
            tailProtocol = TailProtocol(self.transport.write)
            reactor.spawnProcess(tailProtocol, '/usr/bin/tail', args=['/usr/bin/tail', '-10', '/var/log/syslog'])
        elif line.startswith(b'exit'):
            self.transport.loseConnection()
        else:
            self.transport.write(b'Command not found.\n')
 
    def connectionMade(self):
        self.client_ip = self.transport.getPeer().host
        log.msg("Client connection from %s" % self.client_ip)
        if len(self.factory.clients) >= self.factory.clients_max:
            log.msg("Too many connections. bye !")
            self.client_ip = None
            self.transport.loseConnection()
        else:
            self.factory.clients.append(self.client_ip)
 
    def connectionLost(self, reason):
        log.msg('Lost client connection.  Reason: %s' % reason)
        if self.client_ip:
            self.factory.clients.remove(self.client_ip)
 
    def lineReceived(self, line):
        # line is a byte string; decode it for the log message.
        log.msg('Cmd received from %s : %s' % (self.client_ip, line.decode('utf-8', 'replace')))
        self.processCmd(line)
 
class MyFactory(ServerFactory):
 
    protocol = CmdProtocol
 
    def __init__(self, clients_max=10):
        self.clients_max = clients_max
        self.clients = []
 
log.startLogging(sys.stdout)
reactor.listenTCP(9999, MyFactory(2))
reactor.run()

The “processCmd” method is called each time a line is received from a client. If the client sends ‘exit’, the server closes the connection. If it sends ‘lastlog’, the server runs the tail command and returns its output to the client. To start a process we call reactor.spawnProcess and give it a protocol object that communicates with the process. TailProtocol overrides two methods of its parent class ProcessProtocol, outReceived and processEnded, and the Twisted reactor calls both automatically. outReceived is called with the data that tail writes to stdout, which we forward to the client. processEnded is the last method to be called; it receives the reason the process ended, which includes its exit status. As you can see, dealing with processes is very similar to dealing with network protocols.

user@lab:~/TMP$ python code2.py
2011-08-29 15:13:38+0200 [-] Log opened.
2011-08-29 15:13:38+0200 [-] __main__.MyFactory starting on 9999
2011-08-29 15:13:38+0200 [-] Starting factory <__main__.MyFactory instance at 0x1a5a3f8>
2011-08-29 15:13:47+0200 [__main__.MyFactory] Client connection from 127.0.0.1
2011-08-29 15:13:58+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : test
2011-08-29 15:14:02+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : lastlog
2011-08-29 15:14:05+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : exit
2011-08-29 15:14:05+0200 [CmdProtocol,0,127.0.0.1] Lost client connection.  Reason: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly.
 
user@lab:~$ netcat 127.0.0.1 9999
test
Command not found.
lastlog
Begin lastlog
Aug 29 15:02:03 lab sSMTP[5919]: Unable to locate mail
Aug 29 15:02:03 lab sSMTP[5919]: Cannot open mail:25
Aug 29 15:02:03 lab CRON[4945]: (CRON) error (grandchild #4947 failed with exit status 1)
Aug 29 15:02:03 lab sSMTP[5922]: Unable to locate mail
Aug 29 15:02:03 lab sSMTP[5922]: Cannot open mail:25
Aug 29 15:02:03 lab CRON[4945]: (logcheck) MAIL (mailed 1 byte of output; but got status 0x0001, #012)
Aug 29 15:05:01 lab CRON[5925]: (root) CMD (command -v debian-sa1 > /dev/null && debian-sa1 1 1)
Aug 29 15:10:01 lab CRON[5930]: (root) CMD (test -x /usr/lib/atsar/atsa1 && /usr/lib/atsar/atsa1)
Aug 29 15:10:01 lab CRON[5928]: (CRON) error (grandchild #5930 failed with exit status 1)
Aug 29 15:13:21 lab pulseaudio[3361]: ratelimit.c: 387 events suppressed
 
End lastlog
exit

Using a Deferred object

The reactor is a loop waiting for events, and no piece of code may block it. Typical culprits are read/write accesses to a database, long calculations and calls into third-party libraries. Code that can take time must instead return a Deferred object as soon as possible. A Deferred is a special object that automatically calls its callback(s) when it receives a result. Blocking the main loop (the reactor loop) blocks the processing of all events, e.g. no more incoming connections are accepted.

Now we want our server to be able to compute the SHA1 hash of a requested file. This task can take a long time, so we will use a Deferred.

import sys
import os
import hashlib
 
from twisted.internet.protocol import ServerFactory, ProcessProtocol
from twisted.protocols.basic import LineReceiver
from twisted.python import log
from twisted.internet import reactor, threads
 
class TailProtocol(ProcessProtocol):
    def __init__(self, write_callback):
        self.write = write_callback
 
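    def outReceived(self, data):
        # data is a chunk of tail's stdout as bytes; this method may be
        # called more than once if the output arrives in several chunks.
        self.write(b"Begin lastlog\n")
        lines = [line for line in data.split(b'\n') if not line.startswith(b'==')]
        for line in lines:
            self.write(line + b'\n')
        self.write(b"End lastlog\n")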
 
    def processEnded(self, reason):
        if reason.value.exitCode != 0:
            log.msg(reason)
 
class HashCompute(object):
    def __init__(self, path, write_callback):
        self.path = path
        self.write = write_callback
 
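    def blockingMethod(self):
        # Runs in a worker thread, so blocking here does not stall the reactor.
        if not os.path.isfile(self.path):
            raise IOError("%s is not a regular file" % self.path)
        with open(self.path, 'rb') as f:
            data = f.read()
        # uncomment to add more delay
        # import time
        # time.sleep(10)
        return hashlib.sha1(data).hexdigest()

    def compute(self):
        d = threads.deferToThread(self.blockingMethod)
        d.addCallback(self.ret)
        d.addErrback(self.err)

    def ret(self, hdata):
        # The transport only accepts bytes.
        self.write(("File hash is : %s\n" % hdata).encode('ascii'))

    def err(self, failure):
        self.write(("An error occurred : %s\n" % failure.getErrorMessage()).encode('utf-8'))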
 
class CmdProtocol(LineReceiver):
 
    # LineReceiver splits a byte stream, so the delimiter is a byte string.
    delimiter = b'\n'

    def processCmd(self, line):
        # line is a byte string, so commands are compared against bytes.
        if line.startswith(b'lastlog'):
            tailProtocol = TailProtocol(self.transport.write)
            reactor.spawnProcess(tailProtocol, '/usr/bin/tail', args=['/usr/bin/tail', '-10', '/var/log/syslog'])
        elif line.startswith(b'comphash'):
            try:
                # Expect exactly "comphash <path>"; anything else raises ValueError.
                useless, path = line.decode('utf-8').split(' ')
            except ValueError:
                self.transport.write(b'Please provide a path.\n')
                return
            hc = HashCompute(path, self.transport.write)
            hc.compute()
        elif line.startswith(b'exit'):
            self.transport.loseConnection()
        else:
            self.transport.write(b'Command not found.\n')
 
    def connectionMade(self):
        self.client_ip = self.transport.getPeer().host
        log.msg("Client connection from %s" % self.client_ip)
        if len(self.factory.clients) >= self.factory.clients_max:
            log.msg("Too many connections. bye !")
            self.client_ip = None
            self.transport.loseConnection()
        else:
            self.factory.clients.append(self.client_ip)
 
    def connectionLost(self, reason):
        log.msg('Lost client connection.  Reason: %s' % reason)
        if self.client_ip:
            self.factory.clients.remove(self.client_ip)
 
    def lineReceived(self, line):
        # line is a byte string; decode it for the log message.
        log.msg('Cmd received from %s : %s' % (self.client_ip, line.decode('utf-8', 'replace')))
        self.processCmd(line)
 
class MyFactory(ServerFactory):
 
    protocol = CmdProtocol
 
    def __init__(self, clients_max=10):
        self.clients_max = clients_max
        self.clients = []
 
log.startLogging(sys.stdout)
reactor.listenTCP(9999, MyFactory(2))
reactor.run()

“blockingMethod” reads a file from the filesystem and computes its hash. We run it through a Twisted facility, deferToThread, which executes it in a worker thread and immediately returns a Deferred object. The “compute” method therefore returns right away and lets the main loop process other events. The Deferred fires its callbacks as soon as the method passed to deferToThread finishes. In the “compute” method we attach a callback and an error callback (errback) to the Deferred. The errback is fired if blockingMethod raises an exception. The callback is fired when the hash calculation is done and sends the hash to the client. The hdata argument of “self.ret” is the value returned by “blockingMethod”.

More information

Have a look at the Twisted related documentation:

The API documentation is very useful too:

http://twistedmatrix.com/documents/current/api/twisted.html

Article contributed by Fabien Boucher, Technical leader at Wallix.
