Cool things I have worked on: Clustered analytic database (by )

In my last blog post, I talked about how I was involved in building a database optimised for low latency on single-record reads.

So it was a bit of a change to later work on a database optimised for high throughput on bulk querying! Rather than getting single records, it was all about finding all the records that match some criterion and doing something with them as fast as possible, where "fast" is measured in "records per second". When I started there, the minimum time to perform a trivial query (eg, SELECT 1) was about three seconds, let alone any that fetched any data. However, on the right hardware, it could process untold numbers of gigabytes of data per second once it had gotten going. That three-second minimal round trip time was negligible when dealing with queries that processed hundreds of terabytes of data in ten minutes. That said, one of my first projects was to fix some low-hanging fruit, which brought the minimal round trip time down to around 300 milliseconds.

Unlike the previous database, this one had an SQL parser; rather than an API to get and set records, you could enter SQL queries in all their glory, and get back result sets.

You didn't generally INSERT, UPDATE or DELETE individual records (although that was possible, it just didn't perform well, as will be explained later). Generally, data came into the system through CSV or BCP files that were plopped into the filesystem. The system would sort them on a chosen key (why? You'll see), then split into partitions of something like a million records. These partitions were all compressed into "tree files". And because each partition was compressed independently, these jobs could be spread over the cluster.

The compression algorithm was interesting. If you put data in a CSV and gzip it, you'll knock it down to a quarter or so of its size, depending on the data. Our algorithm usually got a factor of ten better compression. This was a big deal for our customers as they had to store many petabytes of data, and dividing the size by a factor of forty meant spending a fortieth of the money on immense disk arrays. Also, most of our query performance was limited by how fast you could read data from disk what with modern CPUs being so good; so compressing things by a factor of forty meant forty times the bulk throughput.

It worked by looking at each column and working out a set of unique values present in it, and sorting them. These columns could then be compressed with fairly conventional techniques for storing sorted lists of things (such as storing each element as the difference from the previous element, and then using conventional algorithms such as deflate on the result).

So if we had the following data:

MakeModelColourQuantity
FordMondeoWhite7
FordMondeoBlue6
FordTransitWhite7
FordTransitRed7
SkodaFabiaWhite4

We would store it as a bunch of columns:

  • Make: Ford,Skoda
  • Model: Fabia,Mondeo,Transit
  • Colour: Blue,Red,White
  • Quantity: 4,6,7

But you still need to know how to reconstruct the records from those values. Each record could be thought of as a vector of integer indices, each of which was an index into the sorted values for that column. If you just stored those records-of-indices, you'd be able to reconstruct the records and would probably have gained some good compression.

If we start the indices at 0 for the first value in each column, our original data might now look like this, referring back to the sorted columns we stored:

MakeModelColourQuantity
0122
0101
0222
0212
1020

But we did more than that. We didn't actually write those records-of-indices to disk. The compression engine took pairs of columns in the records-of-indices representation, and again deduplicated and sorted each of them.

Which pairs you took mattered greatly to how good a compression you got, in subtle ways, but we tried to get related columns. Make and Model are connected, so we'd do well to pick that one first. Each model only ever comes from a single manufacturer in our example, so the combined (make,model) columns only have three values - (0,1), (0,2) and (1,0). So we could store those in a list we'd call an "index set", which actually went onto disk, deduplicated and sorted and compressed like a normal column:

  • Make+Model: (0,1), (0,2), (1,0)

Now the table (in memory) looks like this:

Make+ModelColourQuantity
022
001
122
112
320

Again, we pick a pair of columns. There's no obvious choice any more, so I don't know what our default heuristic would actually do, but let's say we decided to join Make+Model with quantity. The pairings we find are (0,2), (0,1), (1,2) which happens twice, and (3,0), so we'd store:

  • (Make+Model)+Quantity: (0,1), (0,2), (1,2), (3,0)

(not forgetting that the sorted list of values is difference-encoded and compressed, rather than written literally like that).

Our table in memory now has only two columns:

(Make+Model)+QuantityColour
12
00
22
21
32

At this point, we can't continue any further. Each entry in that table corresponds to a record in the original data, and by looking up the index sets we can expand out the columns to recreate the original table in its entirety; so we'd sort that final two-column table and store it (difference-encoded and compressed, of course). Our final entry in the file, the "root indexset", would be:

  • root: (0,0), (1,2), (2,1), (2,2), (3,2)

But we don't need to decompress the entire tree. Unlike a "gzipped CSV", if we just want some of the columns of the table, we can not bother traversing the index set tree to extract every column. If we just wanted the colour for every car in the table, we'd read the root indexset, and the colour valueset so we could look up the colour values, and we'd be done. If we wanted just the makes, we'd need to read the root indexset, the make+model+quantity indexset, the make+model indexset, and the make valueset.

However, if we wanted MIN(Make), MAX(Make) or did a SELECT DISTINCT Make, then we don't even need the root set - we can just open the Make valueset and take the first or last value, or all values.

Anyway, tree files, once built, were given unique partition IDs and put into some kind of storage. We didn't really care what the storage was, and supported several different backends - anything that let us save a file with a name, and get the file back with the same name, would do nicely. We supported raw filesystems (over NFS or on a clustered filesystem so all the servers in the cluster could see it), HDFS, and various enterprise storage devices.

So how did we know what tree files made up each table? Well, a data structure called the registry mapped table names to table objects. We supported a range of different types of table objects, but the most common one was called a "Lazy Union", which represented a UNION ALL in the SQL sense - a bunch of tables concatenated together. It was lazy because we used it to union together all the tree files that made up a table, but the system avoided actually trying to load all the tree files and UNION ALL them together (as that would generally be many petabytes of data); in fact, we basically never actually read any tree files in a lazy union - we just queried it for metadata. The lazy union object itself stored a list of tree files and metadata about them, which was kept in a file called a "partition cache".

So what happened if the user did an INSERT? Well, we had a feature to enable this; the extra records would go into (effectively) a CSV file associated with the table, and a reference to that CSV file put into the partition cache for that table. When the CSV got big enough, we could built it into a tree file and start a new CSV file.

As for UPDATE and DELETE: Well, one caveat of our representation was that we never modified tree files. So to UPDATE or DELETE something, we'd create a "delta file" that referenced the tree file we wanted to modify, then contained instructions as to how to modify it (deleting or updating records). We'd replace the reference to the tree in the partition cache with a reference to the delta file; and voila, the system would see changed or removed records.

We used the immutability of tree files to support point-in-time querying. If you set a query timestamp on your database session, the system would effectively roll back the log of changes to the partition cache to restore the table to the state it was in at that point in time, undoing bulk imports, INSERTs, UPDATEs, and DELETEs.

This trick also worked for ALTER TABLE. If you modified the schema of a table, every existing reference to a tree (or a delta file or a CSV or anything else) in the partition cache would be replaced by a reference to the original file plus a schema-change file that explained the change. So the old data would be translated as it was read, with columns being removed, re-typed, added (with a default value), etc. And, of course, if you put a query timestamp on to query the tables as they were in the past, those scheme changes would appear to be undone; deleted columns would return.

When a query came in that looked like SELECT ...some expressions... FROM table WHERE ...some conditions..., we would look the table up in the registry, and get a lazy union object connected to the partition cache file representing the current state of the table. The query planner would "push down" the condition clauses into the lazy union (LU) object, and it would use the metadata in the partition cache to restrict the list of partitions that might match. The main kind of metadata we stored was the minimum and maximum values of every column in each tree file. If our WHERE clause was column BETWEEN a AND b, we could skip any tree whose value range for that column didn't intersect a..b; and many other kinds of WHERE-clause expression could be used to eliminate based on range.

This is why we sorted each batch of data as it came in. Most of the data we stored was logs of something, so each batch covered a fix time range anyway, and so would produce a bunch of trees restricted to a range of some datetime column; so we could query efficiently on date ranges. But if we then sorted by the next most common WHERE-clause constraint column, we would constrain the trees in the batch to each cover a narrow range of that column, and therefore query more efficiently for ranges of values for that column.

But what about restrictions on columns other than event date and the sort column? Other values in the table, unless correlated with one of those columns, tended to end up rather randomly scattered, so the min and max for that column in every partition tended to approximate the min and max for the entire table, and we couldn't eliminate trees in the lazy union using constraints on that column.

Normal databases let you create indexes on columns, but a B-tree index tended to take up space in the order of many bytes per record in the base table. Unless the table had a lot of columns, this means the index is something like a tenth the size of the uncompressed data. As our tree files were about a fortieth the size of the typical data we had, even a single B-tree index would dwarf the original table; and the compression was a big selling point for people. THat wouldn't do. We needed subtler tricks.

So we let users nominate columns they'd like to do equality queries on (field=literal). For each partition, we'd then create a bloom filter of all the values of that column occurring in the partition, and store it in the partition cache, associated with the tree containing that partition. Bloom filters are basically a set, but with a twist: you insert values into the set, then you can query the set to see if a value exists. While a normal set gives you a yes or no answer in this case, a bloom filter gives you a "maybe or no" answer; it's either definitely not in the set or it might be in the set. In exchange for that vagueness, the bloom filter is a lot smaller than a conventional set representation. How big it is is something you can choose - larger ones are less likely to say "maybe" when the answer is really "no" (false positives). In our cases, we went for a 5% false positive rate by default, which gave us bloom filters that consumed five bits per unique value in that column in the tree file; as each tree held about a million records, so would store anywhere between one and a million unique values for a given column, that meant the filters consumed anything from a byte to 600KiB. When a query came in of the form SELECT ... FROM table WHERE bloom_filtered_column=x, we'd scan through all the bloom filters in the partition cache to look for trees that might contain the value x, and then perform the query on those.

And we could get more advanced than that. If users wanted to optimise queries of the form column LIKE '%foo bar%' - looking for a substring in a field - they could create another kind of bloom filter, which only worked on text columns. In this case, we'd go through each value of that column in the tree and extract all the groups of N characters (N might be four or five, typically). So if the field contained I eat food, the groups of four characters would be "I ea", "eat", "eat", "at f", "t fo", "foo", and "food". We'd take all of those for the entire column in the tree, and make a Bloom filter of them. Then if the user asked for a substring of N characters or more - N had to be chosen to be smaller than or equal to shortest expected search substring - we would do the same for the search string and then look in the bloom filters for those. So if the user had column LIKE '%eat food%' in their WHERE clause, we'd split that into "eat", "at f", "t fo", "foo", and "food". Only trees whose bloom filter return a "maybe" for all of those substrings can possibly contain the target string, so we can eliminate any that don't. We had a few other variants on the bloom filter, too, for even more specialist cases.

So, let's recap. I've explained how we physically stored table data, how we imported batches of data, and started to explain how we do simple queries on a single table.

As I was saying, when such a query came in, we'd get this Lazy Union (LU) object that represented all the trees in the table, and we'd tell it to restrict itself to trees that might be pertinent to the query based on the WHERE clause. If this step didn't reduce the list of trees at all, that was fine; what mattered was that we eliminated as many trees as possible to improve performance by considering less data, while never eliminating a tree that might contain records of interest.

Once we had the restricted list of trees in the lazy union, we would be able to generate the parallel query plan. In this simple case, we'd generate a template job that could be applied to each tree file (in this case, this job would open the tree file it was being applied to, request only the columns it needed for the SELECT clause, and then filter them on the WHERE clause to find only the matching records). That template job would be applied to each tree found in the Lazy Union in parallel across the cluster, and the results streamed back to the user.

Join queries were a bit fiddlier. If the user wanted a simple two-table join, such as SELECT ... FROM a, b WHERE a.x = 123 AND b.y = 456 AND a.z = b.z, we would load up two LUs, one for a and one for b, and restrict both by the WHERE clauses that only affected on table: a.x = 123 on a and b.y = 456 on b. Those would limit the numbers of partitions involved, but still, any partition from a could contain rows that match rows from any partition of b - so we would need to consider all possible combinations. This meant running a number of jobs proportional to the product of the size of the tables. So we tried to reduce that in a few ways.

Firstly, once we'd built a restricted lazy union for one table (say, a), we could quickly work out an approximation to the MAX and MIN of the join column, z, in table a. Because we'd already restricted the LU on the a.x = 123 clause and just had partitions matching that, we could just look at the MAX and MIN of z in those partitions (data which we held in the metadata in the partition cache), and then when we built the restricted LU for b, we could restrict it on b.y = 456 AND b.z BETWEEN min AND max, thereby further restricting b. Of course, once we'd done that, we could approximate the min and max of z in b using the same technique, and further restrict the LU for a, and thus get a smaller z-range for a, and further restrict the LU for b - in fact, we could continue this until the range of z stopped shrinking if we wanted to, but in practice, it usually wasn't worth going for more than a couple of iterations.

Also, rather than building a job for every possible combination, we could instead look at each individual partition of table a, find the actual set of values of z in that partition, then restrict a copy of the LU (already restricted as already described) even further to find only matching trees from b to combine with that partition from a.

Some queries don't need to return all the matching records; some queries were just for aggregrates such as COUNT or SUM or MIN or MAX; some queries were GROUP BY queries that would aggregrate in groups; some queries still returned every record but needed to sort them first because of an ORDER BY. In these cases, we still generated per-tree jobs in parallel to find relevant records, but then we'd run a single special job on a single cluster node that would read the results of the parallel phase and do final processing.

There's a final simple case - a query so simple it just needs a single job. SELECT 1, for instance, which needs no table data from tree files. Or queries that can be fulfilled purely from the tree metadata in the partition cache, so again didn't need to read any tree files.

But what about more complex queries? Joins of three or more tables? Queries with subqueries? Common table expressions with a WITH..AS? Well, we handled all of them by splitting them up into a directed acyclic graph of simple "sub-queries", which operated on a combination of actual "base" tables and the results of other queries; a single final sub-query would produce the final results of the query. In fact, internally, we converted the entire query into a big WITH..AS with each query being one of the above cases we already know how to handle.

We would run the subqueries in parallel, as soon as all the tables they needed existed. To begin with, only the subqueries that depended purely on base tables would be able to run, but the results of subqueries would be built into temporary tables, in a special registry namespace local to that query. As subqueries finished, other subqueries would become runnable because their input tables would exist. We'd run subqueries in parallel across the cluster, which was pretty cool. Eventually, the final query would become runnable; we'd run it and send the results to the user and be done.

So a query that joined tables a,b,c and d could join a and b in parallel with joining c and d, then join the results of those two.

But with all this parallelism going on - different subqueries running different parallel jobs at the same time, all competing for cluster resources with each other and other queries in the system - analysing what happened during a query could have been difficult. Just writing logs on each cluster node wasn't really enough. So each query was given a cluster-wide unique ID when it started, which was passed down to every subprocess involved in executing it. Each process within a subquery had a role name unique within the subquery, assigned as part of the query plan; so a process could be identified by the query ID, the subquery name, and the process role name. So we logged events into a per-process log file on the node the process ran on, identified by the process' unique identifier. Once the query was done, we'd either make every node delete all log files for the query ID if nothing went wrong; but if the query failed, or if a trace was explicitly requested, we'd gather all the log files for the query together onto one node and consolidate them into a "trace file".

This file was an sqlite database, containing all the log events in a highly structured form that made it easy to pull out only events pertaining to particular parts of the query. We also stored high-resolution timestamps so we could extract precise timings for performance analysis. The log events fell into three groups: Traditional log strings as you'd find in any UNIXy log file, machine-readable data items with a key and a value, and phase start/stop events. The latter kind broke the query down into phases, ranging from high-level stuff such as entire subqueries and the planning versus execution phases of subqueries down to fine-grained parts of individual jobs: opening tree files, performing various algorithms, and so on. These served two purposes: they gave other log events a context as to what the system was doing when they happened, and they let us see where the time was spent in a query for performance reasons. The machine-readable data items were generated to record various interesting properties (one example was that every process logged its rusage counters at the end of execution - CPU seconds consumed, peak memory size, number of IO reads and writes, that sort of thing). We recorded a lot of interesting things, such as the peak parallelism of subquery execution.

Because of all the parallelism, phases overlapped with each other; so I took great pleasure in writing a tool that would take a query trace file and produce a Gantt-style chart showing how the time within it broke down. This thing was very popular; support staff could ask users to send them the trace from a failed query, or request a trace for a poorly-performing query, and then we could dissect the trace file to work out what happened. Also, they were popular with the test team; a query that was carefully written to test a particular algorithm in the system could check that the algorithm was actually used by looking in the query trace. As a future change to the query optimiser might alter how queries executed, it would otherwise have been quite common for a test that checked we got correct results in a situation where a certain algorithm was used to no longer actually trigger that algorithm, meaning a part of the system would be left untested. Logging the peak parallelism of subquery execution as a machine-readable data item meant that the various limitations on maximum parallelism could be tested; and tests that were meant to check that everything worked correctly at high levels of parallelism could check that the desired parallelism was actually achieved and some other bottleneck in the system didn't prevent it.

It was a very different world, working on that system compared to the low-latency database I covered in the previous blog post. Which, me being me, just makes me think about ways in which the two approaches could be merged into a database capable of supporting both kinds of workload...

Cool things I have worked on: Low-latency highly-available NoSQL data store (by )

I've worked on a bunch of cool things in the past; and since I'm always explaining them to people, I realised it'd be good if I just blogged about them so I can just share the link.

I was asked to help design and then build a database to back a social networking system. The main requirement was to be able to satisfy lots of single-record primary-key fetches from a single thread as quickly as possible, because a PHP web app would be requesting lots of data to build the page; we had to do a few hundred of these "random point queries" in a small fraction of a second to get the page rendered in time. It had to be able to scale sideways, by adding more machines to the cluster. It also needed to be able to update this data, but at a much lower load - updates and creations of single records happened when a user went and did them, but people browsing around the site would hit several pages, each of which would request hundreds of records.

And it needed to be highly available. The app was to be spread over two datacentres for redundancy purposes, and as long as at least one server was reachable, the whole thing had to keep working as much as possible. We couldn't use "quorum" systems where a majority of the nodes needed to be reachable to make an update, for instance.

This, combined with the projected dataset size being a few terabytes at most, meant we decided to go for full replication. Every node had a copy of the database, and rather than being a daemon you spoke to over a socket like a conventional database, we'd write a shared library that would be loaded into the PHP process, so as to avoid context switch overheads for each query. The shared library would read the data files directly from disk.

The on-disk format was B-trees, but with a multi version concurrency control setup: update transactions (more on those later) would write all the new data into unused space until it was ready, so reads could still be happening while an update was being prepared, so as not to delay those all-important reads.

Updates were done with a reliable multicast protocol. The shared library would broadcast up dates to all nodes currently reachable from the process sending the update. Those updates were received by a daemon process running on every node, which took updates from the network and applied them to the local disk storage. But there were many devils in the details here.

Because every daemon had to process the entire update load of the system in parallel, they had to try and maximise the use of disk bandwidth. We couldn't afford to be rushing around updating every record with its own transaction, with all the sync overheads; we had to batch them somehow to amortize the overheads of committing them to disk reliably. Also, as updates were handled asynchronously and sent from a large pool of PHP processes, we needed to stop the PHP application from sending updates faster than we could process them and overrunning the network buffers. And just to make matters worse, updates could come from the network in unusual orders for various reasons, but we needed to apply them in a consistent order on every node so multiple updates to the same record all produced the same result in the end.

So, every daemon had an update queue. Its highest priority was pulling updates off the network and into the queue; its second priority was processing updates from the queue. But the queue wasn't just a FIFO. We gave every update a sequence number when it was generated, from a sequence number counter in shared memory on every node; and to ensure global consistency, we implemented Lamport timestamps - when an update was received from another node, we immediately updated the local sequence number counter to make sure it was more than the received sequence number, so that all updates issued from a node with knowledge of a given update would carry later timestamps, thereby ensuring that any transaction which read a current value and then updated that to a new value would be applied "later" than the update that provided the old current value.

So our queue ordered incoming updates by sequence number (using the ID of the issuing node as a tie breaker, to ensure a global ordering). We also stored the update sequence number on every record on disk, and refused to update a record to an "older" value, to deal with re-orderings over a longer time period than those which met in the queue. The queue also noticed updates to the same record group and merged them into a single update, always replacing older data with newer according to the sequence numbers. This meant that a flurry of updates to the same record would, as long as they all arrived before the queue was flushed, all "coalesce" into a single update to the on-disk data.

Wait, I said "record group"; what's that? Well, in the data model, often a bunch of records would represent parts of one object. Imagine an invoice in the traditional SQL format, with an invoice record and a bunch of line item records that are components of it. We encouraged the use of small records, as we updated records by providing a new value for the whole thing, and lots of small records meant that finer-grained bits of a record can be updated in parallel without trashing each other's updates. So as part of the "schema" we defined a mapping from record primary keys to record groups; many records would go in the same group, and was stored as a single entry in the on-disk B-tree, so we wanted to coalesce updates to the same record group into one, thereby getting more records updated in a single read-modify-write cycle.

Under the hood, the queue was two data structures - a priority heap full of record-group update lists, ordered by sequence number; and a hash table from record-group IDs to the same update lists. Incoming updates were mapped to record-group IDs and looked up in the hash; if an update to that group already existed then the update would be merged into it, and if the sequence number was older than the one already in that queue entry, we'd move the entry up the queue by re-inserting it into the priority heap. Otherwise, we'd create a new entry with a single update to the record group, and pop it in the hash and the heap. Keeping the two in synch in all cases was a bit of work, but once we'd covered all the cases, it looked after itself.

The queue also helped us prevent overload, too. Every node measured how big the backlog was, in three different ways: There was a maximum number of record group updates we wanted to hold in the queue, a maximum number of bytes of new record data in the queue, and a maximum average latency (in seconds, between the PHP code requesting the update and it getting written to disk on the node). We tracked the latter by putting wall-clock timestamps in the update messages and keeping an exponentially-weighted moving average of the difference (like a UNIX load average), and the former two came directly from the queue data structure. On each node, we compared each of the three measurables to our target and scaled it to a 0-100% capacity measure; we then took the worst of the three as the backlog score for that node, a measure of how "far behind" it was.

These were broadcast to the other nodes in the network, to compute an overall "worst backlog of any node" for the entire cluster. We were a bit clever with this - if a node's backlog wasn't the worst, then there was no point in it telling any other nodes about it, so only the nodes that were at the worse backlog level in the system were broadcasting their state. Again, there were lots of edge cases in this, which we carefully handled to make sure the system as a whole knew the worst backlog state of any node in the system, with the minimum of network noise.

So each node now knew how backlogged the cluster as a whole was; and we used that information to apply back pressure to the PHP clients. The backlog score was stored in the shared memory segment, and the function that was called from PHP to send an update consulted it. If the backlog was more than a minimum, then gentle pressure was applied to slow the PHP application down - before sending the message, we would sleep for a duration that depended upon the backlog level. If it was at or more than 100%, we'd actually sleep in a loop until it abated; but otherwise, we'd sleep for a time that increased smoothly from zero to infinity as the backlog went from 0% to 100% (accounting for the situation where the backlog dropped while we were sleeping, to a point where we'd already slept enough). This meant that if the PHP app was trying to generate writes faster than we could get them to disk, there'd be a certain amount of "stretchiness" in the system as the implicit queue in the network and our update queue took up the slack. But if the update queue kept growing, the PHP app would face increasing delays in getting updates applied. This created a negative feedback loop (as there was a finite limit to how many PHP processes could exist across the cluster), and we tuned the shape of the feedback curve such that the system would settle to a stable backlog level (and, thus, a stable sleep duration, and thus a stable write rate) rather than oscillating.

This meant that when the PHP code did an update, it would be some time before that update was visible on every node. For some parts of the system, that was fine, but for other parts, an immediate response to updates was required, so we gave the caller of the read function a choice of "consistency level". At the most basic and fastest level, it would just check the local state, and potentially miss out on updates that were still flying through the network. At the "immediate" level, however, it would first hash the record key and use that to select a "primary node" for that record. That node would be responsible for storing, in-memory, the latest state of that record if it was currently being updated; because the update function also offered a consistency level setting. The most basic one just send the update out onto the network, and the "immediate" level first sent the update to the primary node's in-memory update buffer. The use of specific nodes for this threatened our reliability guarantee; if that node was unavailable, or crashed and restarted while an update was in progress, we'd lose the update from the buffer - but it would still proceed through the normal mechanism. So if updates and reads to a record both requested "immediate" consistency, we would not see update latency (except when nodes failed), but at the cost of network round trips harming the latency.

Updates had a third consistency level, too, which was meant for creating new records with an external primary key (such as a user record, keyed by email address): if this was selected, then a two-phase commit process was used. The record creation would be proposed on every reachable node, which would cause that primary key value to be marked as reserved in the database, or the proposal would be rejected if it already existed or was already reserved. If a majority of the nodes in the system were reachable and OK with the update, then it would be actually performed; otherwise, it would be rejected. These were slow, and we tried to avoid them, but some parts of the application required that level of atomic global consensus.

You might be worrying that these asynchronous updates could get lost between a PHP process sending them to the network and at least one node receiving them, but all updates were received by the local node intrinsically; and once an update is on one node, it'll get to all the others. The reliable multicast protocol lets each node know if it's missed some updates from others (by using a per-node monotonic sequence number on every message, and looking for gaps). If that happens, it would try and retransmit if they were still in the send buffers; if not, then a network break would be declared, and the nodes would know they'd missed some updates from some other nodes. This would start a process by which all nodes that had missed some updates would declare the range of global sequence numbers they seemed to be missing, and the nodes that had updates in that range would then re-transmit them all (by using a special index from update sequence number to record across all tables). They would be retransmitted with the original sequence numbers, not new ones, so no matter what order which nodes got them in, they would eventually get to all nodes and end up in the same global ordering, so every node would end up with the same final state.

A node that starts up from a reboot or from fresh used the same process - the knew the last sequence numbers they'd been fully in synch with the cluster at, so they'd be able to request the latest state of all records updated since then. A totally new node would have seen no updates, so would request all records changed since timestamp zero, which would be all records. In these cases, the node would request that the playback be unicast to it alone, as opposed to the network partition case where nodes would cooperate to request broadcasts of all the missed updates to get the whole network back together.

And so we had a system that could (on my 2008-era laptop in a VM) do several thousand random record reads per second from a single thread, while handling bursty asynchronous updates of a thousand or so records per second (many many more if they all happened to fall into a smaller number of record groups!), while nodes came and went from the cluster, and the cluster was occasionally split into arbitrary sub-clusters due to network link failures. When the cluster was working properly, it would provide a completely consistent view of the database; but when the cluster was in a failure state, all nodes could service reads and updates (except the super-consistent unique primary key creation updates, which would all fail if a quorum of nodes couldn't be contacted to agree), but updates could then be delayed. I've not seen a system since that can provide that level of service.

Programming with state machines (by )

State machines are a really nice way of thinking about systems with state. As a notation for expressing imperative code, I much prefer this:

To this:

x = readchar();
while(x != EOF) {
   if(x == 'a') {
      doA1();
      x = readchar();
      while(x != EOF && x != 'x') {
        doA2(x);
        x = readchar();
      }
      doA3();
   } else if (x == 'b') {
      doB1();
      x = readchar();
      while(x != EOF && x != 'x') {
        doB2(x);
        x = readchar();
      }
      doB3();
   } else {
      doC();
      x = readchar();
   }
}

I've always wished I could write that sort of code as a state machine rather than a bunch of nested structured programming primitives. Although there is some charm to the idea of drawing a diagram on the computer and having that be compiled as executable code, though, that can also be fiddly and isn't well supported by conventional programming tools; so I'd rather actually describe my state machine in text. What I want is a syntax for writing state machines, rather than writing imperative code that implements the state machine.

For instance, the diagram I put above wasn't drawn by me; it was drawn by a tool called PlantUML from the following source code:

@startuml

state Processing {
      Ready --> A : a/doA1
      Ready --> B : b/doB1
      Ready --> Ready : not a or b/doC

      A --> A : not x/doA2
      A --> Ready : x/doA3

      B --> B : not x/doB2
      B --> Ready : x/doB3
}

[*] --> Ready

Processing --> [*] : EOF

@enduml

But the UML state diagram is meant for abstract modelling of systems, and lacks the precision and detail required for actual executable code. The state diagram doesn't make it clear that doA2 and doB2 need to be passed the character that was read, for instance. PlantUML's syntax, therefore, also lacks that kind of detail, so we can't just use that.

Also, in a programming situation, it would be nice to be able to express types of state machines: to be able to say that a group of state machines has some useful properties in common, defined by the "type". The type of a state machine defines some high-level states and what inputs or outputs are possible in each state; any state machine of that type has to follow that behaviour, even though it might break the states defined in the type into many substates. For instance, the example state machine above always accepts any character until it receives an EOF, then stops; that might be expressed as a type having a single Ready state with two transitions, one mapping an EOF to the end state and another mapping a character back to the Ready state, with no outputs. That type would be suitable for any state machine that accepts characters until they finish, so a generic "character input stream" might accept a state machine of that type to process them, without needing to know anything about As and Bs.

These thoughts have led me to the following model of a state machine that's suitable for use as part of a programming language (although I've not defined an actual syntax yet):

State machine type

A state machine type is a set of states. Each state has a name, and a list of message types which are acceptable in that state (I am assuming that the underlying type system is smart enough to include things like "character equal to a" as a type, rather than merely "character"). This list must be disjoint.

For each input message type, there's a list of possible next states and an output message type (which in our example above would be a void type, as we never output messages).

There is also an initial state. It's impossible for the initial state to be the target of a state transition on a message, so it only has transitions out to other states. Those transitions all represent possible constructors of this state machine, because the initial state represents nonexistance.

For instance, a type" for vending machine controller state machines might be (given a Result type which has two values, success and fail):

Initial state:
   input: (Create) output: (Void) next state: (Idle)

Idle state:
   input: (Coin(Value)) output: (Value) next state: (Credit)

Credit state:
   input: (Coin(Value)) output: (Value) next state: (Credit)
   input: (Refund) output: (Value) next state: (Idle)
   input: (Select(Product)) output: (success::Result) next state: (Idle)
   input: (Select(Product)) output: (fail::Result) next state: (Credit)

The Value objects output in response to Coin or Refund inputs happen to reflect the current credit in the machine, a fact which the state machine type alone can't represent (that would need to be explained in a design-by-contract invariant of some kind).

From this type can be derived the type of the state machine's transition functions, which in this case will be:

coin :: Idle -> Value -> (Credit,Value) |
        Credit -> Value -> (Credit,Value)
refund :: Credit -> (Idle,Value)
select :: Credit -> Product -> (success::Result,Idle)|(fail::Result,Credit)

Note that the Create transition isn't represented here, because this state machine can't actually be instantiated - it doesn't specify the behaviour enough. There's no way of saying which possible select result should happen, or of what values are returned as outputs. But the functions we have are automatically implemented by the state transition definition, and will cause the appropriate transitions to occur in any state machine that's a subtype of this one; every function accepts an instance of the state machine in a given state, and an input value, and returns a new instance of the state machine and the output value.

Subtyping

State machine types can be subtypes of other state machine types. A state in the parent type may be split into several states in the subtype; any transition to a state A in the parent type must be represented by a transition in the subtype to some substate of A, and all transitions possible from state A in the parent type must be represented by transitions from every substate of A in the child.

For instance, a subtype of the vending state machine might have an infinite number of substates of "Idle", one for each possible configuration of available things to vend inside the machine. If it vends three different products, then these substates might be called Idle(A,B,C) where A,B and C are nonnegative integers representing the stock of each item. The only transition out of "Idle" is when a coin is inserted, so every Idle(A,B,C) state must have a transition from a coin being inserted, returning a Value, and taking us into the Credit state.

However, our Credit state also tracks the stock, and also tracks the credit inserted so far - so we need an infinite set of states of the form Credit(A,B,C,X) where A,B,C are the stocks and X is another nonnegative integer for the number of pennies of credit. So the parent-type transition from Idle to Credit has to be replaced by a transition from every Idle(A,B,C) to Credit(A,B,C,X) where X is the Value of the coin inserted. All the transitions from Credit in the parent type need to be represented as transitions from every Credit(A,B,C,X) state to other Credit(A,B,C,X) or Idle(A,B,C) types as appropriate. But with that done, it can be shown that all the states and transitions of the parent type correspond to one or more in the subtype.

The one exception is the initial state - the creation transitions from that in the subtype need not correspond to those of the parent type.

Given an extension to the syntax to create "parameterised states" such as Idle(A,B,C), which are shorthand for a potentially infinite set of "concrete states" such as "Idle(4,0,1)", we might represent that subtype like so:

Initial state:
   input: (Create(a::NonNegativeInteger,b::NonNegativeInteger,c::NonNegativeInteger))
     output: (Void) next state: (Idle(A,B,C))

Idle(a::NonNegativeInteger,b::NonNegativeInteger,c::NonNegativeInteger)
   state implements Idle:
   input: (Coin(x::Value)) output: (x::Value) next state: (Credit(a,b,c,x))

Credit(a::NonNegativeInteger,b::NonNegativeInteger,c::NonNegativeInteger,x::NonNegativeInteger)
   state implements Credit:
   input: (Coin(x'::Value)) output: ((x+x')::Value) next state: (Credit(a,b,c,x+x'))
   input: (Refund) output: (x::Value) next state: (Idle(a,b,c))

   input: (Select(Product==A)) when: a>0 output: (Success) next state: (Idle(a-1,b,c))
   input: (Select(Product==A)) when: a==0 output: (Fail) next state: (Credit(a,b,c,x))

   input: (Select(Product==B)) when: b>0 output: (Success) next state: (Idle(a,b-1,c))
   input: (Select(Product==B)) when: b==0 output: (Fail) next state: (Credit(a,b,c,x))

   input: (Select(Product==C)) when: c>0 output: (Success) next state: (Idle(a,b,c-1))
   input: (Select(Product==C)) when: c==0 output: (Fail) next state: (Credit(a,b,c,x))

(We really should have represented the stock as a Map(Product,NonNegativeInteger) type and then not duplicated the select rules...)

Converting a parent state into a parameterised subtype state is only one way of splitting a state in a subtype, too. The syntax above would permit the creation of separate states that all say "implements Credit", too. A very simple vending machine that can only hold one instance of one product might split Idle into two states, Empty and Full, for instance; and probably split Credit into CreditFull(Value) and CreditEmpty(Value) to represent the Credit state while also parameterising CreditFull and CreditEmpty with the current credit.

The above syntax for parameterised states can get unweildy if there's a lot of parameters, especially if generally only a few of them are changed in any given transition (imagine the above example of there were ten different products to keep track of). Therefore, as syntactic sugar, it makes sense for it to be possible to define parameters shared implicitly by one or more states. Transitions into those states from states outside the group need to explicitly specify all the parameter values, but transitions within that group can use a different syntax to represent the next state, which only specifies what parameters have changed. All the rest are passed in unchanged, automatically. It might look like this:

Initial state:
   input: (Create(a::NonNegativeInteger,b::NonNegativeInteger,c::NonNegativeInteger))
    output: (Void)
    next state: (Idle(A,B,C))

parameters a::NonNegativeInteger,b::NonNegativeInteger,c::NonNegativeInteger {

Idle(a,b,c) state implements Idle:
   input: (Coin(x'::Value)) output: (x'::Value) next state: (Credit(x <- x'))

Credit(a,b,c,x::NonNegativeInteger) state implements Credit:
   input: (Coin(x'::Value)) output: ((x+x')::Value) next state: (Credit(x <- x+x'))
   input: (Refund) output: (x::Value) next state: (Idle())

   input: (Select(Product==A)) when: a>0 output: (Success) next state: (Idle(a <- a-1))
   input: (Select(Product==A)) when: a==0 output: (Fail) next state: (Credit())

   input: (Select(Product==B)) when: b>0 output: (Success) next state: (Idle(b <- b-1))
   input: (Select(Product==B)) when: b==0 output: (Fail) next state: (Credit())

   input: (Select(Product==C)) when: c>0 output: (Success) next state: (Idle(c <- c-1))
   input: (Select(Product==C)) when: c==0 output: (Fail) next state: (Credit())
}

Implementing state machine types

Note that we've suddenly started giving names to the Values flying around, and specifying an output value rather than just a type, and splitting transitions into different cases with disjoint boolean "when:" expressions. This, too, further constrains the type of the state machine; indeed, this machine can actually be implemented and will provide a simple model of a vending machine - although nothing will be vended.

This means that this new state machine's state transition functions will include a "create" function from the initial stock levels to an instance of Idle(NonNegativeInteger,NonNegativeInteger,NonNegativeInteger).

A real implementation could further subtype this, adding a uniquely-typed IO interface to the Idle and Credit states, passed into the create transition. The Success-returning select operations can then also use pure IO functions to mutate the IO context to one in which the physical mechanism of vending has happened.

But the implementation of a state machine exists in the expressions that return the output value, parameterise the next state, and the boolean expressions forming the "when:" guards. These are full expressions in the pure functional language I'm imagining this thing implemented on top of, so are fully Turing complete.

There is no syntactic distinction between a type and an implementation; a state machine type can be instantiated if all of its outputs have expressions giving them values (ok, singleton-typed outputs such as Void can be implied!). This is a bit like concrete versus abstract classes in an object-oriented programming language.

Chaining transitions

So far, we've only looked at transitions that accept an input value, return an output value and move to a new state. This means that state machine behaviour is defined entirely in terms of its reactions to external values. Since the values we generate for the output and the parameters of the new state are Turing-complete expressions, that is sufficient to describe any behaviour, but it's not always convenient to express complicated behaviour in an expression. Sometimes a complicated operation of a compound data structure such as a list would be better defined as a state machine in its own right.

In order to support that kind of thing, we can also support chaining transitions, which can drive state machine behaviour purely based on internal state, and so do not cause the generation of state transition functions. These come in three types.

The first kind is one that starts with an input value, but which then leads to a next state without producing a value. This next state must be a special "transient state", the rules for which are explained below.

The second kind is one from a transient state to another transient state. This has no input value and no output value.

The third kind is a transition from a transient state to a non-transient state, which produces an output value.

Transient states may not have any non-chaining transitions point to and from them. Every transient state must be referenced from at least one non-transient state through a chain of chaining transitions, starting with a kind-1 chaining transition then zero or more kind-2 chaining transitions. Every transient state must also lead to at least one non-transient state through a chain of zero or more kind-2 chaining transitions, then a kind-3 chaining transition.

Every such possible chain from a kind-1, through zero or more kind-2, to a final kind-3, is equivalent to a single transition from the initial to the final non-transient states; with the kind-1 transition defining the input value and the kind-3 transition defining the output value. They might be implemented as a set of state-transition functions that just tail-call each other, but the transient states are therefore "hidden" inside the state machine from external perspectives. Those state-transition functions are lexically local to the exposed state-transition functions implementing the equivalent non-chaining transition.

But although they are hidden from users of the state machine, a subtype of the state machine can indeed "see" them for purposes of splitting them into substates, adding transitions between them, etc.

Termination

The final thing I've not covered above is terminating state machines. There's an implicit "final" state available in every state machine, if at least one transition to it exists; no transitions from that state can exist. Any transition to the final state appears in the state transition functions as a function that returns the output value, but not paired with a new state machine instance, because there isn't one. It's gone.

It is entirely possible for a state machine to have no non-transient states apart from the initial and final states. For instance, a state machine that traverses a tree and sums up the labels on the nodes might be implemented in terms of transient parameterised states for the current position in the tree and the running total, with an kind-1 creation transition that accepts the tree as an input and a kind-3 finalisation transition that returns the count. This machine will therefore have a single state transition function - from Tree to Number. The actual state of the machine is entirely represented as state parameters passed between mutually recursive functions implementing the chaining transitions, and never appears as a value visible outside of that.

Conclusion

I think I've defined a workable semantic model for state machines that can be used as a way to define sets of pure functions between instances of state machines, that can handle both synchronous state machines driven by external events, and those that bumble around a data structure or iterate mathematical operations.

It needs a proper syntax; personally, I'd prefer one based on s-expressions or IRON, but suitably consistent forms could be defined for any language.

Our visit to Maker Station (by )

As a member of two hack spaces (and co-founder, secretary, and treasurer of one), I couldn't pass up the opportunity to visit a local hack space during our visit to South Africa. A quick web search later and I found that Cape Town's hackspace is called Maker Station. I dropped them a message asking if I could drop by and say hi as an ambassador from the UK, and asking them to suggest a time - UK hack spaces tend to have an open evening sometime in the week when random people can turn up to look around and meet people; so I was expecting something like that, but didn't see a time advertised on the Web site. But they suggested I suggest a time, so I did.

If we'd planned this a bit better, we'd have brought stickers (a traditional hack space give-away gift) from Bristol and Cheltenham Hack Spaces, but we didn't - so, instead, Sarah painted a picture from each hack space:

Bristol Hackspace picture Cheltenham Hackspace picture

For those not familiar with them, UK hack spaces tend to be run like a club or society - a constitution document of some kind sets out rules for people to become members, and for members to vote on a board who are in charge of making sure the space meets its legal obligations and controls the flow of money. Usually, members get unlimited access to the space and voting rights in exchange for a monthly membership fee (with some tools that use expensive consumables requiring extra usage fees on top to cover that). The monthly memberships go into a bank account, and the elected board choose to spend that on rent, insurance, electricity, broadband, consumables, and so on, and the money left over each month piles up until it's enough to buy a fancy new tool requested by the members. Most members have day jobs and hack on projects in their spare time, so hack spaces tend to be quiet during the day and busy in evenings and at weekends; and, as I mention above, there's usually an open evening every week for potential new members to come along and visit, which is also the day members come along to socialise, thereby ensuring there's a good population present to welcome new people. There are no paid staff; the board are all volunteers, and members are expected to unlock and lock up if they visit at a time when nobody else is around.

So I was quite interested to find out that Maker Station was different. We were met inside the entrance by Felix, one of the founders. The entrance led directly to a cafe area, with leaflets of hackerly interest lying around, and a range of drinks and crisps and stuff (including Maker Station logo biscuits they'd made in their own rocket oven!) for sale. The space is staffed and open during business hours; the two founders are there during the day as it's actually their day job, and they have two employees to help (the cost of living in South Africa is much lower than in the UK, which is what makes this practical).

Maker Station cafe area

Chatting with Felix in the cafe

Beyond the cafe was the hack space itself. Much of the space is divided into benches (or larger studios), which are either rented by the day in a sort of hot-desking arrangement:

Somebody making things on a Maker Station

...or dedicated to a single user who pays regularly for it, so has extensive tool and work-in-progress storage dedicated to them:

One of the Maker Station studios

People normally used it during the day, but if people were still hacking when 6pm came, they'd keep it open in the evening as well. One user I spoke to there was making a commission for a client, suggesting that the member demographic was more people hacking on stuff for a living than evening hobbyists. Felix and his brother (the other founder) don't quite make enough to run the place from memberships alone; the shortfall is made up by them working on paid commissions of their own in the space. Felix showed us some current projects they were working on, an exhibit for a local science centre and a small wind tunnel for somebody experimenting with wind turbine designs:

Felix shows off a current project

I didn't get the impression there was quite the sense of community that UK hackspaces have, with their busy open evenings and highly decentralised governance; Felix said that he often found himself acting as a "broker" between people who wanted some skill and people who had it or who a good supplier was for something, while in the UK, such connections usually arise organically on the open evening, so I suggested he might like to set up a weekly social slot in the cafe (and maybe a wiki for sharing information like supplier lists, like we have at Cheltenham Hackspace).

I was very impressed by their facilities. A proper cafe! Lots of space! Many, many, tools, including a decent metal lathe, forge, foundry, and welding gear!

Welding stuff Assorted metalworking stuff Big metalworking lathe

Interestingly, they didn't tend to go in for the stock UK hack space tools of laser cutters and 3D printers. It turns out that in Cape Town there are several suppliers who will do small-job CNC cutting and lasering and 3D printing at a reasonable price using high-end equipment, within easy travel of Maker Station. As far as I can tell, it's prohibitively expensive to get that sort of thing done in the UK other than in industrial quantities, which is why UK hack spaces end up buying their own equipment!

Felix seems to be really good at community outreach and education - something we're looking to expand at Cheltenham Hackspace, not to mention a speciality of Sarah's, so we were interested to hear about that. Here's a video of Felix giving a talk to students about prototyping. One thing that impressed me was that he runs things he calls "disassembly workshops"; take a pile of unwanted appliances, and unleash a bunch of children on them with screwdrivers (and some expert help) to tear them apart. This is fun in itself, and provides an opportunity to learn how the things you're taking apart work, as well as building skills in using the tools and working out how to get things to bits.

Once you have a pile of bits, depending on the age range and abilities, you can let the kids stick the bits together to make art to take home - or teach them electronics by wiring them up to do new things, maybe even so far as building robots out of the mechanical and electronic parts.

Here's some photos from a recent disassembly workshop they did: 1 2 3 4 5.

We enjoyed our visit to Maker Station. It was refreshing to see a different take on the usual hack space financial model, and interesting to see how the differing economics of South Africa affected what a hack space needed to be and could do. And Felix was inspiring as an educator and speaker! I'm keeping a close eye on his Twitter feed for good ideas to use in my own sci/tech outreach activities 🙂

Public perceptions (by )

Let us consider two arguments.

  1. Immigration drives the UK economy, creating more and better-paid jobs, and cheaper products and services. This is supported by actual data obtained from tax records and other reputable sources; immigrants have tended to come here in pursuit of work, to fill demand that is not being met locally, so have provided a valuable workforce for industry to grow upon, and consumed less benefits than native people. Growing industry creates more jobs and provides cheaper products and services, and a growing population creates more demand for products and services, which also stimulates industry.
  2. People are poor, due to unemployment or having to work for low-paid jobs as it's all they can get, and benefit cuts because there's not enough money to go round. So why are we letting foreigners in to compete for our jobs and benefits?

Clearly, the former is a more correct argument, as it's backed by facts; the latter does note cite its sources at all (Are people really poor? Compared to what? If so, is that really why people are poor? Are the "foreigners" actually competing for a fixed pool of jobs and benefits? Are there enough of them that it would actually make any difference if they weren't "competing", or are there much bigger issues we should worry about?)

Yet recent events suggest that the latter argument has swayed the public opinion better than the former.

Why is that? Why is my opinion of the two arguments so different to what the majority of my fellow Brits make of them?

I come from a culture (nerdy, educated) that values arguments backed up by data and mathematics. This seems self-evident to us: "But it's measurement of the actual world, processed through well-tested statistical techniques! What higher standard of generalisation about the world can we have than the Scientific Method?"

But what if you're not from our nerdy, educated culture, who've sat in classrooms and been shown the wonderful things the scientific method has given us?

Imagine yourself being presented with two arguments. One is given by a person who's not like you (they're all nerdy and they talk posh). It appeals to mathematics and science, which you found boring and irrelevant at school; you much preferred English, French and Art. That doesn't mean your stupid; you have applied your skills as a signwriter, and your work is highly respected, leading you to now run a thriving small business. But the nerdy woman saying that her maths show immigration is great reminds you of the kids who liked science and maths at school, and you didn't get on with them well.

The nerd is upset you don't seem to respect her viewpoint, and starts to explain that the evidence is all valid, but it's just a sea of numbers. She says the maths show a correlation with a great confidence level, and when you ask what that means, it quickly becomes apparent that you'll need to sit through maths lessons to actually have it all explained, and you had quite enough of that at school.

Meanwhile, somebody else gives you a short quip. "People are poor" (yeah, you can see that) "due to unemployment or having to work for low-paid jobs as it's all they can get, and benefit cuts because there's not enough money to go round" (yep, that the sort of thing you've heard from friends and family, and seen in the papers) "So why are we letting foreigners in to compete for our jobs and benefits?" Hell yeah!

The nerd might start complaining about the problems with that statement, but it makes perfect sense. The sort of person who comes up with sensible observations like that is clever, but not brainy. They're somebody who's cut through the bullshit and spotted the simple truth at the heart of a problem; not somebody who's built up a complicated argument with maths and abstract theories. You can respect that sort of cleverness, and want to hear what they think about other problems people face. They offer simple common-sense solutions.

And that, I think, is the problem us nerdy types have with public perception. Our methods produce correct answers, but the way we justify those answers to the majority of people who don't have backgrounds in the scientific method and statistical analysis is way off the mark. And while understanding this stuff suggests intelligence, not understanding it does not suggest absence of intelligence - yet the implication that it does is embedded into our culture, making us sigh and shake our heads at people who don't understand. "Oh, just trust us," we say. "We've worked it all out."

We need to keep using science and maths to find the truths of the world; and those scientific arguments are the best way of justifying our conclusions to each other. But having done so, we need to find better ways to explain and justify those truths to the world.

WordPress Themes

Creative Commons Attribution-NonCommercial-ShareAlike 2.0 UK: England & Wales
Creative Commons Attribution-NonCommercial-ShareAlike 2.0 UK: England & Wales