core.async works great for building large systems and decoupling dependencies in a program. At uSwitch, we're using core.async in several systems to balance workload and decouple processing steps, and in the ClojureCup competition, our architecture relied heavily on core.async for separating concerns, allowing us to develop parts of the project in parallel.
In some of Rich Hickey's talks, he focuses on building your system like a factory. One of the main points is that it provides transparency into the processes in the system. Putting your data on a virtual conveyor belt allows you to take it off or put it back on another belt at any time. Having worked with core.async for a while, we can testify that putting stuff on queues and channels truly makes a difference, but we have yet to gain transparency into our data as it travels through our system.
This post will demonstrate methods to inspect what objects are on core.async channels at any given time, by implementing custom buffers and by extending the buffers supplied with core.async. The post is split into a strategy for Clojure and a strategy for ClojureScript, as the two platforms have different buffer implementations.
In core.async, when creating a new channel with chan, it is possible to specify a buffer to use for the channel. There are currently three types of buffers supplied with core.async:
FixedBuffer - a buffer of a specific size. When you try to put things in the buffer and there is no room for them, your operation will block.
DroppingBuffer - a buffer of a specific size. When you try to put things in the buffer and there is no room for them, they will be silently dropped.
SlidingBuffer - a buffer of a specific size. When you try to put things in the buffer and there is no room for them, items at the head of the buffer will be dropped to make room for new elements.
There are no infinite buffers - if you build one, you will probably regret it.
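The three strategies can be compared with a small sketch. It assumes core.async is on the classpath; `kept-items` is a hypothetical helper, not part of the library. It uses `offer!`, which never blocks, so the fixed buffer can be shown without hanging the REPL (a regular put would wait instead of failing).

```clojure
(ns buffer-demo
  (:require [clojure.core.async :as async]))

(defn kept-items
  "Offers each item to a channel backed by buf without blocking, then closes
  the channel and returns whatever the buffer retained, in take order."
  [buf items]
  (let [ch (async/chan buf)]
    (doseq [item items]
      ;; offer! only succeeds if the put can complete immediately
      (async/offer! ch item))
    (async/close! ch)
    ;; buffered items can still be taken after close!
    (async/<!! (async/into [] ch))))

(kept-items (async/buffer 2) [1 2 3])          ; => [1 2], 3 did not fit
(kept-items (async/dropping-buffer 2) [1 2 3]) ; => [1 2], 3 was dropped
(kept-items (async/sliding-buffer 2) [1 2 3])  ; => [2 3], 1 was evicted
```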
None of the built-in buffers allows inspection of what is currently in the buffer. They implement clojure.lang.Counted (ICounted in ClojureScript), so it is possible to see how many items are in it. By implementing our own buffer, we can build in methods for inspecting the contents.
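Counting only requires holding on to the buffer that was passed to chan. A minimal sketch, assuming core.async is on the classpath:

```clojure
(ns buffer-count
  (:require [clojure.core.async :as async]))

(def buf (async/buffer 3))   ; keep a reference to the buffer itself
(def ch (async/chan buf))

(async/>!! ch :a)            ; does not block, the buffer has room
(async/>!! ch :b)
(count buf)                  ; => 2

(async/<!! ch)               ; => :a
(count buf)                  ; => 1
```

The count is a snapshot; on a busy channel it may already be stale by the time you read it.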
The Buffer protocol is defined in clojure.core.async.impl.protocols and looks like this:
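As a sketch of its shape, here is a transcription based on recent core.async releases, placed in a hypothetical namespace so that it can be evaluated on its own. Releases from around the time of this post are believed to have named the third method add! and to have had no close-buf!, so check the source of the version you use.

```clojure
(ns buffer-protocol-sketch)

;; Transcription for illustration only; the real protocol lives in
;; clojure.core.async.impl.protocols and may differ between releases.
(defprotocol Buffer
  (full? [b] "Returns true if the buffer cannot accept another item.")
  (remove! [b] "Removes and returns the next item from the buffer.")
  (add!* [b itm] "Adds itm to the buffer if there is room; returns the buffer.")
  (close-buf! [b] "Called when the channel is closed; return value is ignored."))
```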
Let’s take the source for FixedBuffer in clojure.core.async.impl.buffers and make our own TransparentFixedBuffer:
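A minimal sketch of such a buffer follows. It assumes a recent core.async in which the Buffer protocol has add!* and close-buf! (older releases would need add! and no close-buf!), and the constructor `transparent-fixed-buffer` is a hypothetical name. Like the built-in fixed buffer, it keeps its items in a java.util.LinkedList, adding at the front and removing from the back.

```clojure
(ns transparent-buffer
  (:require [clojure.core.async :as async]
            [clojure.core.async.impl.protocols :as impl])
  (:import [java.util LinkedList]))

(deftype TransparentFixedBuffer [^LinkedList buf ^long n]
  impl/Buffer
  (full? [_] (>= (.size buf) n))
  (remove! [_] (.removeLast buf))              ; oldest item sits at the tail
  (add!* [this itm] (.addFirst buf itm) this)  ; newest item goes to the head
  (close-buf! [_] nil)
  clojure.lang.Counted
  (count [_] (.size buf))
  clojure.lang.IDeref
  ;; Snapshot of the contents, oldest first. This read is not synchronized
  ;; with the channel, so it is only safe for casual inspection.
  (deref [_] (vec (reverse buf))))

(defn transparent-fixed-buffer
  "Hypothetical constructor, mirroring clojure.core.async/buffer."
  [^long n]
  (TransparentFixedBuffer. (LinkedList.) n))

(def buf (transparent-fixed-buffer 3))
(def ch (async/chan buf))

(async/>!! ch :a)
(async/>!! ch :b)
@buf             ; => [:a :b]
(async/<!! ch)   ; => :a
@buf             ; => [:b]
```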
As a bonus, our buffer is deref-able. We can easily use our buffer with core.async (look here for the complete code).
The ClojureScript implementation of core.async also contains implementations of fixed, dropping and sliding buffers. The buffer implementations are based on an implementation of ring buffers. Rather than copying the entire implementation of ring buffers, we can write a function for extracting the content of a ring buffer and extend the available buffers.
Please note that these implementations make hard assumptions about the underlying ring buffer. A more stable implementation would re-implement the ring buffer to be certain to have it available.
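The extraction idea itself can be sketched independently of core.async internals. The example below is written in JVM Clojure and works on a bare backing array; the parameter names `arr`, `tail` and `length` are assumptions modeled on a typical ring buffer, not a claim about the fields core.async uses. The same index arithmetic would apply to a JavaScript array in ClojureScript.

```clojure
(ns ring-sketch)

(defn ring-contents
  "Returns the live items of a ring buffer, oldest first, without modifying it.
  arr is the backing array, tail the index of the oldest item, and length the
  number of items currently stored."
  [^objects arr tail length]
  (let [capacity (alength arr)]
    (mapv (fn [offset]
            ;; wrap around the end of the backing array
            (aget arr (mod (+ tail offset) capacity)))
          (range length))))

;; Capacity 4, three live items starting at index 2 and wrapping to index 0.
(ring-contents (object-array [:c nil :a :b]) 2 3)   ; => [:a :b :c]
```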
The buffers above can be used directly in the browser, which lets us demonstrate more clearly how the three buffer strategies work. The demo below is a full implementation using core.async and the inspect function from above. The code is available at cljsfiddle.
Try the different buffer strategies to see the differences when more data is pushed through than will fit.
This post has illustrated how to implement custom buffers and how to extend existing buffers to gain insight into what is on the channels they back. It should be noted that the number of items can be inspected for the existing channels with no alterations.
An open problem is to monitor the throughput of a channel. The inspection tricks presented here only give us snapshots of the state; they do not give us any insight into statistics such as the average time an item spends on a channel.
Thanks to Jonas Enlund for having created cljsfiddle. It’s a great way to play around with ClojureScript and to share example code.
Timothy Baldridge has supplied a bit of insight in the comments section, namely that the ClojureScript version will work fine, as we do not need to worry about threads when accessing the ring buffer. The Clojure version, however, is not thread-safe, as LinkedLists are not thread-safe. In your custom buffer implementation, you can keep the queue in an atom and use an immutable data structure. This will give you a performance hit, but you probably don’t want to look at the content of a buffer in a high-performance context. This is left as an exercise :).
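For readers who want a starting point, here is one possible sketch of that exercise. It assumes a recent core.async (Buffer protocol with add!* and close-buf!) and Clojure 1.9 or later for swap-vals!; the names `AtomFixedBuffer` and `atom-fixed-buffer` are hypothetical. The queue is an immutable PersistentQueue held in an atom, so a deref from any thread sees a consistent snapshot.

```clojure
(ns atom-buffer
  (:require [clojure.core.async :as async]
            [clojure.core.async.impl.protocols :as impl])
  (:import [clojure.lang PersistentQueue]))

(deftype AtomFixedBuffer [items ^long n]   ; items: atom holding a PersistentQueue
  impl/Buffer
  (full? [_] (>= (count @items) n))
  (remove! [_]
    ;; swap-vals! returns [old new]; the removed item is the front of old
    (let [[before _] (swap-vals! items pop)]
      (peek before)))
  (add!* [this itm] (swap! items conj itm) this)
  (close-buf! [_] nil)
  clojure.lang.Counted
  (count [_] (count @items))
  clojure.lang.IDeref
  (deref [_] (vec @items)))                ; oldest first

(defn atom-fixed-buffer
  "Hypothetical constructor for the atom-backed buffer."
  [^long n]
  (AtomFixedBuffer. (atom PersistentQueue/EMPTY) n))

(def buf (atom-fixed-buffer 3))
(def ch (async/chan buf))

(async/>!! ch :a)
(async/>!! ch :b)
@buf             ; => [:a :b]
(async/<!! ch)   ; => :a
@buf             ; => [:b]
```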