Category: Computing

Cool things I have worked on: Clustered analytic database

In my last blog post, I talked about how I was involved in building a database optimised for low latency on single-record reads.

So it was a bit of a change to later work on a database optimised for high throughput on bulk querying! Rather than getting single records, it was all about finding all the records that match some criterion and doing something with them as fast as possible, where "fast" is measured in "records per second". When I started there, the minimum time to perform a trivial query (e.g. SELECT 1) was about three seconds, let alone one that fetched any data. However, on the right hardware, it could process untold numbers of gigabytes of data per second once it had gotten going. That three-second minimal round trip time was negligible when dealing with queries that processed hundreds of terabytes of data in ten minutes. That said, one of my first projects was to pick some low-hanging fruit, which brought the minimal round trip time down to around 300 milliseconds.

Unlike the previous database, this one had an SQL parser; rather than an API to get and set records, you could enter SQL queries in all their glory, and get back result sets.

You didn't generally INSERT, UPDATE or DELETE individual records (although that was possible, it just didn't perform well, as will be explained later). Generally, data came into the system through CSV or BCP files that were plopped into the filesystem. The system would sort them on a chosen key (why? You'll see), then split into partitions of something like a million records. These partitions were all compressed into "tree files". And because each partition was compressed independently, these jobs could be spread over the cluster.

The compression algorithm was interesting. If you put data in a CSV and gzip it, you'll knock it down to a quarter or so of its size, depending on the data. Our algorithm usually got a factor of ten better compression than that. This was a big deal for our customers as they had to store many petabytes of data, and dividing the size by a factor of forty meant spending a fortieth of the money on immense disk arrays. Also, most of our query performance was limited by how fast you could read data from disk, what with modern CPUs being so good; so compressing things by a factor of forty meant forty times the bulk throughput.

It worked by looking at each column and working out a set of unique values present in it, and sorting them. These columns could then be compressed with fairly conventional techniques for storing sorted lists of things (such as storing each element as the difference from the previous element, and then using conventional algorithms such as deflate on the result).
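To make the "difference from the previous element, then deflate" idea concrete, here is a minimal sketch. It assumes non-negative integer values that fit in four bytes and uses Python's zlib for deflate; the function names and byte layout are mine for illustration, not the real tree file format.

```python
import zlib

def encode_sorted(values):
    """Delta-encode a sorted list of non-negative ints, then deflate it."""
    deltas = []
    previous = 0
    for v in values:
        deltas.append(v - previous)  # small numbers when values are close together
        previous = v
    # Assumed layout: each delta as a 4-byte little-endian integer.
    raw = b"".join(d.to_bytes(4, "little") for d in deltas)
    return zlib.compress(raw)

def decode_sorted(blob):
    """Reverse encode_sorted: inflate, then take a running sum of the deltas."""
    raw = zlib.decompress(blob)
    values, running = [], 0
    for i in range(0, len(raw), 4):
        running += int.from_bytes(raw[i:i + 4], "little")
        values.append(running)
    return values

# A made-up sorted column: 10,000 values spaced five apart.
timestamps = list(range(1_000_000, 1_050_000, 5))
blob = encode_sorted(timestamps)
```

Because the deltas of a sorted column are small and repetitive, the deflate step has far more redundancy to work with than it would on the raw values.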

So if we had the following data:


We would store it as a bunch of columns:

  • Make: Ford,Skoda
  • Model: Fabia,Mondeo,Transit
  • Colour: Blue,Red,White
  • Quantity: 4,6,7

But you still need to know how to reconstruct the records from those values. Each record could be thought of as a vector of integer indices, each of which was an index into the sorted values for that column. If you just stored those records-of-indices, you'd be able to reconstruct the records and would probably have gained some good compression.
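As a rough illustration of the records-of-indices idea, the sketch below builds a sorted set of unique values per column and rewrites each record as indices into those sets. The rows are invented for the example (they reuse the column values listed above but are not the original table), and the function names are hypothetical.

```python
def dictionary_encode(rows):
    """Return (valuesets, index_rows): one sorted unique list per column,
    plus each row rewritten as indices into those lists."""
    columns = list(zip(*rows))
    valuesets = [sorted(set(col)) for col in columns]
    lookups = [{v: i for i, v in enumerate(vs)} for vs in valuesets]
    index_rows = [
        tuple(lookups[c][v] for c, v in enumerate(row)) for row in rows
    ]
    return valuesets, index_rows

def dictionary_decode(valuesets, index_rows):
    """Rebuild the original rows by looking each index up in its valueset."""
    return [
        tuple(valuesets[c][i] for c, i in enumerate(row)) for row in index_rows
    ]

# Invented example rows: (make, model, colour, quantity).
rows = [
    ("Ford", "Mondeo", "Blue", 4),
    ("Ford", "Transit", "White", 7),
    ("Skoda", "Fabia", "Red", 6),
    ("Ford", "Transit", "White", 7),
]
valuesets, index_rows = dictionary_encode(rows)
```

With these rows, `index_rows` comes out as `(0, 1, 0, 0)`, `(0, 2, 2, 2)`, `(1, 0, 1, 1)`, `(0, 2, 2, 2)`, and decoding gives back the original rows.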

If we start the indices at 0 for the first value in each column, our original data might now look like this, referring back to the sorted columns we stored:


But we did more than that. We didn't actually write those records-of-indices to disk. The compression engine took pairs of columns in the records-of-indices representation, and again deduplicated and sorted each of them.

Which pairs you took mattered greatly to how good a compression you got, in subtle ways, but we tried to get related columns. Make and Model are connected, so we'd do well to pick that one first. Each model only ever comes from a single manufacturer in our example, so the combined (make,model) columns only have three values - (0,1), (0,2) and (1,0). So we could store those in a list we'd call an "index set", which actually went onto disk, deduplicated and sorted and compressed like a normal column:

  • Make+Model: (0,1), (0,2), (1,0)
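The pairing step can be sketched like this: take two columns of the records-of-indices table, collect the distinct pairs into a sorted index set, and replace the two columns with a single column of indices into that set. The input rows here are invented for illustration and `pair_columns` is a hypothetical name; how the real engine chose which pair to take is not shown.

```python
def pair_columns(index_rows, a, b):
    """Merge columns a and b into one column.

    Returns (index_set, new_rows) where index_set is the sorted, deduplicated
    list of (a, b) pairs and new_rows has the merged column first, followed
    by the untouched columns in their original order."""
    index_set = sorted({(row[a], row[b]) for row in index_rows})
    lookup = {pair: i for i, pair in enumerate(index_set)}
    new_rows = []
    for row in index_rows:
        rest = tuple(v for c, v in enumerate(row) if c not in (a, b))
        new_rows.append((lookup[(row[a], row[b])],) + rest)
    return index_set, new_rows

# Invented records-of-indices: (make, model, colour, quantity).
index_rows = [(0, 1, 0, 0), (0, 2, 2, 2), (1, 0, 1, 1), (0, 2, 2, 2)]
make_model, merged_rows = pair_columns(index_rows, 0, 1)
```

Repeating this on `merged_rows` until only two columns remain gives the tree of index sets, with the last two-column table becoming the root.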

Now the table (in memory) looks like this:


Again, we pick a pair of columns. There's no obvious choice any more, so I don't know what our default heuristic would actually do, but let's say we decided to join Make+Model with quantity. The pairings we find are (0,2), (0,1), (1,2) which happens twice, and (3,0), so we'd store:

  • (Make+Model)+Quantity: (0,1), (0,2), (1,2), (3,0)

(not forgetting that the sorted list of values is difference-encoded and compressed, rather than written literally like that).

Our table in memory now has only two columns:


At this point, we can't continue any further. Each entry in that table corresponds to a record in the original data, and by looking up the index sets we can expand out the columns to recreate the original table in its entirety; so we'd sort that final two-column table and store it (difference-encoded and compressed, of course). Our final entry in the file, the "root indexset", would be:

  • root: (0,0), (1,2), (2,1), (2,2), (3,2)

But we don't need to decompress the entire tree. Unlike a "gzipped CSV", if we just want some of the columns of the table, we can not bother traversing the index set tree to extract every column. If we just wanted the colour for every car in the table, we'd read the root indexset, and the colour valueset so we could look up the colour values, and we'd be done. If we wanted just the makes, we'd need to read the root indexset, the make+model+quantity indexset, the make+model indexset, and the make valueset.

However, if we wanted MIN(Make), MAX(Make) or did a SELECT DISTINCT Make, then we don't even need the root set - we can just open the Make valueset and take the first or last value, or all values.
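Here is a small sketch of reading a single column out of such a tree without touching the branches that don't lead to it. The `ValueSet` and `IndexSet` classes and the tiny two-level tree are assumptions made for the example, not the real on-disk structures.

```python
from dataclasses import dataclass

@dataclass
class ValueSet:
    name: str
    values: list  # sorted unique values of one column

@dataclass
class IndexSet:
    left: object   # ValueSet or IndexSet
    right: object  # ValueSet or IndexSet
    pairs: list    # (left index, right index) tuples

def contains(node, column):
    """True if the named column's valueset is somewhere under this node."""
    if isinstance(node, ValueSet):
        return node.name == column
    return contains(node.left, column) or contains(node.right, column)

def lookup(node, index, column):
    """Follow one index down the tree to the value of the named column."""
    if isinstance(node, ValueSet):
        return node.values[index]
    left_index, right_index = node.pairs[index]
    if contains(node.left, column):
        return lookup(node.left, left_index, column)
    return lookup(node.right, right_index, column)

def read_column(root, column):
    """One value per record; only the branch leading to the column is read."""
    return [lookup(root, i, column) for i in range(len(root.pairs))]

# Invented tree: root pairs (make+model) with colour.
make = ValueSet("make", ["Ford", "Skoda"])
model = ValueSet("model", ["Fabia", "Mondeo", "Transit"])
colour = ValueSet("colour", ["Blue", "Red", "White"])
make_model = IndexSet(make, model, [(0, 1), (0, 2), (1, 0)])
root = IndexSet(make_model, colour, [(0, 0), (1, 2), (1, 2), (2, 1)])

colours = read_column(root, "colour")  # never touches make or model
smallest_make = make.values[0]         # MIN(make) straight from the valueset
```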

Anyway, tree files, once built, were given unique partition IDs and put into some kind of storage. We didn't really care what the storage was, and supported several different backends - anything that let us save a file with a name, and get the file back with the same name, would do nicely. We supported raw filesystems (over NFS or on a clustered filesystem so all the servers in the cluster could see it), HDFS, and various enterprise storage devices.

So how did we know what tree files made up each table? Well, a data structure called the registry mapped table names to table objects. We supported a range of different types of table objects, but the most common one was called a "Lazy Union", which represented a UNION ALL in the SQL sense - a bunch of tables concatenated together. It was lazy because we used it to union together all the tree files that made up a table, but the system avoided actually trying to load all the tree files and UNION ALL them together (as that would generally be many petabytes of data); in fact, we basically never actually read any tree files in a lazy union - we just queried it for metadata. The lazy union object itself stored a list of tree files and metadata about them, which was kept in a file called a "partition cache".

So what happened if the user did an INSERT? Well, we had a feature to enable this; the extra records would go into (effectively) a CSV file associated with the table, and a reference to that CSV file would be put into the partition cache for that table. When the CSV got big enough, we could build it into a tree file and start a new CSV file.

As for UPDATE and DELETE: Well, one caveat of our representation was that we never modified tree files. So to UPDATE or DELETE something, we'd create a "delta file" that referenced the tree file we wanted to modify, then contained instructions as to how to modify it (deleting or updating records). We'd replace the reference to the tree in the partition cache with a reference to the delta file; and voila, the system would see changed or removed records.

We used the immutability of tree files to support point-in-time querying. If you set a query timestamp on your database session, the system would effectively roll back the log of changes to the partition cache to restore the table to the state it was in at that point in time, undoing bulk imports, INSERTs, UPDATEs, and DELETEs.
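One way to picture point-in-time querying is as a timestamped log of changes to the partition cache, where a query only sees the entries up to its timestamp. The sketch below replays the log forwards rather than rolling it back, and the log format (tuples of timestamp, action, old reference, new reference) is purely an assumption for illustration.

```python
def table_state_at(change_log, query_time):
    """Return the list of file references visible at query_time.

    change_log is assumed to be sorted by timestamp; files themselves are
    never modified, so only the list of references changes."""
    refs = []
    for timestamp, action, old_ref, new_ref in change_log:
        if timestamp > query_time:
            break
        if action == "add":        # bulk import or INSERT
            refs.append(new_ref)
        elif action == "replace":  # UPDATE/DELETE wraps a tree in a delta file
            refs[refs.index(old_ref)] = new_ref
    return refs

# Invented log with made-up file names.
change_log = [
    (100, "add", None, "tree-1"),
    (200, "add", None, "tree-2"),
    (300, "replace", "tree-1", "delta-1(tree-1)"),
]
```

Querying at time 250 sees `tree-1` and `tree-2` untouched; querying at 300 or later sees the delta in place of `tree-1`.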

This trick also worked for ALTER TABLE. If you modified the schema of a table, every existing reference to a tree (or a delta file or a CSV or anything else) in the partition cache would be replaced by a reference to the original file plus a schema-change file that explained the change. So the old data would be translated as it was read, with columns being removed, re-typed, added (with a default value), etc. And, of course, if you put a query timestamp on to query the tables as they were in the past, those schema changes would appear to be undone; deleted columns would return.

When a query came in that looked like SELECT ...some expressions... FROM table WHERE ...some conditions..., we would look the table up in the registry, and get a lazy union object connected to the partition cache file representing the current state of the table. The query planner would "push down" the condition clauses into the lazy union (LU) object, and it would use the metadata in the partition cache to restrict the list of partitions that might match. The main kind of metadata we stored was the minimum and maximum values of every column in each tree file. If our WHERE clause was column BETWEEN a AND b, we could skip any tree whose value range for that column didn't intersect a..b; and many other kinds of WHERE-clause expression could be used to eliminate based on range.
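The range-based elimination for a BETWEEN clause boils down to an interval overlap test against the per-tree minimum and maximum. A minimal sketch, with a hypothetical `TreeMeta` record standing in for an entry in the partition cache:

```python
from dataclasses import dataclass

@dataclass
class TreeMeta:
    name: str
    ranges: dict  # column name -> (min value, max value) in that tree

def might_match_between(tree, column, low, high):
    """False only when the tree's range for the column cannot overlap low..high."""
    col_min, col_max = tree.ranges[column]
    return col_max >= low and col_min <= high

def restrict(trees, column, low, high):
    """Keep only trees that might hold rows with column BETWEEN low AND high."""
    return [t for t in trees if might_match_between(t, column, low, high)]

# Invented metadata for three trees.
trees = [
    TreeMeta("tree-1", {"event_day": (1, 10)}),
    TreeMeta("tree-2", {"event_day": (11, 20)}),
    TreeMeta("tree-3", {"event_day": (21, 30)}),
]
candidates = restrict(trees, "event_day", 8, 12)
```

Here only `tree-1` and `tree-2` survive; `tree-3` is never opened.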

This is why we sorted each batch of data as it came in. Most of the data we stored was logs of something, so each batch covered a fixed time range anyway, and so would produce a bunch of trees restricted to a range of some datetime column; so we could query efficiently on date ranges. But if we then sorted by the next most common WHERE-clause constraint column, we would constrain the trees in the batch to each cover a narrow range of that column, and therefore query more efficiently for ranges of values for that column.

But what about restrictions on columns other than event date and the sort column? Other values in the table, unless correlated with one of those columns, tended to end up rather randomly scattered, so the min and max for that column in every partition tended to approximate the min and max for the entire table, and we couldn't eliminate trees in the lazy union using constraints on that column.

Normal databases let you create indexes on columns, but a B-tree index tended to take up space in the order of many bytes per record in the base table. Unless the table had a lot of columns, this means the index is something like a tenth the size of the uncompressed data. As our tree files were about a fortieth the size of the typical data we had, even a single B-tree index would dwarf the original table; and the compression was a big selling point for people. That wouldn't do. We needed subtler tricks.

So we let users nominate columns they'd like to do equality queries on (field=literal). For each partition, we'd then create a bloom filter of all the values of that column occurring in the partition, and store it in the partition cache, associated with the tree containing that partition. A bloom filter is basically a set, but with a twist: you insert values into the set, then you can query the set to see if a value exists. While a normal set gives you a yes or no answer in this case, a bloom filter gives you a "maybe or no" answer; it's either definitely not in the set or it might be in the set. In exchange for that vagueness, the bloom filter is a lot smaller than a conventional set representation. How big it is is something you can choose - larger ones are less likely to say "maybe" when the answer is really "no" (false positives). In our case, we went for a 5% false positive rate by default, which gave us bloom filters that consumed five bits per unique value in that column in the tree file; as each tree held about a million records, and so would store anywhere between one and a million unique values for a given column, that meant the filters consumed anything from a byte to 600KiB. When a query came in of the form SELECT ... FROM table WHERE bloom_filtered_column=x, we'd scan through all the bloom filters in the partition cache to look for trees that might contain the value x, and then perform the query on those.
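For anyone who hasn't met one, a bloom filter can be sketched in a few lines. This is a generic textbook version, not the one we shipped: the use of SHA-256 to derive bit positions and the choice of three hash functions are assumptions for the example, while the five bits per value comes from the figure above.

```python
import hashlib

class BloomFilter:
    """A set-like structure that answers "maybe" or "definitely not"."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, value):
        # Derive num_hashes bit positions by hashing the value with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, value):
        # Any unset bit proves the value was never added.
        return all(
            self.bits[pos // 8] & (1 << (pos % 8))
            for pos in self._positions(value)
        )

# Invented column values for one tree, five bits per unique value.
values = ["alice", "bob", "carol"]
tree_filter = BloomFilter(num_bits=5 * len(values), num_hashes=3)
for v in values:
    tree_filter.add(v)
```

A value that was added always gets a "maybe"; a value that was not added usually gets a "no", and a tree answering "no" can be skipped for a `column = x` query.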

And we could get more advanced than that. If users wanted to optimise queries of the form column LIKE '%foo bar%' - looking for a substring in a field - they could create another kind of bloom filter, which only worked on text columns. In this case, we'd go through each value of that column in the tree and extract all the groups of N characters (N might be four or five, typically). So if the field contained I eat food, the groups of four characters would be "I ea", " eat", "eat ", "at f", "t fo", " foo", and "food". We'd take all of those for the entire column in the tree, and make a bloom filter of them. Then if the user asked for a substring of N characters or more - N had to be chosen to be smaller than or equal to the shortest expected search substring - we would do the same for the search string and then look in the bloom filters for those. So if the user had column LIKE '%eat food%' in their WHERE clause, we'd split that into "eat ", "at f", "t fo", " foo", and "food". Only trees whose bloom filter returned a "maybe" for all of those substrings could possibly contain the target string, so we could eliminate any that didn't. We had a few other variants on the bloom filter, too, for even more specialist cases.
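The substring trick is easy to try out. In this sketch a plain Python set stands in for the bloom filter (so there are no false positives), and the function names are hypothetical:

```python
def ngrams(text, n):
    """All runs of n consecutive characters in text, spaces included."""
    return {text[i:i + n] for i in range(len(text) - n + 1)}

def tree_might_contain(tree_grams, needle, n):
    """False only if some n-gram of the search string is missing from the tree."""
    if len(needle) < n:
        return True  # search string too short to test, so the tree can't be skipped
    return ngrams(needle, n) <= tree_grams

# The n-grams for a tree whose column holds a single value.
tree_grams = ngrams("I eat food", 4)
```

`tree_might_contain(tree_grams, "eat food", 4)` is true because all five of its four-character groups are present, while `"eat fish"` is rejected as soon as `"t fi"` is found to be missing.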

So, let's recap. I've explained how we physically stored table data, how we imported batches of data, and started to explain how we do simple queries on a single table.

As I was saying, when such a query came in, we'd get this Lazy Union (LU) object that represented all the trees in the table, and we'd tell it to restrict itself to trees that might be pertinent to the query based on the WHERE clause. If this step didn't reduce the list of trees at all, that was fine; what mattered was that we eliminated as many trees as possible to improve performance by considering less data, while never eliminating a tree that might contain records of interest.

Once we had the restricted list of trees in the lazy union, we would be able to generate the parallel query plan. In this simple case, we'd generate a template job that could be applied to each tree file (in this case, this job would open the tree file it was being applied to, request only the columns it needed for the SELECT clause, and then filter them on the WHERE clause to find only the matching records). That template job would be applied to each tree found in the Lazy Union in parallel across the cluster, and the results streamed back to the user.

Join queries were a bit fiddlier. If the user wanted a simple two-table join, such as SELECT ... FROM a, b WHERE a.x = 123 AND b.y = 456 AND a.z = b.z, we would load up two LUs, one for a and one for b, and restrict both by the WHERE clauses that only affected one table: a.x = 123 on a and b.y = 456 on b. Those would limit the numbers of partitions involved, but still, any partition from a could contain rows that match rows from any partition of b - so we would need to consider all possible combinations. This meant running a number of jobs proportional to the product of the sizes of the tables. So we tried to reduce that in a few ways.

Firstly, once we'd built a restricted lazy union for one table (say, a), we could quickly work out an approximation to the MAX and MIN of the join column, z, in table a. Because we'd already restricted the LU on the a.x = 123 clause and just had partitions matching that, we could just look at the MAX and MIN of z in those partitions (data which we held in the metadata in the partition cache), and then when we built the restricted LU for b, we could restrict it on b.y = 456 AND b.z BETWEEN min AND max, thereby further restricting b. Of course, once we'd done that, we could approximate the min and max of z in b using the same technique, and further restrict the LU for a, and thus get a smaller z-range for a, and further restrict the LU for b - in fact, we could continue this until the range of z stopped shrinking if we wanted to, but in practice, it usually wasn't worth going for more than a couple of iterations.
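The back-and-forth narrowing of the join column range can be sketched as below. Partitions are represented as plain dicts holding the (min, max) of z from the metadata; that representation, the function names and the default of two rounds are assumptions for the example.

```python
def z_range(partitions):
    """Approximate (min, max) of z across partitions, from metadata alone."""
    return (
        min(p["z"][0] for p in partitions),
        max(p["z"][1] for p in partitions),
    )

def overlapping(partitions, low, high):
    """Partitions whose z range intersects low..high."""
    return [p for p in partitions if p["z"][1] >= low and p["z"][0] <= high]

def narrow_join(a_parts, b_parts, max_rounds=2):
    """Alternately restrict each side to the other side's z range."""
    for _ in range(max_rounds):
        if not a_parts or not b_parts:
            break
        b_parts = overlapping(b_parts, *z_range(a_parts))
        if not b_parts:
            break
        a_parts = overlapping(a_parts, *z_range(b_parts))
    return a_parts, b_parts

# Invented partitions, already restricted by a.x = 123 and b.y = 456.
a_parts = [{"name": "a1", "z": (0, 10)}, {"name": "a2", "z": (40, 50)}]
b_parts = [
    {"name": "b1", "z": (5, 8)},
    {"name": "b2", "z": (60, 70)},
    {"name": "b3", "z": (-20, -5)},
]
a_left, b_left = narrow_join(a_parts, b_parts)
```

In this example the six possible partition pairings shrink to one: only `a1` and `b1` can contain matching rows.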

Also, rather than building a job for every possible combination, we could instead look at each individual partition of table a, find the actual set of values of z in that partition, then restrict a copy of the LU (already restricted as already described) even further to find only matching trees from b to combine with that partition from a.

Some queries don't need to return all the matching records; some queries were just for aggregates such as COUNT or SUM or MIN or MAX; some queries were GROUP BY queries that would aggregate in groups; some queries still returned every record but needed to sort them first because of an ORDER BY. In these cases, we still generated per-tree jobs in parallel to find relevant records, but then we'd run a single special job on a single cluster node that would read the results of the parallel phase and do final processing.

There's a final simple case - a query so simple it just needs a single job. SELECT 1, for instance, which needs no table data from tree files. Or queries that can be fulfilled purely from the tree metadata in the partition cache, so again didn't need to read any tree files.

But what about more complex queries? Joins of three or more tables? Queries with subqueries? Common table expressions with a WITH..AS? Well, we handled all of them by splitting them up into a directed acyclic graph of simple "sub-queries", which operated on a combination of actual "base" tables and the results of other queries; a single final sub-query would produce the final results of the query. In fact, internally, we converted the entire query into a big WITH..AS with each query being one of the above cases we already know how to handle.

We would run the subqueries in parallel, as soon as all the tables they needed existed. To begin with, only the subqueries that depended purely on base tables would be able to run, but the results of subqueries would be built into temporary tables, in a special registry namespace local to that query. As subqueries finished, other subqueries would become runnable because their input tables would exist. We'd run subqueries in parallel across the cluster, which was pretty cool. Eventually, the final query would become runnable; we'd run it and send the results to the user and be done.

So a query that joined tables a,b,c and d could join a and b in parallel with joining c and d, then join the results of those two.
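The "run whatever has all its inputs" scheduling can be illustrated with Python's standard graphlib module. This only computes which subqueries become runnable together; it doesn't run anything, and the plan format and names are made up for the example.

```python
from graphlib import TopologicalSorter

def execution_waves(plan, base_tables):
    """Group subqueries into waves that could run in parallel.

    plan maps each subquery name to the tables it reads; base tables
    already exist, so they are not dependencies to wait for."""
    graph = {name: set(inputs) - base_tables for name, inputs in plan.items()}
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose inputs now exist
        waves.append(ready)
        sorter.done(*ready)                 # pretend the whole wave finished
    return waves

# Invented plan for a four-table join.
plan = {
    "join_ab": {"a", "b"},
    "join_cd": {"c", "d"},
    "final": {"join_ab", "join_cd"},
}
waves = execution_waves(plan, base_tables={"a", "b", "c", "d"})
```

The two inner joins land in the first wave and the final join in the second. A real scheduler would mark each subquery done individually as it finished rather than a wave at a time.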

But with all this parallelism going on - different subqueries running different parallel jobs at the same time, all competing for cluster resources with each other and other queries in the system - analysing what happened during a query could have been difficult. Just writing logs on each cluster node wasn't really enough. So each query was given a cluster-wide unique ID when it started, which was passed down to every subprocess involved in executing it. Each process within a subquery had a role name unique within the subquery, assigned as part of the query plan; so a process could be identified by the query ID, the subquery name, and the process role name. So we logged events into a per-process log file on the node the process ran on, identified by the process' unique identifier. Once the query was done, if nothing had gone wrong, we'd make every node delete all log files for the query ID; but if the query failed, or if a trace was explicitly requested, we'd gather all the log files for the query together onto one node and consolidate them into a "trace file".

This file was an sqlite database, containing all the log events in a highly structured form that made it easy to pull out only events pertaining to particular parts of the query. We also stored high-resolution timestamps so we could extract precise timings for performance analysis. The log events fell into three groups: Traditional log strings as you'd find in any UNIXy log file, machine-readable data items with a key and a value, and phase start/stop events. The latter kind broke the query down into phases, ranging from high-level stuff such as entire subqueries and the planning versus execution phases of subqueries down to fine-grained parts of individual jobs: opening tree files, performing various algorithms, and so on. These served two purposes: they gave other log events a context as to what the system was doing when they happened, and they let us see where the time was spent in a query for performance reasons. The machine-readable data items were generated to record various interesting properties (one example was that every process logged its rusage counters at the end of execution - CPU seconds consumed, peak memory size, number of IO reads and writes, that sort of thing). We recorded a lot of interesting things, such as the peak parallelism of subquery execution.
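To give a flavour of why a structured trace is handy, here is a sketch of the kind of question you can ask of it. The table layout, column names and data are all invented for the example; the real trace schema was different and is not reproduced here.

```python
import sqlite3

# An in-memory stand-in for a trace file, with a hypothetical phases table.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE phases (
           subquery TEXT, role TEXT, phase TEXT,
           start_us INTEGER, stop_us INTEGER)"""
)
conn.executemany(
    "INSERT INTO phases VALUES (?, ?, ?, ?, ?)",
    [
        ("sq1", "worker-0", "open_tree", 0, 120),
        ("sq1", "worker-0", "filter", 120, 900),
        ("sq1", "worker-1", "open_tree", 10, 150),
        ("sq1", "worker-1", "filter", 150, 700),
    ],
)

# Where did the time go, summed across all processes?
rows = conn.execute(
    """SELECT phase, SUM(stop_us - start_us) AS total_us
       FROM phases
       GROUP BY phase
       ORDER BY total_us DESC"""
).fetchall()
```

With this data the filter phase accounts for 1330 microseconds and opening trees for 260.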

Because of all the parallelism, phases overlapped with each other; so I took great pleasure in writing a tool that would take a query trace file and produce a Gantt-style chart showing how the time within it broke down. This thing was very popular; support staff could ask users to send them the trace from a failed query, or request a trace for a poorly-performing query, and then we could dissect the trace file to work out what happened. Also, they were popular with the test team; a query that was carefully written to test a particular algorithm in the system could check that the algorithm was actually used by looking in the query trace. As a future change to the query optimiser might alter how queries executed, it would otherwise have been quite common for a test that checked we got correct results in a situation where a certain algorithm was used to no longer actually trigger that algorithm, meaning a part of the system would be left untested. Logging the peak parallelism of subquery execution as a machine-readable data item meant that the various limitations on maximum parallelism could be tested; and tests that were meant to check that everything worked correctly at high levels of parallelism could check that the desired parallelism was actually achieved and some other bottleneck in the system didn't prevent it.

It was a very different world, working on that system compared to the low-latency database I covered in the previous blog post. Which, me being me, just makes me think about ways in which the two approaches could be merged into a database capable of supporting both kinds of workload...

The Hackspace Grand Opening

Those of you who follow me on various social media will have seen the photos of Cheltenham Hackspace's new premises being spruced up and that is because tomorrow (well today now!) is the grand opening 😀

Sunday 4th of December - if you are local then please pop along - hackspaces or createspaces are made by and for the community and we'd love to meet you 🙂

I'll be taking along steam punk and textile stuff to be working on and have plenty of spare so others can have a go! There is plenty to see and people to talk to and skill share with 🙂

Also I have been making craft videos for Advent as it turns out it is the tenth Christmas of Salaric Crafts How To Blog!!!

Of course I only have 2 vids up and it is now technically day 4 - annoyingly obsolete laptop is obsolete and is being a pain in the backside but I shall continue limping along with it and hopefully get the other two vids to you some time tomorrow!

I also have a craft fayre Monday at the Costa Coffee on Metz Way in Gloucester 🙂 Mainly taking Wiggly Pet Press stuff with a bit of Steam Punk etc...

Cool things I have worked on: Low-latency highly-available NoSQL data store

I've worked on a bunch of cool things in the past; and since I'm always explaining them to people, I realised it'd be good if I just blogged about them so I can just share the link.

I was asked to help design and then build a database to back a social networking system. The main requirement was to be able to satisfy lots of single-record primary-key fetches from a single thread as quickly as possible, because a PHP web app would be requesting lots of data to build the page; we had to do a few hundred of these "random point queries" in a small fraction of a second to get the page rendered in time. It had to be able to scale sideways, by adding more machines to the cluster. It also needed to be able to update this data, but at a much lower load - updates and creations of single records happened when a user went and did them, but people browsing around the site would hit several pages, each of which would request hundreds of records.

And it needed to be highly available. The app was to be spread over two datacentres for redundancy purposes, and as long as at least one server was reachable, the whole thing had to keep working as much as possible. We couldn't use "quorum" systems where a majority of the nodes needed to be reachable to make an update, for instance.

This, combined with the projected dataset size being a few terabytes at most, meant we decided to go for full replication. Every node had a copy of the database, and rather than being a daemon you spoke to over a socket like a conventional database, we'd write a shared library that would be loaded into the PHP process, so as to avoid context switch overheads for each query. The shared library would read the data files directly from disk.

The on-disk format was B-trees, but with a multi version concurrency control setup: update transactions (more on those later) would write all the new data into unused space until it was ready, so reads could still be happening while an update was being prepared, so as not to delay those all-important reads.

Updates were done with a reliable multicast protocol. The shared library would broadcast updates to all nodes currently reachable from the process sending the update. Those updates were received by a daemon process running on every node, which took updates from the network and applied them to the local disk storage. But there were many devils in the details here.

Because every daemon had to process the entire update load of the system in parallel, they had to try and maximise the use of disk bandwidth. We couldn't afford to be rushing around updating every record with its own transaction, with all the sync overheads; we had to batch them somehow to amortize the overheads of committing them to disk reliably. Also, as updates were handled asynchronously and sent from a large pool of PHP processes, we needed to stop the PHP application from sending updates faster than we could process them and overrunning the network buffers. And just to make matters worse, updates could come from the network in unusual orders for various reasons, but we needed to apply them in a consistent order on every node so multiple updates to the same record all produced the same result in the end.

So, every daemon had an update queue. Its highest priority was pulling updates off the network and into the queue; its second priority was processing updates from the queue. But the queue wasn't just a FIFO. We gave every update a sequence number when it was generated, from a sequence number counter in shared memory on every node; and to ensure global consistency, we implemented Lamport timestamps - when an update was received from another node, we immediately updated the local sequence number counter to make sure it was more than the received sequence number, so that all updates issued from a node with knowledge of a given update would carry later timestamps, thereby ensuring that any transaction which read a current value and then updated that to a new value would be applied "later" than the update that provided the old current value.
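The sequence-number rule is small enough to show in full. This sketch keeps the counter in an ordinary object rather than shared memory, and the class and method names are mine:

```python
class SequenceCounter:
    """A per-node Lamport-style counter."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.value = 0

    def next_update_stamp(self):
        """Stamp for a locally generated update: (sequence, node ID)."""
        self.value += 1
        return (self.value, self.node_id)

    def observe(self, received_stamp):
        """On receiving an update, jump the counter past its sequence number."""
        received_value, _ = received_stamp
        self.value = max(self.value, received_value + 1)

node_a = SequenceCounter("a")
node_b = SequenceCounter("b")
first = node_a.next_update_stamp()
second = node_a.next_update_stamp()
node_b.observe(second)               # node b now knows about a's update
reply = node_b.next_update_stamp()   # so anything it issues sorts later
```

Stamps compare as tuples, so the node ID only matters when two sequence numbers are equal.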

So our queue ordered incoming updates by sequence number (using the ID of the issuing node as a tie breaker, to ensure a global ordering). We also stored the update sequence number on every record on disk, and refused to update a record to an "older" value, to deal with re-orderings over a longer time period than those which met in the queue. The queue also noticed updates to the same record group and merged them into a single update, always replacing older data with newer according to the sequence numbers. This meant that a flurry of updates to the same record would, as long as they all arrived before the queue was flushed, all "coalesce" into a single update to the on-disk data.

Wait, I said "record group"; what's that? Well, in the data model, often a bunch of records would represent parts of one object. Imagine an invoice in the traditional SQL format, with an invoice record and a bunch of line item records that are components of it. We encouraged the use of small records, as we updated records by providing a new value for the whole thing, and lots of small records meant that finer-grained bits of an object could be updated in parallel without trashing each other's updates. So as part of the "schema" we defined a mapping from record primary keys to record groups; many records would go in the same group, which was stored as a single entry in the on-disk B-tree, so we wanted to coalesce updates to the same record group into one, thereby getting more records updated in a single read-modify-write cycle.

Under the hood, the queue was two data structures - a priority heap full of record-group update lists, ordered by sequence number; and a hash table from record-group IDs to the same update lists. Incoming updates were mapped to record-group IDs and looked up in the hash; if an update to that group already existed then the update would be merged into it, and if the sequence number was older than the one already in that queue entry, we'd move the entry up the queue by re-inserting it into the priority heap. Otherwise, we'd create a new entry with a single update to the record group, and pop it in the hash and the heap. Keeping the two in synch in all cases was a bit of work, but once we'd covered all the cases, it looked after itself.
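A rough sketch of the heap-plus-hash arrangement follows. Python's heapq has no way to move an entry, so this version pushes a fresh heap item and skips stale ones when popping; that workaround, the integer stamps and all the names are assumptions for the example rather than how the real queue was written.

```python
import heapq

class UpdateQueue:
    def __init__(self):
        self.heap = []      # (sequence, group_id); may hold stale items
        self.by_group = {}  # group_id -> {"sequence": ..., "records": {...}}

    def push(self, group_id, key, stamp, value):
        entry = self.by_group.get(group_id)
        if entry is None:
            entry = {"sequence": stamp, "records": {}}
            self.by_group[group_id] = entry
            heapq.heappush(self.heap, (stamp, group_id))
        elif stamp < entry["sequence"]:
            # An older update arrived: the group moves up the queue.
            entry["sequence"] = stamp
            heapq.heappush(self.heap, (stamp, group_id))
        current = entry["records"].get(key)
        if current is None or stamp > current[0]:
            entry["records"][key] = (stamp, value)  # newer data always wins

    def pop(self):
        """Return (group_id, {key: value}) for the oldest group, or None."""
        while self.heap:
            stamp, group_id = heapq.heappop(self.heap)
            entry = self.by_group.get(group_id)
            if entry is not None and entry["sequence"] == stamp:
                del self.by_group[group_id]
                return group_id, {k: v for k, (_, v) in entry["records"].items()}
        return None

queue = UpdateQueue()
queue.push("invoice-7", "invoice-7/header", 10, "v1")
queue.push("invoice-9", "invoice-9/header", 11, "x")
queue.push("invoice-7", "invoice-7/line-1", 12, "a")
queue.push("invoice-7", "invoice-7/header", 8, "old")  # late arrival, ignored
first_out = queue.pop()
second_out = queue.pop()
```

The three updates to invoice-7 come out as one coalesced group update, ahead of invoice-9, and the late-arriving older header value does not overwrite the newer one.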

The queue also helped us prevent overload. Every node measured how big the backlog was, in three different ways: There was a maximum number of record group updates we wanted to hold in the queue, a maximum number of bytes of new record data in the queue, and a maximum average latency (in seconds, between the PHP code requesting the update and it getting written to disk on the node). We tracked the latter by putting wall-clock timestamps in the update messages and keeping an exponentially-weighted moving average of the difference (like a UNIX load average), and the former two came directly from the queue data structure. On each node, we compared each of the three measurables to our target and scaled it to a 0-100% capacity measure; we then took the worst of the three as the backlog score for that node, a measure of how "far behind" it was.
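In code, the backlog score amounts to three ratios and a max, with the latency figure smoothed first. The limits, the smoothing factor and the names below are invented for the sketch; scores are fractions, so 1.0 means 100%.

```python
def ewma(previous, sample, alpha=0.1):
    """Exponentially-weighted moving average: nudge previous towards sample."""
    return previous + alpha * (sample - previous)

def backlog_score(queued_groups, queued_bytes, average_latency, limits):
    """Worst of the three capacity measures (can exceed 1.0 when overloaded)."""
    return max(
        queued_groups / limits["groups"],
        queued_bytes / limits["bytes"],
        average_latency / limits["latency_seconds"],
    )

# Hypothetical limits for one node.
limits = {"groups": 10_000, "bytes": 64 * 1024 * 1024, "latency_seconds": 2.0}
score = backlog_score(2_500, 16 * 1024 * 1024, 1.5, limits)
```

Here the queue is only a quarter full by count and by bytes, but latency is at 75% of its limit, so the node reports 0.75.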

These were broadcast to the other nodes in the network, to compute an overall "worst backlog of any node" for the entire cluster. We were a bit clever with this - if a node's backlog wasn't the worst, then there was no point in it telling any other nodes about it, so only the nodes that were at the worst backlog level in the system were broadcasting their state. Again, there were lots of edge cases in this, which we carefully handled to make sure the system as a whole knew the worst backlog state of any node in the system, with the minimum of network noise.

So each node now knew how backlogged the cluster as a whole was; and we used that information to apply back pressure to the PHP clients. The backlog score was stored in the shared memory segment, and the function that was called from PHP to send an update consulted it. If the backlog was more than a minimum, then gentle pressure was applied to slow the PHP application down - before sending the message, we would sleep for a duration that depended upon the backlog level. If it was at or more than 100%, we'd actually sleep in a loop until it abated; but otherwise, we'd sleep for a time that increased smoothly from zero to infinity as the backlog went from 0% to 100% (accounting for the situation where the backlog dropped while we were sleeping, to a point where we'd already slept enough). This meant that if the PHP app was trying to generate writes faster than we could get them to disk, there'd be a certain amount of "stretchiness" in the system as the implicit queue in the network and our update queue took up the slack. But if the update queue kept growing, the PHP app would face increasing delays in getting updates applied. This created a negative feedback loop (as there was a finite limit to how many PHP processes could exist across the cluster), and we tuned the shape of the feedback curve such that the system would settle to a stable backlog level (and, thus, a stable sleep duration, and thus a stable write rate) rather than oscillating.
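The exact feedback curve we tuned isn't reproduced here, but one simple function with the right shape (zero below a minimum, rising smoothly, unbounded as the backlog approaches 100%) looks like this. The threshold and scale values are made up.

```python
import math

def sleep_seconds(backlog, threshold=0.5, scale=0.01):
    """How long to sleep before sending an update, given backlog as a fraction.

    Zero up to the threshold, then x / (1 - x) growth towards infinity at 1.0.
    A caller would treat infinity as "sleep in a loop until it abates"."""
    if backlog <= threshold:
        return 0.0
    if backlog >= 1.0:
        return math.inf
    x = (backlog - threshold) / (1.0 - threshold)  # 0..1 above the threshold
    return scale * x / (1.0 - x)
```

With these made-up parameters a backlog of 75% costs each update 10 milliseconds, and the delay climbs steeply from there, which is what pushes the writers back towards a stable rate.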

This meant that when the PHP code did an update, it would be some time before that update was visible on every node. For some parts of the system, that was fine, but for other parts, an immediate response to updates was required, so we gave the caller of the read function a choice of "consistency level". At the most basic and fastest level, it would just check the local state, and potentially miss out on updates that were still flying through the network. At the "immediate" level, however, it would first hash the record key and use that to select a "primary node" for that record. That node was responsible for storing, in memory, the latest state of that record if it was currently being updated. This worked because the update function also offered a consistency level setting: the most basic level just sent the update out onto the network, while the "immediate" level first sent the update to the primary node's in-memory update buffer. The use of specific nodes for this threatened our reliability guarantee; if that node was unavailable, or crashed and restarted while an update was in progress, we'd lose the update from the buffer - but it would still proceed through the normal mechanism. So if updates and reads to a record both requested "immediate" consistency, we would not see update latency (except when nodes failed), but at the cost of extra network round trips slowing down each operation.
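The primary-node lookup can be sketched like this. It is a simplified, single-process Python model under stated assumptions: the hash function (SHA-1), the level names "basic" and "immediate", and the dictionaries standing in for each node's local state and in-memory update buffer are all illustrative, not the original implementation.

```python
import hashlib

def primary_node(record_key, nodes):
    """Pick the node that buffers in-flight "immediate" updates for a key.

    Sorting the node list first means every node computes the same answer
    regardless of the order in which it learned about its peers.
    """
    digest = hashlib.sha1(record_key.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(nodes)
    return sorted(nodes)[index]

def read(record_key, level, local_store, buffers, nodes):
    """Read a record at the requested consistency level.

    buffers maps node name -> dict, a stand-in for asking that node over
    the network for its in-memory update buffer.
    """
    if level == "immediate":
        pending = buffers[primary_node(record_key, nodes)].get(record_key)
        if pending is not None:
            return pending          # an update is still in flight
    return local_store.get(record_key)  # "basic": local state only
```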

Updates had a third consistency level, too, which was meant for creating new records with an external primary key (such as a user record, keyed by email address): if this was selected, then a two-phase commit process was used. The record creation would be proposed on every reachable node, which would cause that primary key value to be marked as reserved in the database, or the proposal would be rejected if it already existed or was already reserved. If a majority of the nodes in the system were reachable and OK with the update, then it would be actually performed; otherwise, it would be rejected. These were slow, and we tried to avoid them, but some parts of the application required that level of atomic global consensus.
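A minimal Python sketch of that propose-then-commit flow follows. The `Node` class, its method names and the in-process "cluster" are hypothetical stand-ins for real network calls, and the sketch ignores how the committed record later reaches nodes that were unreachable during the vote.

```python
class Node:
    """Hypothetical stand-in for one database node."""

    def __init__(self, reachable=True):
        self.reachable = reachable
        self.records = {}
        self.reserved = set()

    def propose(self, key):
        # Phase one: reserve the key unless it exists or is already reserved.
        if key in self.records or key in self.reserved:
            return False
        self.reserved.add(key)
        return True

    def commit(self, key, value):
        self.reserved.discard(key)
        self.records[key] = value

    def abort(self, key):
        self.reserved.discard(key)


def create_unique(nodes, key, value):
    """Create a record only if a majority of ALL nodes accept the proposal."""
    reachable = [n for n in nodes if n.reachable]
    accepted = [n for n in reachable if n.propose(key)]
    if 2 * len(accepted) > len(nodes):
        for n in accepted:          # phase two: make it real
            n.commit(key, value)
        return True
    for n in accepted:              # no quorum: release the reservations
        n.abort(key)
    return False
```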

You might be worrying that these asynchronous updates could get lost between a PHP process sending them to the network and at least one node receiving them, but all updates were received by the local node intrinsically; and once an update was on one node, it would get to all the others. The reliable multicast protocol let each node know if it had missed some updates from others (by using a per-node monotonic sequence number on every message, and looking for gaps). If that happened, the protocol would try to have the missing messages retransmitted if they were still in the send buffers; if not, then a network break would be declared, and the nodes would know they'd missed some updates from some other nodes. This would start a process by which all nodes that had missed some updates would declare the range of global sequence numbers they seemed to be missing, and the nodes that had updates in that range would then re-transmit them all (by using a special index from update sequence number to record across all tables). They would be retransmitted with the original sequence numbers, not new ones, so no matter which nodes got them in what order, they would eventually get to all nodes and end up in the same global ordering, so every node would end up with the same final state.
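The gap detection at the heart of this can be sketched in a few lines of Python. The class and method names are made up for illustration, and the sketch only reports gaps; it does not model the retransmission or the "network break" recovery that followed.

```python
class GapDetector:
    """Tracks per-sender sequence numbers and reports missed messages."""

    def __init__(self):
        self.next_expected = {}   # sender -> next sequence number expected

    def receive(self, sender, seq):
        """Return the sequence numbers this message shows we have missed."""
        # Assumption: the first message seen from a sender sets the baseline.
        expected = self.next_expected.get(sender, seq)
        if seq < expected:
            return []             # duplicate or late retransmission
        missing = list(range(expected, seq))
        self.next_expected[sender] = seq + 1
        return missing
```

For example, receiving messages 1, 2 and then 5 from the same sender reports 3 and 4 as missing, which is the cue to ask for a retransmission.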

A node that started up from a reboot or from fresh used the same process - it knew the last sequence numbers at which it had been fully in sync with the cluster, so it could request the latest state of all records updated since then. A totally new node would have seen no updates, so would request all records changed since timestamp zero, which would be all records. In these cases, the node would request that the playback be unicast to it alone, as opposed to the network partition case where nodes would cooperate to request broadcasts of all the missed updates to get the whole network back together.

And so we had a system that could (on my 2008-era laptop in a VM) do several thousand random record reads per second from a single thread, while handling bursty asynchronous updates of a thousand or so records per second (many many more if they all happened to fall into a smaller number of record groups!), while nodes came and went from the cluster, and the cluster was occasionally split into arbitrary sub-clusters due to network link failures. When the cluster was working properly, it would provide a completely consistent view of the database; but when the cluster was in a failure state, all nodes could service reads and updates (except the super-consistent unique primary key creation updates, which would all fail if a quorum of nodes couldn't be contacted to agree), but updates could then be delayed. I've not seen a system since that can provide that level of service.

Programming with state machines (by )

State machines are a really nice way of thinking about systems with state. As a notation for expressing imperative code, I much prefer this:

To this:

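x = readchar();
while(x != EOF) {
   if(x == 'a') {                        /* Ready --> A */
      x = readchar();
      while(x != EOF && x != 'x') {      /* stay in A until 'x' */
        x = readchar();
      }
   } else if (x == 'b') {                /* Ready --> B */
      x = readchar();
      while(x != EOF && x != 'x') {      /* stay in B until 'x' */
        x = readchar();
      }
   } else {                              /* anything else: stay in Ready */
      x = readchar();
   }
}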

I've always wished I could write that sort of code as a state machine rather than a bunch of nested structured programming primitives. There is some charm to the idea of drawing a diagram on the computer and having that be compiled as executable code, but that can also be fiddly and isn't well supported by conventional programming tools; so I'd rather describe my state machine in text. What I want is a syntax for writing state machines, rather than writing imperative code that implements the state machine.

For instance, the diagram I put above wasn't drawn by me; it was drawn by a tool called PlantUML from the following source code:


@startuml
state Processing {
      Ready --> A : a/doA1
      Ready --> B : b/doB1
      Ready --> Ready : not a or b/doC

      A --> A : not x/doA2
      A --> Ready : x/doA3

      B --> B : not x/doB2
      B --> Ready : x/doB3
}

[*] --> Ready

Processing --> [*] : EOF
@enduml


But the UML state diagram is meant for abstract modelling of systems, and lacks the precision and detail required for actual executable code. The state diagram doesn't make it clear that doA2 and doB2 need to be passed the character that was read, for instance. PlantUML's syntax, therefore, also lacks that kind of detail, so we can't just use that.
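To make that missing detail concrete, here is a small Python sketch of the same machine as a transition table, in which every action is explicitly handed the character that triggered it. The table layout, the `run` function and the `act` callback are illustrative assumptions, not a proposal for the syntax discussed below.

```python
# state -> list of (predicate, next_state, action_name); first match wins.
TRANSITIONS = {
    "Ready": [(lambda c: c == "a", "A", "doA1"),
              (lambda c: c == "b", "B", "doB1"),
              (lambda c: True, "Ready", "doC")],
    "A": [(lambda c: c != "x", "A", "doA2"),
          (lambda c: True, "Ready", "doA3")],
    "B": [(lambda c: c != "x", "B", "doB2"),
          (lambda c: True, "Ready", "doB3")],
}

def run(chars, act):
    """Drive the machine over chars; running out of input stands in for EOF."""
    state = "Ready"
    for ch in chars:
        for matches, next_state, action in TRANSITIONS[state]:
            if matches(ch):
                act(action, ch)   # the action receives the character read
                state = next_state
                break
    return state
```

Calling `run("abx", print)` would print each action name alongside the character passed to it.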

Also, in a programming situation, it would be nice to be able to express types of state machines: to be able to say that a group of state machines has some useful properties in common, defined by the "type". The type of a state machine defines some high-level states and what inputs or outputs are possible in each state; any state machine of that type has to follow that behaviour, even though it might break the states defined in the type into many substates. For instance, the example state machine above always accepts any character until it receives an EOF, then stops; that might be expressed as a type having a single Ready state with two transitions, one mapping an EOF to the end state and another mapping a character back to the Ready state, with no outputs. That type would be suitable for any state machine that accepts characters until they finish, so a generic "character input stream" might accept a state machine of that type to process them, without needing to know anything about As and Bs.

These thoughts have led me to the following model of a state machine that's suitable for use as part of a programming language (although I've not defined an actual syntax yet):

State machine type

A state machine type is a set of states. Each state has a name, and a list of message types which are acceptable in that state (I am assuming that the underlying type system is smart enough to include things like "character equal to a" as a type, rather than merely "character"). The message types in this list must be mutually disjoint, so that any given message matches at most one of them.

For each input message type, there's a list of possible next states and an output message type (which in our example above would be a void type, as we never output messages).

There is also an initial state. It's impossible for the initial state to be the target of a state transition on a message, so it only has transitions out to other states. Those transitions all represent possible constructors of this state machine, because the initial state represents nonexistence.

For instance, a type for vending machine controller state machines might be (given a Result type which has two values, success and fail):

Initial state:
   input: (Create) output: (Void) next state: (Idle)

Idle state:
   input: (Coin(Value)) output: (Value) next state: (Credit)

Credit state:
   input: (Coin(Value)) output: (Value) next state: (Credit)
   input: (Refund) output: (Value) next state: (Idle)
   input: (Select(Product)) output: (success::Result) next state: (Idle)
   input: (Select(Product)) output: (fail::Result) next state: (Credit)

The Value objects output in response to Coin or Refund inputs happen to reflect the current credit in the machine, a fact which the state machine type alone can't represent (that would need to be explained in a design-by-contract invariant of some kind).

From this type can be derived the type of the state machine's transition functions, which in this case will be:

coin :: Idle -> Value -> (Credit,Value) |
        Credit -> Value -> (Credit,Value)
refund :: Credit -> (Idle,Value)
select :: Credit -> Product -> (Idle,success::Result)|(Credit,fail::Result)

Note that the Create transition isn't represented here, because this state machine can't actually be instantiated - it doesn't specify the behaviour enough. There's no way of saying which possible select result should happen, or of what values are returned as outputs. But the functions we have are automatically implemented by the state transition definition, and will cause the appropriate transitions to occur in any state machine that's a subtype of this one; every function accepts an instance of the state machine in a given state, and an input value, and returns a new instance of the state machine and the output value.


State machine types can be subtypes of other state machine types. A state in the parent type may be split into several states in the subtype; any transition to a state A in the parent type must be represented by a transition in the subtype to some substate of A, and all transitions possible from state A in the parent type must be represented by transitions from every substate of A in the child.

For instance, a subtype of the vending state machine might have an infinite number of substates of "Idle", one for each possible configuration of available things to vend inside the machine. If it vends three different products, then these substates might be called Idle(A,B,C) where A,B and C are nonnegative integers representing the stock of each item. The only transition out of "Idle" is when a coin is inserted, so every Idle(A,B,C) state must have a transition from a coin being inserted, returning a Value, and taking us into the Credit state.

However, our Credit state also tracks the stock, and also tracks the credit inserted so far - so we need an infinite set of states of the form Credit(A,B,C,X) where A,B,C are the stocks and X is another nonnegative integer for the number of pennies of credit. So the parent-type transition from Idle to Credit has to be replaced by a transition from every Idle(A,B,C) to Credit(A,B,C,X) where X is the Value of the coin inserted. All the transitions from Credit in the parent type need to be represented as transitions from every Credit(A,B,C,X) state to other Credit(A,B,C,X) or Idle(A,B,C) states as appropriate. But with that done, it can be shown that all the states and transitions of the parent type correspond to one or more in the subtype.

The one exception is the initial state - the creation transitions from that in the subtype need not correspond to those of the parent type.

Given an extension to the syntax to create "parameterised states" such as Idle(A,B,C), which are shorthand for a potentially infinite set of "concrete states" such as "Idle(4,0,1)", we might represent that subtype like so:

Initial state:
   input: (Create(a::NonNegativeInteger,b::NonNegativeInteger,c::NonNegativeInteger))
     output: (Void) next state: (Idle(a,b,c))

Idle(a,b,c) state implements Idle:
   input: (Coin(x::Value)) output: (x::Value) next state: (Credit(a,b,c,x))

Credit(a,b,c,x) state implements Credit:
   input: (Coin(x'::Value)) output: ((x+x')::Value) next state: (Credit(a,b,c,x+x'))
   input: (Refund) output: (x::Value) next state: (Idle(a,b,c))

   input: (Select(Product==A)) when: a>0 output: (Success) next state: (Idle(a-1,b,c))
   input: (Select(Product==A)) when: a==0 output: (Fail) next state: (Credit(a,b,c,x))

   input: (Select(Product==B)) when: b>0 output: (Success) next state: (Idle(a,b-1,c))
   input: (Select(Product==B)) when: b==0 output: (Fail) next state: (Credit(a,b,c,x))

   input: (Select(Product==C)) when: c>0 output: (Success) next state: (Idle(a,b,c-1))
   input: (Select(Product==C)) when: c==0 output: (Fail) next state: (Credit(a,b,c,x))

(We really should have represented the stock as a Map(Product,NonNegativeInteger) type and then not duplicated the select rules...)

Converting a parent state into a parameterised subtype state is only one way of splitting a state in a subtype, too. The syntax above would permit the creation of separate states that all say "implements Credit", too. A very simple vending machine that can only hold one instance of one product might split Idle into two states, Empty and Full, for instance; and probably split Credit into CreditFull(Value) and CreditEmpty(Value) to represent the Credit state while also parameterising CreditFull and CreditEmpty with the current credit.

The above syntax for parameterised states can get unwieldy if there are a lot of parameters, especially if generally only a few of them are changed in any given transition (imagine the above example if there were ten different products to keep track of). Therefore, as syntactic sugar, it makes sense for it to be possible to define parameters shared implicitly by one or more states. Transitions into those states from states outside the group need to explicitly specify all the parameter values, but transitions within that group can use a different syntax to represent the next state, which only specifies what parameters have changed. All the rest are passed in unchanged, automatically. It might look like this:

Initial state:
   input: (Create(a::NonNegativeInteger,b::NonNegativeInteger,c::NonNegativeInteger))
    output: (Void)
    next state: (Idle(a,b,c))

parameters a::NonNegativeInteger,b::NonNegativeInteger,c::NonNegativeInteger {

Idle(a,b,c) state implements Idle:
   input: (Coin(x'::Value)) output: (x'::Value) next state: (Credit(x <- x'))

Credit(a,b,c,x::NonNegativeInteger) state implements Credit:
   input: (Coin(x'::Value)) output: ((x+x')::Value) next state: (Credit(x <- x+x'))
   input: (Refund) output: (x::Value) next state: (Idle())

   input: (Select(Product==A)) when: a>0 output: (Success) next state: (Idle(a <- a-1))
   input: (Select(Product==A)) when: a==0 output: (Fail) next state: (Credit())

   input: (Select(Product==B)) when: b>0 output: (Success) next state: (Idle(b <- b-1))
   input: (Select(Product==B)) when: b==0 output: (Fail) next state: (Credit())

   input: (Select(Product==C)) when: c>0 output: (Success) next state: (Idle(c <- c-1))
   input: (Select(Product==C)) when: c==0 output: (Fail) next state: (Credit())

}

Implementing state machine types

Note that we've suddenly started giving names to the Values flying around, and specifying an output value rather than just a type, and splitting transitions into different cases with disjoint boolean "when:" expressions. This, too, further constrains the type of the state machine; indeed, this machine can actually be implemented and will provide a simple model of a vending machine - although nothing will be vended.

This means that this new state machine's state transition functions will include a "create" function from the initial stock levels to an instance of Idle(NonNegativeInteger,NonNegativeInteger,NonNegativeInteger).
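As a rough picture of what those generated functions could look like, here is a Python sketch using immutable dataclasses in place of state machine instances. Everything here is an assumption made for illustration: the function names mirror the transition names, products are identified by the field names "a", "b" and "c", results are plain strings, and nothing enforces that the integers stay nonnegative.

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Idle:
    a: int
    b: int
    c: int

@dataclass(frozen=True)
class Credit:
    a: int
    b: int
    c: int
    x: int          # pennies of credit inserted so far

def create(a, b, c):
    """Initial state -> Idle(a,b,c)."""
    return Idle(a, b, c)

def coin(state, value):
    """Idle or Credit -> (Credit, current credit)."""
    if isinstance(state, Idle):
        return Credit(state.a, state.b, state.c, value), value
    total = state.x + value
    return replace(state, x=total), total

def refund(state):
    """Credit -> (Idle, refunded credit)."""
    return Idle(state.a, state.b, state.c), state.x

def select(state, product):
    """Credit -> (Idle, "Success") if in stock, else (Credit, "Fail")."""
    stock = getattr(state, product)
    if stock > 0:
        idle = Idle(state.a, state.b, state.c)
        return replace(idle, **{product: stock - 1}), "Success"
    return state, "Fail"
```

Each function takes a machine in one state and returns a new instance plus the output value, so old instances are never mutated.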

A real implementation could further subtype this, adding a uniquely-typed IO interface to the Idle and Credit states, passed into the create transition. The Success-returning select operations can then also use pure IO functions to mutate the IO context to one in which the physical mechanism of vending has happened.

But the implementation of a state machine exists in the expressions that return the output value, parameterise the next state, and the boolean expressions forming the "when:" guards. These are full expressions in the pure functional language I'm imagining this thing implemented on top of, so are fully Turing complete.

There is no syntactic distinction between a type and an implementation; a state machine type can be instantiated if all of its outputs have expressions giving them values (ok, singleton-typed outputs such as Void can be implied!). This is a bit like concrete versus abstract classes in an object-oriented programming language.

Chaining transitions

So far, we've only looked at transitions that accept an input value, return an output value and move to a new state. This means that state machine behaviour is defined entirely in terms of its reactions to external values. Since the values we generate for the output and the parameters of the new state are Turing-complete expressions, that is sufficient to describe any behaviour, but it's not always convenient to express complicated behaviour in an expression. Sometimes a complicated operation of a compound data structure such as a list would be better defined as a state machine in its own right.

In order to support that kind of thing, we can also support chaining transitions, which can drive state machine behaviour purely based on internal state, and so do not cause the generation of state transition functions. These come in three types.

The first kind is one that starts with an input value, but which then leads to a next state without producing a value. This next state must be a special "transient state", the rules for which are explained below.

The second kind is one from a transient state to another transient state. This has no input value and no output value.

The third kind is a transition from a transient state to a non-transient state, which produces an output value.

Transient states may not have any non-chaining transitions pointing to or from them. Every transient state must be referenced from at least one non-transient state through a chain of chaining transitions, starting with a kind-1 chaining transition then zero or more kind-2 chaining transitions. Every transient state must also lead to at least one non-transient state through a chain of zero or more kind-2 chaining transitions, then a kind-3 chaining transition.

Every such possible chain from a kind-1, through zero or more kind-2, to a final kind-3, is equivalent to a single transition from the initial to the final non-transient states; with the kind-1 transition defining the input value and the kind-3 transition defining the output value. They might be implemented as a set of state-transition functions that just tail-call each other, but the transient states are therefore "hidden" inside the state machine from external perspectives. Those state-transition functions are lexically local to the exposed state-transition functions implementing the equivalent non-chaining transition.

But although they are hidden from users of the state machine, a subtype of the state machine can indeed "see" them for purposes of splitting them into substates, adding transitions between them, etc.


The final thing I've not covered above is terminating state machines. There's an implicit "final" state available in every state machine, if at least one transition to it exists; no transitions from that state can exist. Any transition to the final state appears in the state transition functions as a function that returns the output value, but not paired with a new state machine instance, because there isn't one. It's gone.

It is entirely possible for a state machine to have no non-transient states apart from the initial and final states. For instance, a state machine that traverses a tree and sums up the labels on the nodes might be implemented in terms of transient parameterised states for the current position in the tree and the running total, with a kind-1 creation transition that accepts the tree as an input and a kind-3 finalisation transition that returns the total. This machine will therefore have a single state transition function - from Tree to Number. The actual state of the machine is entirely represented as state parameters passed between mutually recursive functions implementing the chaining transitions, and never appears as a value visible outside of that.
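Here is a minimal Python sketch of such a tree-summing machine. The tree representation (a `(label, children)` tuple), the single transient state named "Visit" and its parameters are assumptions for illustration; and since Python has no tail-call elimination, the mutually recursive functions are flattened into a loop over the transient state.

```python
def tree_sum(tree):
    """Sum the labels of a tree given as (label, [child, child, ...])."""
    # Kind-1 transition: accept the input and enter the transient state
    # Visit(pending, total), where pending is the list of unvisited nodes.
    state = ("Visit", [tree], 0)
    while True:
        _name, pending, total = state
        if not pending:
            # Kind-3 transition: leave the transient state for the final
            # state, producing the output value.
            return total
        # Kind-2 transition: Visit -> Visit with updated parameters.
        label, children = pending[-1]
        state = ("Visit", pending[:-1] + list(children), total + label)
```

From the outside this is just one function from a tree to a number; the Visit state never escapes.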


I think I've defined a workable semantic model for state machines that can be used as a way to define sets of pure functions between instances of state machines, that can handle both synchronous state machines driven by external events, and those that bumble around a data structure or iterate mathematical operations.

It needs a proper syntax; personally, I'd prefer one based on s-expressions or IRON, but suitably consistent forms could be defined for any language.

Towards the Family Mainframe (by )

Last September, I posted progress on the construction of our domestic mainframe. To recap, the intent is to build a dedicated home server that's as awesome as possible - meaning it's reliable, safe, and easy to maintain. That rules out "desktop tower PC in a cupboard" (accumulates dust bunnies, gets too hot, easily stolen, prone to children poking it); "put a 19" rack somewhere in your house" is better, but consumes a lot of floor footprint and doesn't fix the dust bunny problem. So I've made my own custom steel chassis; fed cold air at pressure via a filter, incorporating a dedicated battery backup system, locked and anchored to the wall, and with lots of room inside for expansion and maintenance.

Since that blog post, I've finished the metalwork, painted it with automotive paint using a spray gun (which was a massive job in itself!), fixed it to the wall, and fitted nearly all of the electronics into it.

A significant delay was caused by the motherboard not working. I sent it back to the shop, and they said it was fine; so I sent the CPU back, and they said THAT was fine; so I sent both back together and it turned out that the two of them weren't compatible in some way that was solved by the motherboard manufacturer re-flashing my BIOS. That's now up and running; I was able to use the HDMI and USB ports on the outside of the chassis to connect up and install NetBSD from a USB stick, then connected it to the network and installed Xen so I can run all my services in virtual machines. It's now running fine and everything else can be done via SSH, but the HDMI and USB ports are there so I can do console administration in future without having to open the case (unless I need to press the reset button, which is inside).

The one thing it's lacking is the management microprocessor. I've prototyped this thing on a breadboard and written the software, but need to finish off the PCB and cabling. It will have an AVR controlling three 10mm RGB LEDs on the front panel, and three temperature/humidity sensors: one each in the inlet and outlet air, and one spare for more advanced air management in future. The idea is that the three LEDs on the front panel will display useful system status, and the environment sensor data will be logged.

Here's what it looks like from the outside; note the air inlet hose at the top left:

Family mainframe

The socket panel on the left hand side worked out pretty well - 240V inlet at the bottom, then on the aluminium panel, three Ethernets, HDMI, and USB (my console cable is still plugged into the HDMI and USB in the photo, which won't usually be the case):

I/O sockets panel and the power inlet

And here's the inside, with lots of space for more disks or other extra hardware; the big black box at the bottom is the battery backup system:

Innards of the family mainframe

Now I have Xen installed, I'm working on a means of building VMs from scripts, so any VM's disk image can be rebuilt on demand. This will make it easy for me to upgrade; any data that needs keeping will be mounted from a separate disk partition, so the boot disk images of the VMs themselves are "disposable" and entirely created by the script (the one slightly tricky thing being the password file in /etc/). This will make upgrades safe and easy - I can tinker with a build script for a new version of a VM, testing it out and destroying the VMs when I'm done, and then when it's good, remount the live data partition onto it and then point the relevant IP address at it. If the upgrade goes bad, I can roll it back by resurrecting the old VM, which I'll only delete when I'm happy with its replacement. This is the kind of thing NixOS does; but that's for Linux rather than NetBSD, so I'm rolling my own that's a little more basic (in that it builds entire VM filesystems from a script, rather than individual packages, with all the complexities of coupling them together nicely).

I'm using NetBSD's excellent logical volume manager to make it easy to manage those partitions across the four disks. There are two volume groups, each containing two physical disks, so I can arrange for important data to be mirrored across different physical disks (not in the RAID sense, which the LVM can do for me, but in the sense of having a live nightly snapshot of things on separate disks, ready to be hot-swapped in if required). I still have SATA ports and physical bays free for more disks, and the LVM will allow me to add them to the volume groups as required, so I can expand the disk space without major downtime.

So for now it's just a matter of making VMs and migrating existing services onto them, then I can take down the noisy, struggling, cranky old servers in the lounge! This project has been a lot of work - but when I ssh into it from inside the house (over the cabling I put in between the house and the workshop) and see all that disk space free in the LVM and all the RAM waiting to be assigned to domU VMs that I can migrate my current services to, it's all worth it!


Creative Commons Attribution-NonCommercial-ShareAlike 2.0 UK: England & Wales