In this post, the first in a series of three, I'm going to show you how to build an elegant simulator of a concurrent system using David Beazley's amazing Curio library, which is built on Python's native coroutines. I hope it'll provide a nice intro to event-driven programming in Python using Curio and the async/await syntax introduced in Python 3.5 (although I will take advantage of some cool new 3.6 features to simplify my code).
In Part 1, we'll quickly build a simulation framework, which will then be enhanced and optimized into a "proper" framework in Part 2. I intend to finish up in Part 3 with a set of concrete examples on how to use it to simulate your own message-passing hardware systems that can solve interesting problems.
1 Background
First, some background on what I'll be describing: In graduate school and in my startup, we built hardware using asynchronous digital logic, and used a high-level abstraction called Communicating Hardware Processes (CHP) to model and reason about designs. CHP, based on Tony Hoare's CSP, is a way to represent circuits as a concurrent collection of sequential processes communicating and synchronizing via messaging channels. I'm not going to dive into the nuts and bolts of CHP, but I will show you how to build a simulator that models these types of event-driven systems using an incredibly small amount of code.
Second, why not asyncio, the standard library module that provides event-driven programming hooks? I did start there (and I highly recommend watching this talk by Robert Smallshire that builds up to the async constructs in Python 3.4 from the ground up), but in the end, I found asyncio a little too complicated and convoluted to get into quickly. It provides a large set of event-driven abstractions based on generators, tasks, channels, callbacks, and coroutines, plus concrete implementations for different uses, but I was really just looking for a coroutine-based event loop, which is something Curio provides quite naturally.
2 Describing hardware using communicating processes
Let's try to nail down some terminology.
- Process - This represents a piece of processing or code that's being executed sequentially.
- Channel - A channel is used to connect two processes together. A channel is strictly unidirectional: one side is connected to the sender process and the other side to a receiver process. Channels are used to transfer data (like ints or strings) from the sender to the receiver. As a by-product of the way it works, a channel also synchronizes the sender and receiver processes: if the channel is full because the receiver hasn't yet taken the previous message, the sender blocks until there is room for the new one.
- Port - The sender process has an output port that is connected to the channel, and the receiver has an input port on its end of the channel.
As an example, let's consider the simple one-place buffer that's shown below.[1] The single buffer has two ports, with L being the input port, and R being the output port. All the buffer does is receive a value on L, store it in x, and then send it out on R. We can then construct a system of these buffers that implements a linear pipeline by connecting the adjacent R and L ports with channels.
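To make the dataflow concrete before we bring in any concurrency, here's a rough sketch of the one-place buffer and a linear pipeline using plain Python generators. This is not the simulator we build below: generators are synchronous and pull-based, so there is no blocking or synchronization here, and the names buffer and pipeline are just placeholders I made up for illustration.

```python
def buffer(inputs):
    """One-place buffer: receive a value on L, hold it in x, send it on R."""
    for x in inputs:  # receive on L and store in x
        yield x       # send x out on R


def pipeline(inputs, n):
    """Chain n buffers so each buffer's R feeds the next buffer's L."""
    stream = iter(inputs)
    for _ in range(n):
        stream = buffer(stream)
    return stream


tokens = list(pipeline([1, 2, 3], n=10))
print(tokens)  # [1, 2, 3]
```

Values come out the far end unchanged and in order, which is all a pipeline of buffers is supposed to do.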
3 Building the Simulator
3.1 The Process class
First, let's start with a simple class to represent a Process (reminder that all the code I'm writing probably requires Python 3.6):
    class Process:
        next_id = 0  # class-wide counter used to hand out unique ids

        def __init__(self, name):
            self.name = name
            self.id = Process.next_id
            Process.next_id += 1

        def __str__(self):
            return "{}.{}".format(self.name, self.id)

        def __repr__(self):
            return "{}('{}')".format(type(self).__name__, self.name)

        def message(self, m):
            print("{}: {}".format(str(self), m))
Nothing too crazy here; just a constructor that keeps track of a unique process id for every single Process that's created, plus some utility functions to create nice string representations and a message method to handle output from the process in a standard way.
I want to segue here and introduce a new feature in 3.6 called f-strings, which are faster, cleaner to read, and just plain awesome once you start using them:
    class Process:
        next_id = 0  # class-wide counter used to hand out unique ids

        def __init__(self, name):
            self.name = name
            self.id = Process.next_id
            Process.next_id += 1

        def __str__(self):
            return f"{self.name}.{self.id}"

        def __repr__(self):
            return f"{type(self).__name__}('{self.name}')"

        def message(self, m):
            print(f"{self}: {m}")
Look at that, interpolation of code inside strings!
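If you want to convince yourself the two styles are equivalent, here's a tiny standalone comparison. The variable names are just stand-ins for the attributes used in the Process class above.

```python
name, ident = "src1", 0

old_style = "{}.{}".format(name, ident)
new_style = f"{name}.{ident}"

# Arbitrary expressions and format specs are allowed inside the braces.
next_label = f"{name}.{ident + 1}"
padded = f"{ident:03d}"

print(old_style, new_style, next_label, padded)
```

Both spellings produce the same string; the f-string just keeps the expression next to the place where it appears in the output.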
Alright, now let's create a Process subclass that actually does something useful: a Source process that sends a specified value on a channel a fixed number of times.
    class Source(Process):
        def __init__(self, name, length, srcval):
            super().__init__(name)
            self.val = srcval
            self.length = length

        def connect(self, chan):
            self.out_chan = chan

        async def exec(self):
            # Send the same value `length` times, then terminate
            for i in range(self.length):
                self.message(f"sending {self.val}")
                await self.out_chan.send(self.val)
                self.message(f"sent {self.val}")
            self.message("terminated")
The constructor lets us set the value and sequence length to output, and connect rather naively just stores the passed-in channel in an instance variable. We'll define channels in the next section, but for now, let's focus on the exec method, which does the actual work. The async keyword makes exec a coroutine function: calling it doesn't run the body, it returns a coroutine object (an awaitable) that an event loop then drives. This is the code that will get executed in Curio's event loop.
The code itself iterates the length of the sequence and calls a send method on the output channel. The method call is preceded by an await keyword, which is how you call async methods (the send method will be an async method). You can read up on the semantics of this (things like yield from), but if you just remember to use await before calling an async method as a rule, you'll be all set.
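Here's a small experiment, independent of Curio, that shows what await is doing under the hood. The functions double and caller are made up for this illustration, and driving a coroutine by hand with send(None) is something an event loop normally does for you; I'm only doing it here to make the mechanics visible.

```python
import inspect


async def double(x):
    return 2 * x


async def caller():
    # `await` runs double() to completion and hands back its return value.
    result = await double(21)
    return result


coro = caller()  # nothing in the body has executed yet
is_coro = inspect.iscoroutine(coro)

# Drive the coroutine by hand. Because nothing in it ever suspends,
# the first send() runs it to the end, and the return value arrives
# wrapped in StopIteration.
try:
    coro.send(None)
except StopIteration as stop:
    answer = stop.value

print(is_coro, answer)
```

Calling caller() without await only builds a coroutine object, which is why forgetting the await on an async method silently does nothing useful.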
Now, let's further introduce a Sink process that 'eats' values (say from a source), and the Buffer process described in the previous section that just receives and passes on a value.
    from curio import CancelledError

    class Sink(Process):
        def __init__(self, name):
            super().__init__(name)

        def connect(self, chan):
            self.in_chan = chan

        async def exec(self):
            tok_count = 0
            try:
                # Consume tokens forever, until the task is cancelled
                while True:
                    tok = await self.in_chan.recv()
                    tok_count += 1
                    self.message(f"received {tok}")
            except CancelledError:
                self.message(f"{tok_count} tokens received")

    class Buffer(Process):
        def __init__(self, name):
            super().__init__(name)

        def connect(self, chan_l, chan_r):
            self.l_chan = chan_l
            self.r_chan = chan_r

        async def exec(self):
            # Receive on L, then send the same token out on R
            while True:
                tok = await self.l_chan.recv()
                self.message(f"received {tok}")
                self.message(f"sending {tok}")
                await self.r_chan.send(tok)
One new thing here is how we receive data from an async method. The result of the await self.in_chan.recv() call is the incoming data (or token), which we assign to tok. (Recall that the shortcut is, wherever we'd do a function call in synchronous code, we do an await on the function call instead in async code.) The other point to note is that we can catch a CancelledError, as in the Sink process, to print something out if the process is terminated by the environment (which it will be, since otherwise it would just hang waiting for more input).
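If you'd like to poke at the cancellation pattern without installing anything, here's a rough stand-in written against the standard library's asyncio rather than Curio (so asyncio.Queue, asyncio.CancelledError, create_task, and asyncio.run are asyncio's API, not Curio's, and asyncio.run needs Python 3.7+). The sink and main functions are hypothetical names for this sketch only.

```python
import asyncio


async def sink(queue, log):
    count = 0
    try:
        while True:
            tok = await queue.get()  # the awaited call's result is the token
            count += 1
            log.append(f"received {tok}")
            queue.task_done()
    except asyncio.CancelledError:
        # Cancellation shows up as an exception at the blocked await.
        log.append(f"{count} tokens received")


async def main():
    queue, log = asyncio.Queue(), []
    task = asyncio.create_task(sink(queue, log))
    for tok in (1, 2, 3):
        await queue.put(tok)
    await queue.join()  # wait until the sink has handled every token
    task.cancel()       # the "environment" terminates the sink
    await task          # returns normally because the sink caught the error
    return log


print(asyncio.run(main()))
```

The shape is the same as the Sink process above: an endless receive loop wrapped in a try block, with the summary printed only when the environment cancels the task.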
3.2 The Channel class
One more thing before we can make our first attempt at simulating a system of processes: We need a way to implement the communication channels. Luckily, Curio has a built-in Queue type that's explicitly designed for this kind of communication.
Here's our first stab at the Channel class:
    from curio import Queue

    class Channel:
        def __init__(self, name):
            self.name = name
            self.q = Queue(maxsize=1)  # Max buffering of 1

        async def send(self, val):
            await self.q.put(val)

        async def recv(self):
            tok = await self.q.get()
            await self.q.task_done()  # signal that the receive is complete
            return tok

        async def close(self):
            await self.q.join()
Here, we've defined a Channel that has the actions send and recv (and a utility function close to terminate it). The send inserts a value into the internal Queue, and the recv will await on the value (and block if none is available). Once the recv is done, it's important to call the task_done method on the Queue to signal that the receive is complete.
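To see the blocking behavior of a one-slot queue in isolation, here's a sketch that uses asyncio.Queue from the standard library as a stand-in for Curio's Queue. Be aware of one difference: in asyncio, task_done() is a plain method, whereas the Curio version used above is awaited. The demo function and the log list are made up for this illustration, and create_task and asyncio.run need Python 3.7+.

```python
import asyncio


async def demo():
    q = asyncio.Queue(maxsize=1)  # one slot, like our Channel
    log = []

    async def sender():
        for v in (1, 2):
            await q.put(v)  # the second put has to wait: the slot is taken
            log.append(f"sent {v}")

    task = asyncio.create_task(sender())
    await asyncio.sleep(0)        # let the sender run until it blocks
    blocked_after = list(log)     # snapshot of what the sender managed to do

    for _ in range(2):
        v = await q.get()
        q.task_done()
        log.append(f"received {v}")

    await task
    return blocked_after, log


blocked_after, log = asyncio.run(demo())
print(blocked_after)
print(log)
```

The snapshot shows the sender getting exactly one value into the queue before stalling; the second send only completes after the first value has been received.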
4 Running the system
Now we have all the initial pieces available to put together a complete simulation of a linear pipeline of buffers with its environment. Here's our first version of the code:
    from curio import run, spawn

    async def system():
        N = 10  # How many buffers in our linear pipeline

        # Define the channels
        chan_l = Channel('l')
        chan_r = []
        for i in range(N):
            chan_r.append(Channel(f'R[{i}]'))

        # Instantiate the processes
        src = Source('src1', 10, 1)
        buf = [Buffer(f'buf[{i}]') for i in range(N)]
        snk = Sink('snk')

        # Connect the processes with the channels
        src.connect(chan_l)
        buf[0].connect(chan_l, chan_r[0])
        for i in range(1, N):
            buf[i].connect(chan_r[i-1], chan_r[i])
        snk.connect(chan_r[N-1])

        # Start the processes
        p_src = await spawn(src.exec())
        p_snk = await spawn(snk.exec())
        p_buf = [await spawn(buf[i].exec()) for i in range(N)]

        # Wait for the source to finish sending all its values
        await p_src.join()

        # Cancel the remaining processes
        for i in range(N):
            await p_buf[i].cancel()
        await p_snk.cancel()

    if __name__ == '__main__':
        run(system(), with_monitor=True)
It's still a little more verbose than I'd like, but we'll fix that in the next part. Notice how all the Processes are started: We do an await spawn on the coroutines (in this case the exec coroutine method of our Processes) to start all the Processes running as tasks in Curio's event loop. Then, we just wait for the Source process to be complete by doing a join() on its task. Once we know the Source is done, we can go ahead and call cancel() on all the remaining tasks, because we know they are just waiting on their input channels.
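The index arithmetic in the connection step is the easiest part to get wrong, so here's a small standalone sketch of it. The linear_wiring helper is a hypothetical name of mine, not part of the simulator; it only computes which channel names each buffer would read from and write to, using the same naming as the code above.

```python
def linear_wiring(n):
    """Return (input_channel, output_channel) names for each of n buffers."""
    names = ["l"] + [f"R[{i}]" for i in range(n)]
    # Buffer i reads from names[i] and writes to names[i + 1].
    return list(zip(names[:-1], names[1:]))


wiring = linear_wiring(3)
for i, (src, dst) in enumerate(wiring):
    print(f"buf[{i}]: {src} -> {dst}")
```

The source writes to the first name in the list and the sink reads from the last one, which matches src.connect(chan_l) and snk.connect(chan_r[N-1]) in the system above.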
    src1.0: sending 1
    src1.0: sent 1
    src1.0: sending 1
    buf[0].1: received 1
    buf[0].1: sending 1
    buf[1].2: received 1
    buf[1].2: sending 1
    buf[2].3: received 1
    buf[2].3: sending 1
    . . .
    src1.0: sent 1
    src1.0: terminated
    buf[1].2: received 1
    buf[1].2: sending 1
    . . .
    buf[8].9: received 1
    buf[8].9: sending 1
    snk.11: received 1
    buf[9].10: received 1
    buf[9].10: sending 1
    snk.11: received 1
    snk.11: 10 tokens received
And there you go, a complete distributed system, simulated using Curio's concurrent event loop! You can find the complete runnable code (Python 3.6!) in this gist.
In the next article in this series, we'll talk about enhancing the simulator and adding some error-checking.
[1] From a preprint of Asynchronous VLSI Systems by Rajit Manohar.