I've worked on a bunch of cool things in the past, and since I'm always explaining them to people, I realised it'd be good to blog about them so I can just share the link.
I was asked to help design and then build a database to back a social networking system. The main requirement was to be able to satisfy lots of single-record primary-key fetches from a single thread as quickly as possible, because a PHP web app would be requesting lots of data to build the page; we had to do a few hundred of these "random point queries" in a small fraction of a second to get the page rendered in time. It had to be able to scale sideways, by adding more machines to the cluster. It also needed to be able to update this data, but at a much lower load - updates and creations of single records happened when a user went and did them, but people browsing around the site would hit several pages, each of which would request hundreds of records.
And it needed to be highly available. The app was to be spread over two datacentres for redundancy purposes, and as long as at least one server was reachable, the whole thing had to keep working as much as possible. We couldn't use "quorum" systems where a majority of the nodes needed to be reachable to make an update, for instance.
This, combined with the projected dataset size being a few terabytes at most, meant we decided to go for full replication. Every node had a copy of the database, and rather than being a daemon you spoke to over a socket like a conventional database, we'd write a shared library that would be loaded into the PHP process, so as to avoid context switch overheads for each query. The shared library would read the data files directly from disk.
The on-disk format was B-trees, but with a multi version concurrency control setup: update transactions (more on those later) would write all the new data into unused space until it was ready, so reads could still be happening while an update was being prepared, so as not to delay those all-important reads.
Updates were done with a reliable multicast protocol. The shared library would broadcast updates to all nodes currently reachable from the process sending the update. Those updates were received by a daemon process running on every node, which took updates from the network and applied them to the local disk storage. But there were many devils in the details here.
Because every daemon had to process the entire update load of the system in parallel, they had to try and maximise the use of disk bandwidth. We couldn't afford to be rushing around updating every record with its own transaction, with all the sync overheads; we had to batch them somehow to amortize the overheads of committing them to disk reliably. Also, as updates were handled asynchronously and sent from a large pool of PHP processes, we needed to stop the PHP application from sending updates faster than we could process them and overrunning the network buffers. And just to make matters worse, updates could come from the network in unusual orders for various reasons, but we needed to apply them in a consistent order on every node so multiple updates to the same record all produced the same result in the end.
So, every daemon had an update queue. Its highest priority was pulling updates off the network and into the queue; its second priority was processing updates from the queue. But the queue wasn't just a FIFO. We gave every update a sequence number when it was generated, from a sequence number counter in shared memory on every node; and to ensure global consistency, we implemented Lamport timestamps - when an update was received from another node, we immediately updated the local sequence number counter to make sure it was more than the received sequence number, so that all updates issued from a node with knowledge of a given update would carry later timestamps, thereby ensuring that any transaction which read a current value and then updated that to a new value would be applied "later" than the update that provided the old current value.
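As a rough illustration of the Lamport timestamp rule described above, here's a minimal Python sketch. The class and method names are made up for this post, and a lock-protected counter stands in for the counter we kept in shared memory; it's not the original code.

```python
import threading


class LamportClock:
    """Per-node sequence counter (hypothetical stand-in for the shared-memory one)."""

    def __init__(self, node_id):
        self.node_id = node_id
        self._counter = 0
        self._lock = threading.Lock()

    def next_stamp(self):
        """Stamp a locally generated update with (sequence number, node ID)."""
        with self._lock:
            self._counter += 1
            return (self._counter, self.node_id)

    def observe(self, received_seq):
        """Called when an update arrives from another node.

        Advancing the counter to at least the received value means the next
        locally issued stamp is strictly greater than it.
        """
        with self._lock:
            self._counter = max(self._counter, received_seq)
```

Because the stamps are tuples, comparing them orders by sequence number first and falls back to the node ID on a tie.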
So our queue ordered incoming updates by sequence number (using the ID of the issuing node as a tie breaker, to ensure a global ordering). We also stored the update sequence number on every record on disk, and refused to update a record to an "older" value, to deal with re-orderings over a longer time period than those which met in the queue. The queue also noticed updates to the same record group and merged them into a single update, always replacing older data with newer according to the sequence numbers. This meant that a flurry of updates to the same record would, as long as they all arrived before the queue was flushed, all "coalesce" into a single update to the on-disk data.
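The "refuse to go backwards" rule for on-disk records can be sketched like this, with a plain dict standing in for the B-tree and stamps being (sequence number, node ID) tuples. The function name and the dict layout are assumptions for illustration only.

```python
def apply_if_newer(store, key, value, stamp):
    """Write value under key unless the store already holds an equal or newer stamp.

    store maps key -> (stamp, value); stamp is a (sequence number, node ID) tuple.
    Returns True if the write was applied.
    """
    current = store.get(key)
    if current is not None and current[0] >= stamp:
        return False  # stale or duplicate update: keep what we have
    store[key] = (stamp, value)
    return True
```

The point of the rule is that the final value of a record depends only on which stamp is largest, not on the order the updates happened to arrive in.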
Wait, I said "record group"; what's that? Well, in the data model, often a bunch of records would represent parts of one object. Imagine an invoice in the traditional SQL format, with an invoice record and a bunch of line item records that are components of it. We encouraged the use of small records, as we updated records by providing a new value for the whole thing, and lots of small records meant that finer-grained bits of an object could be updated in parallel without trashing each other's updates. So as part of the "schema" we defined a mapping from record primary keys to record groups; many records would go in the same group, and each group was stored as a single entry in the on-disk B-tree, so we wanted to coalesce updates to the same record group into one, thereby getting more records updated in a single read-modify-write cycle.
Under the hood, the queue was two data structures - a priority heap full of record-group update lists, ordered by sequence number; and a hash table from record-group IDs to the same update lists. Incoming updates were mapped to record-group IDs and looked up in the hash; if an update to that group already existed then the update would be merged into it, and if the sequence number was older than the one already in that queue entry, we'd move the entry up the queue by re-inserting it into the priority heap. Otherwise, we'd create a new entry with a single update to the record group, and pop it in the hash and the heap. Keeping the two in synch in all cases was a bit of work, but once we'd covered all the cases, it looked after itself.
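Here's a small Python sketch of that heap-plus-hash arrangement. It's a simplification under a few assumptions of mine: `group_of` is a hypothetical stand-in for the schema's key-to-group mapping, stamps are (sequence number, node ID) tuples, and "re-inserting" is done lazily by pushing a fresh heap pair and skipping stale ones on the way out, which may not be how the original did it.

```python
import heapq


class CoalescingQueue:
    def __init__(self, group_of):
        self.group_of = group_of  # hypothetical mapping: record key -> group ID
        self.by_group = {}        # group ID -> {"stamp": ..., "records": {...}}
        self.heap = []            # (stamp, group ID) pairs; some may be stale

    def push(self, key, value, stamp):
        gid = self.group_of(key)
        entry = self.by_group.get(gid)
        if entry is None:
            # First update for this group: add it to both structures.
            entry = {"stamp": stamp, "records": {}}
            self.by_group[gid] = entry
            heapq.heappush(self.heap, (stamp, gid))
        elif stamp < entry["stamp"]:
            # Older update for a queued group: move the entry up the queue.
            entry["stamp"] = stamp
            heapq.heappush(self.heap, (stamp, gid))
        # Merge: per record, newer data always replaces older data.
        held = entry["records"].get(key)
        if held is None or held[0] < stamp:
            entry["records"][key] = (stamp, value)

    def pop(self):
        """Return (group ID, merged records) for the earliest entry, or None."""
        while self.heap:
            stamp, gid = heapq.heappop(self.heap)
            entry = self.by_group.get(gid)
            if entry is not None and entry["stamp"] == stamp:
                del self.by_group[gid]
                return gid, entry["records"]
            # Otherwise this pair was superseded or already flushed; skip it.
        return None
```

Each `pop` hands back everything queued for one record group, so the daemon can apply it in a single read-modify-write of that group's B-tree entry.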
The queue also helped us prevent overload. Every node measured how big the backlog was, in three different ways: there was a maximum number of record group updates we wanted to hold in the queue, a maximum number of bytes of new record data in the queue, and a maximum average latency (in seconds, between the PHP code requesting the update and it getting written to disk on the node). We tracked the latter by putting wall-clock timestamps in the update messages and keeping an exponentially-weighted moving average of the difference (like a UNIX load average), and the former two came directly from the queue data structure. On each node, we compared each of the three measurables to our target and scaled it to a 0-100% capacity measure; we then took the worst of the three as the backlog score for that node, a measure of how "far behind" it was.
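To make the scoring concrete, here's a minimal sketch. The class name, the limits and the smoothing factor `alpha` are all invented for illustration; I've expressed the score as a fraction where 1.0 means 100%, and it's allowed to go above that.

```python
class BacklogMeter:
    def __init__(self, max_groups, max_bytes, max_latency, alpha=0.1):
        self.max_groups = max_groups    # target ceiling on queued group updates
        self.max_bytes = max_bytes      # target ceiling on queued record bytes
        self.max_latency = max_latency  # target ceiling on average latency, seconds
        self.alpha = alpha              # EWMA smoothing factor (assumed value)
        self.avg_latency = 0.0

    def record_commit(self, sent_at, committed_at):
        """Fold one request-to-disk latency sample into the moving average."""
        sample = max(0.0, committed_at - sent_at)
        self.avg_latency += self.alpha * (sample - self.avg_latency)

    def score(self, queued_groups, queued_bytes):
        """Worst of the three measures, each scaled against its target."""
        return max(
            queued_groups / self.max_groups,
            queued_bytes / self.max_bytes,
            self.avg_latency / self.max_latency,
        )
```

Taking the maximum means a node counts as backlogged as soon as any one of the three measures says so.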
These scores were broadcast to the other nodes in the network, to compute an overall "worst backlog of any node" for the entire cluster. We were a bit clever with this - if a node's backlog wasn't the worst, then there was no point in it telling any other nodes about it, so only the nodes that were at the worst backlog level in the system were broadcasting their state. Again, there were lots of edge cases in this, which we carefully handled to make sure the system as a whole knew the worst backlog state of any node in the system, with the minimum of network noise.
So each node now knew how backlogged the cluster as a whole was; and we used that information to apply back pressure to the PHP clients. The backlog score was stored in the shared memory segment, and the function that was called from PHP to send an update consulted it. If the backlog was more than a minimum, then gentle pressure was applied to slow the PHP application down - before sending the message, we would sleep for a duration that depended upon the backlog level. If it was at or more than 100%, we'd actually sleep in a loop until it abated; but otherwise, we'd sleep for a time that increased smoothly from zero to infinity as the backlog went from 0% to 100% (accounting for the situation where the backlog dropped while we were sleeping, to a point where we'd already slept enough). This meant that if the PHP app was trying to generate writes faster than we could get them to disk, there'd be a certain amount of "stretchiness" in the system as the implicit queue in the network and our update queue took up the slack. But if the update queue kept growing, the PHP app would face increasing delays in getting updates applied. This created a negative feedback loop (as there was a finite limit to how many PHP processes could exist across the cluster), and we tuned the shape of the feedback curve such that the system would settle to a stable backlog level (and, thus, a stable sleep duration, and thus a stable write rate) rather than oscillating.
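Here's a sketch of what such a throttle could look like. The curve `x / (1 - x)` is just one convenient function that rises smoothly from zero and blows up at 100%; the actual curve we tuned isn't given here, and the `floor`, `scale` and `step` values are invented. `read_backlog` is a hypothetical callable standing in for the read of the score from shared memory.

```python
import time


def target_delay(backlog, floor=0.2, scale=0.05):
    """Seconds to hold back an update at a given backlog (1.0 means 100%)."""
    if backlog <= floor:
        return 0.0
    if backlog >= 1.0:
        return float("inf")
    x = (backlog - floor) / (1.0 - floor)  # 0 at the floor, approaching 1 at 100%
    return scale * x / (1.0 - x)


def throttle(read_backlog, sleep=time.sleep, step=0.01):
    """Sleep in short slices until we've slept enough for the current backlog.

    Re-reading the backlog on every slice covers both cases: at 100% or more
    the target is infinite, so we keep looping until it abates; and if the
    backlog drops mid-sleep, we stop as soon as the time already slept
    exceeds the new, smaller target.
    """
    slept = 0.0
    while slept < target_delay(read_backlog()):
        sleep(step)
        slept += step
    return slept
```

A caller would run `throttle(...)` just before putting the update on the network.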
This meant that when the PHP code did an update, it would be some time before that update was visible on every node. For some parts of the system, that was fine, but for other parts, an immediate response to updates was required, so we gave the caller of the read function a choice of "consistency level". At the most basic and fastest level, it would just check the local state, and potentially miss out on updates that were still flying through the network. At the "immediate" level, however, it would first hash the record key and use that to select a "primary node" for that record. That node would be responsible for storing, in-memory, the latest state of that record if it was currently being updated. This worked because the update function also offered a consistency level setting: the most basic one just sent the update out onto the network, and the "immediate" level first sent the update to the primary node's in-memory update buffer. The use of specific nodes for this threatened our reliability guarantee; if that node was unavailable, or crashed and restarted while an update was in progress, we'd lose the update from the buffer - but it would still proceed through the normal mechanism. So if updates and reads to a record both requested "immediate" consistency, we would not see update latency (except when nodes failed), but at the cost of extra network round trips on every such read and update.
Updates had a third consistency level, too, which was meant for creating new records with an external primary key (such as a user record, keyed by email address): if this was selected, then a two-phase commit process was used. The record creation would be proposed on every reachable node, which would cause that primary key value to be marked as reserved in the database, or the proposal would be rejected if it already existed or was already reserved. If a majority of the nodes in the system were reachable and OK with the update, then it would be actually performed; otherwise, it would be rejected. These were slow, and we tried to avoid them, but some parts of the application required that level of atomic global consensus.
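A toy version of that propose-then-commit flow might look like the following. The names are hypothetical and the "nodes" are just in-process objects. One assumption I should flag: this sketch treats a rejection from any reachable node as a veto, on top of needing a majority of the whole cluster to accept, which is slightly stricter than the description above.

```python
class Node:
    def __init__(self):
        self.records = {}      # committed records by primary key
        self.reserved = set()  # keys reserved by an in-flight proposal

    def propose(self, key):
        """Phase one: reserve the key, or refuse if it's taken or reserved."""
        if key in self.records or key in self.reserved:
            return False
        self.reserved.add(key)
        return True

    def commit(self, key, value):
        """Phase two: turn the reservation into a real record."""
        self.reserved.discard(key)
        self.records[key] = value

    def abort(self, key):
        """Phase two (failure): release the reservation."""
        self.reserved.discard(key)


def create_unique(key, value, reachable, cluster_size):
    """Create a record only if a majority of the whole cluster agrees."""
    accepted = [node for node in reachable if node.propose(key)]
    # Assumption: any rejection is a veto, and we need more than half
    # of all nodes in the cluster, not just of the reachable ones.
    if len(accepted) == len(reachable) and 2 * len(accepted) > cluster_size:
        for node in accepted:
            node.commit(key, value)
        return True
    for node in accepted:
        node.abort(key)
    return False
```

Counting against the full cluster size rather than the reachable set is what stops both halves of a split cluster from creating the same key.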
You might be worrying that these asynchronous updates could get lost between a PHP process sending them to the network and at least one node receiving them, but all updates were received by the local node intrinsically; and once an update was on one node, it would get to all the others. The reliable multicast protocol let each node know if it had missed some updates from others (by using a per-node monotonic sequence number on every message, and looking for gaps). If that happened, the missing messages would be retransmitted if they were still in the send buffers; if not, then a network break would be declared, and the nodes would know they'd missed some updates from some other nodes. This would start a process by which all nodes that had missed some updates would declare the range of global sequence numbers they seemed to be missing, and the nodes that had updates in that range would then re-transmit them all (by using a special index from update sequence number to record across all tables). They would be retransmitted with the original sequence numbers, not new ones, so no matter what order which nodes got them in, they would eventually get to all nodes and end up in the same global ordering, so every node would end up with the same final state.
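The gap detection part is simple enough to sketch. This is a minimal illustration with invented names, assuming each sender numbers its messages 1, 2, 3 and so on; it only reports what's missing and leaves the retransmit request to the caller.

```python
class GapDetector:
    def __init__(self):
        self.next_expected = {}  # sender ID -> next message number we expect

    def receive(self, sender, msg_no):
        """Record an incoming message and return the message numbers it skipped.

        The first message seen from a sender sets the baseline, since we
        can't know what came before it. Late or duplicate messages return
        an empty list and leave the expectation untouched.
        """
        expected = self.next_expected.get(sender, msg_no)
        missing = list(range(expected, msg_no))
        self.next_expected[sender] = max(expected, msg_no + 1)
        return missing
```

A non-empty result is the cue to ask for a retransmit, and failing that, to declare a network break.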
Nodes starting up after a reboot or from fresh used the same process - they knew the last sequence number at which they'd been fully in synch with the cluster, so they'd be able to request the latest state of all records updated since then. A totally new node would have seen no updates, so would request all records changed since timestamp zero, which would be all records. In these cases, the node would request that the playback be unicast to it alone, as opposed to the network partition case where nodes would cooperate to request broadcasts of all the missed updates to get the whole network back together.
And so we had a system that could (on my 2008-era laptop in a VM) do several thousand random record reads per second from a single thread, while handling bursty asynchronous updates of a thousand or so records per second (many many more if they all happened to fall into a smaller number of record groups!), while nodes came and went from the cluster, and the cluster was occasionally split into arbitrary sub-clusters due to network link failures. When the cluster was working properly, it would provide a completely consistent view of the database; but when the cluster was in a failure state, all nodes could service reads and updates (except the super-consistent unique primary key creation updates, which would all fail if a quorum of nodes couldn't be contacted to agree), but updates could then be delayed. I've not seen a system since that can provide that level of service.