# Design Data Intensive Applications

# Reliability, Scalability and Maintainability

# Reliability

  • The application performs the function the user expects
  • It can tolerate user mistakes or unexpected ways of using the software
  • The system prevents unauthorized access or abuse

There are many kinds of faults including:

  • Hardware Faults - hardware redundancy (RAID disk configs, dual power supplies, hot-swappable CPUs) can mask individual component failures. But in applications running on large numbers of machines, a fault can mean the loss of an entire machine, so the system should tolerate losing whole machines.

  • Software Errors - bugs, out-of-memory crashes, slowdowns, cascading failures.

  • Human Errors - thorough testing, easy recovery, and monitoring can help prevent or mitigate human errors.

# Scalability

  • Load - how many reads/writes per second are needed.
  • Performance - when load increases
    • if resources are the same, what performance is expected
    • how many resources to add if we want to keep performance unchanged
    • For batch systems such as Hadoop, we care more about throughput - the number of records we can process per second.
    • For online systems, the response time is more important.
    • Performance tests should have clients sending requests to the server independently. Waiting for a response before sending the next request skews the measurement.

TIP

Latency vs response time: latency is how long the request waits to be handled; response time is the total time the client sees before getting the response back.

# Maintainability

  • Operability

    • monitor health and quick recovery
    • capacity planning
    • deployment best practices
    • knowledge preservation
  • Simplicity

    • Use abstraction - higher level programming languages, SQL, design patterns
  • Evolvability

    • Agile, TDD

# Data Models and Query Languages

# Relational Model vs Document Model

Relational Model : data is organized into relations (i.e. tables) where each relation is an unordered collection of tuples (i.e. rows)

Document model : storing nested records within their parent record rather than a separate table.

When it comes to representing many-to-one and many-to-many relationships, a relational database uses a foreign key, while a document database uses a document reference.

If the data has a document-like structure, e.g. a tree of one-to-many relationships where typically the entire tree is loaded at once, it's good to use the document model.

But it's not easy to reference a nested item within a document. This is not necessarily a problem as long as the document is not too deeply nested.

It's not very appealing when there are a lot of many-to-many relationships. It's possible to reduce the need for joins by denormalizing, but then additional effort is needed to keep the denormalized data consistent.

Document databases are not schemaless, they are just schema-on-read. Documents make schema changes easier. For an RDBMS, changing the schema can mean rewriting the whole table, which may require downtime when altering a large table (there are tools to work around this, though).

A document database also stores a document as a single continuous string, encoded as JSON, XML or a binary variant (such as MongoDB's BSON). If the application often needs to access the entire document, there's an advantage of "storage locality". Storage locality can also be achieved in an RDBMS: Google's Spanner allows the schema to declare that a table's rows should be nested within a parent table.

Relational and Document databases are merging as JSON is supported in relational database and joins are supported on the client side of the document database (a bit less performant).

# Imperative vs Declarative query

Imperative is like selecting with a Python loop: you have to peek inside the storage and spell out how the selection is done. Declarative is SQL: you just specify what you want to select and rely on the query engine to decide how to do it.

# Graph Database

A graph can be stored as vertices and edges. To query a graph, the number of joins is not fixed in advance: to find a vertex downstream of another vertex, a variable (not known up front) number of joins may have to be performed.

# Storage and Retrieval

# Data structure to store data

To implement a k-v store:

```bash
# Append a "key,value" line to the end of the log file.
db_set () {
  echo "$1,$2" >> database
}

# Print the value of the most recent line for the key (the last write wins).
db_get () {
  grep "^$1," database | sed -e "s/^$1,//" | tail -n 1
}
```

This simple implementation is efficient for writes, since it only ever appends to a file (such a file is called a log). Reads are terrible, O(n), because the whole file has to be scanned. An index speeds up reads but slows down writes, so there's a trade-off.

To add an index to the above example, maintain an in-memory hashmap where every key is mapped to the byte offset of its latest record in the data file.
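
A minimal Python sketch of this idea, assuming a single local file and the same comma-separated record format as the shell example above (values containing commas or newlines would break it):

```python
import os

class HashIndexedLog:
    """Append-only log with an in-memory hash index: key -> byte offset of its latest record."""

    def __init__(self, path="database"):
        self.path = path
        self.index = {}
        open(self.path, "a").close()            # make sure the log file exists

    def set(self, key, value):
        with open(self.path, "ab") as f:
            offset = f.seek(0, os.SEEK_END)     # the record starts at the current end of the log
            f.write(f"{key},{value}\n".encode())
        self.index[key] = offset                # point the key at its newest record

    def get(self, key):
        offset = self.index.get(key)
        if offset is None:
            return None
        with open(self.path, "rb") as f:
            f.seek(offset)                      # one seek instead of scanning the whole log
            return f.readline().decode().rstrip("\n").split(",", 1)[1]

db = HashIndexedLog()
db.set("42", "San Francisco")
db.set("42", "San Jose")                        # the old record stays in the log; the index moves on
print(db.get("42"))                             # San Jose
```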

Since this is a log, it grows forever. What we can do is segmentation - once the log reaches a certain size, we create a new segment for new writes. Then we can compact existing segments (keeping only the most recent value for each key), not only within one segment but also across multiple segments. Once a new merged segment is created, the old segments can be deleted.

Each segment now has its own hashmap, and we should try to keep the number of segments small. When retrieving a value, we look at the most recent segment first; if the key is not found there, we look in the next most recent segment, and so on.

A few tricks

  • Format - A binary format is faster and simpler than CSV: it first encodes the length of a string, followed by the raw string, with no need for escaping.
  • Delete record - just append a special deletion record to the data file. Sometimes this is called a tombstone.
  • Crash recovery - when the database is restarted, rebuilding the in-memory hashmaps by scanning all segments can be slow. Instead, we can persist a snapshot of each compacted segment's hashmap to disk and load them into memory.
  • Partially written records - if the database crashes halfway through appending a record, checksums allow the corrupted part to be detected and ignored.
  • Concurrency control - there's only one writer thread, but segments can be read by multiple threads concurrently.

Benefits of append-only log

  • Appending is faster than random writes - especially on magnetic spinning-disk drives. Some flash-based SSDs also prefer sequential writes.
  • Concurrency and crash recovery are easier since older records are always available.

Cons:

  • The hashmap has to fit in memory - an on-disk hashmap requires a lot of random-access I/O, which is expensive.
  • Range queries are not efficient. You have to look up each key in the range separately.

# SSTable and LSM-Trees

We can sort the keys in each of the segment files.

  • When merging segment files, we don't have to fit them all in memory - merge them the way merge sort does.
  • Since the keys are sorted, we don't have to keep every key in memory. A sparse index of some keys is enough; the rest are found by scanning the range between two indexed keys.

Implementation

  1. Keep writing to an in-memory balanced search tree data structure, e.g. a red-black tree. This tree is called a memtable.
  2. Once the memtable grows to a certain size, write it out to disk as an SSTable and create a new empty memtable.
  3. To serve a read request, look in the memtable first, then the most recent on-disk segment, then the next older one, and so on.
  4. From time to time, run a merging and compaction process in the background to combine segment files and discard overwritten or deleted values.

How to deal with the problem of crashing (the memtable lives in memory)? Keep writing an append-only log on disk alongside the memtable, so the memtable can be rebuilt after a crash.
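
A rough Python sketch of steps 1-4 plus the crash log, using a plain dict that is sorted at flush time as a stand-in for the red-black tree (the file names and JSON segment format are made up for illustration; compaction is omitted):

```python
import json

class TinyLSM:
    def __init__(self, memtable_limit=4):
        self.memtable = {}                       # stand-in for the balanced tree; sorted on flush
        self.memtable_limit = memtable_limit
        self.sstables = []                       # segment file names, oldest first
        self.wal = open("wal.log", "a")          # append-only log for crash recovery

    def put(self, key, value):
        self.wal.write(json.dumps([key, value]) + "\n")
        self.wal.flush()                         # a crash now loses nothing: replay wal.log
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def _flush(self):
        path = f"sstable_{len(self.sstables)}.json"
        with open(path, "w") as f:
            json.dump(dict(sorted(self.memtable.items())), f)   # keys written in sorted order
        self.sstables.append(path)
        self.memtable = {}
        self.wal.truncate(0)                     # memtable contents are now durable in the SSTable

    def get(self, key):
        if key in self.memtable:                 # memtable first,
            return self.memtable[key]
        for path in reversed(self.sstables):     # then newest segment to oldest
            with open(path) as f:
                segment = json.load(f)
            if key in segment:
                return segment[key]
        return None
```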

LSM-tree stands for Log-Structured Merge-Tree. And the algorithm described here is to build the LSM-tree.

Performance optimizations

  • When looking up a key that doesn't exist, we have to go over all the segments, which is very costly. A bloom filter can tell us when a key definitely does not exist (see the sketch after this list).
  • To decide when background merging and compaction happen, we can use either a size-tiered or a leveled strategy. In the size-tiered strategy, newer and smaller SSTables are successively merged into older and larger SSTables. In the leveled strategy, the key range is split up into smaller SSTables and older data is moved into separate "levels", which lets compaction proceed more incrementally.
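
A hand-rolled bloom filter sketch (real engines use tuned libraries; the bit-array size and hash count here are arbitrary):

```python
import hashlib

class BloomFilter:
    """Probabilistic 'might contain' set: no false negatives, occasional false positives."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits)    # one byte per bit, for simplicity

    def _positions(self, key):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        return all(self.bits[pos] for pos in self._positions(key))

bf = BloomFilter()
bf.add("user:42")
print(bf.might_contain("user:42"))   # True
print(bf.might_contain("user:99"))   # almost certainly False -> skip all segment lookups
```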

# B-tree

Log-structured indexes are not the most common type. The most common one is the B-tree.

A good video explaining B-tree.

  • A B-tree has the same depth for every leaf.
  • Each node has at most M children (and at most M-1 keys), where M is the degree or branching factor.
  • Each node holds an ordered list of keys.
  • Each child sits between two keys and covers the key range between them.
  • Upon insertion, if a node ends up with more keys than allowed, it splits into two and the middle key is promoted to the parent. If the root is split, a new root is created.

A B-tree is stored on disk, and reading one node takes one disk I/O.

As a database index, the B-tree lives on disk. The root page covers the widest key ranges (similar to DNS delegation); each level down narrows the range, and the leaf pages store the keys and values. If there's not enough space on a leaf, it splits into two pages. A page is typically 4 KB. Most databases fit into a B-tree three or four levels deep: a four-level tree of 4 KB pages with a branching factor of 500 can store up to 256 TB.
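
A sketch of the lookup path, with small Python dicts standing in for 4 KB pages; each `pages[...]` access stands for one disk read:

```python
# Interior pages hold sorted keys plus child page references; leaf pages hold key -> value.
pages = {
    "root":   {"leaf": False, "keys": [300],      "children": ["p_low", "p_high"]},
    "p_low":  {"leaf": False, "keys": [100, 200], "children": ["l1", "l2", "l3"]},
    "p_high": {"leaf": False, "keys": [400],      "children": ["l4", "l5"]},
    "l1": {"leaf": True, "rows": {55: "alice"}},
    "l2": {"leaf": True, "rows": {150: "bob"}},
    "l3": {"leaf": True, "rows": {250: "carol"}},
    "l4": {"leaf": True, "rows": {350: "dave"}},
    "l5": {"leaf": True, "rows": {450: "erin"}},
}

def btree_lookup(key, page_id="root"):
    page = pages[page_id]                      # one disk I/O per level in a real B-tree
    if page["leaf"]:
        return page["rows"].get(key)
    child = 0                                  # find the child whose key range contains the key
    while child < len(page["keys"]) and key >= page["keys"][child]:
        child += 1
    return btree_lookup(key, page["children"][child])

print(btree_lookup(250))   # descends root -> p_low -> l3: three page reads, returns "carol"
```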

When updating the B-tree on disk, a crash in the middle can lose data or leave the index inconsistent. To make the database resilient to crashes, B-tree implementations commonly keep an additional "write-ahead log" (WAL): an append-only file to which every B-tree modification must be written before it's applied to the pages.

Optimizations

  • Instead of maintaining a WAL, use copy-on-write: write the modified page to a new location and update the parent's pointer to it.
  • Abbreviate keys: interior pages only need to store enough of each key to act as boundaries between key ranges.
  • Try to lay out leaf pages sequentially on disk, though this is hard to maintain after the B-tree has been modified a lot.
  • Add references to sibling pages in each leaf to avoid going back to the parents during range scans.

# Comparing B-Tree with LSM-Tree

  • LSM-trees are typically faster for writes, while B-trees are faster for reads.
  • Both LSM-trees and B-trees write to disk multiple times for one modification. This is called write amplification. It is of particular concern on SSDs, which can only overwrite blocks a limited number of times before wearing out.
  • LSM-trees compress better, so they produce smaller files on disk. Some of a B-tree's disk space is left unused due to fragmentation.
  • LSM reads and writes sometimes have to wait for expensive compaction to finish.
  • If write throughput is high, compaction may not be able to keep up and the disk fills with unmerged segments.
  • A B-tree has exactly one place for each key, while an LSM-tree can hold multiple copies. This makes it easier to attach transactional locks to keys in a B-tree.

# Other indexing structures

Value storage has two options: store the value together with the key (a clustered index) or store it in a heap file and keep only a reference to the heap file with the key. Storing with the key speeds up reads but requires more storage and adds overhead on writes. Storing in a heap file adds overhead on reads, and it gets complicated when an updated value is larger than the old one: the value has to move to a new heap location and the references have to be updated.

# In memory

  • When the database is not large, it can be kept entirely in memory, or distributed across multiple machines.
  • An in-memory DB can use battery-powered RAM to survive power failures. It can also write all transactions to disk so that data persists.
  • The performance gain is not from faster memory access, since disk contents are cached in memory anyway; it comes from avoiding the overhead of encoding in-memory structures into a form that can be written to disk.
  • Some algorithms are easier to implement in memory than on disk.

# OLAP (online analytic processing) VS OLTP (online transactions processing)

OLAP means the database is used for a lot of data analysis - filters, aggregations, etc. The goal is different from processing transactions, e.g. adding a comment or registering a new user. OLAP is usually done in a separate database called a data warehouse. The process of moving data from OLTP to OLAP systems is called ETL (Extract, Transform, Load).

# Star schema and snowflake schema

Star schema means that the fact table is in the middle, with many dimension tables around it. The fact table can be a list of transactions. If dimension tables reference further dimension tables, it's called a snowflake schema.

# Column oriented storage

If a table has hundreds of columns and usually only a few of them are read, it doesn't make sense to read all the columns of the wanted rows. So we can store the data by column instead, with each column in its own file.
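
A toy illustration of the difference (in a real column store each column array would live in its own compressed file):

```python
# Row-oriented: every query touches whole rows.
rows = [
    {"date": "2024-01-01", "product": "apple",  "qty": 3, "price": 1.2},
    {"date": "2024-01-01", "product": "banana", "qty": 5, "price": 0.5},
    {"date": "2024-01-02", "product": "apple",  "qty": 2, "price": 1.2},
]

# Column-oriented: one array (file) per column; values at index i belong to row i.
columns = {
    "date":    ["2024-01-01", "2024-01-01", "2024-01-02"],
    "product": ["apple", "banana", "apple"],
    "qty":     [3, 5, 2],
    "price":   [1.2, 0.5, 1.2],
}

# "Total quantity of apples" only has to scan two of the four columns.
total = sum(q for p, q in zip(columns["product"], columns["qty"]) if p == "apple")
print(total)   # 5
```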

# Materialized view

In contrast to a standard (virtual) view, a materialized view precomputes some aggregations (MIN, MAX, AVG) and stores the results in the warehouse. This speeds up repeated queries but is not as flexible as querying the raw data.

# Encoding and evolution

# Formats of encoding data

Data in memory has different format comparing to on-disk and in network.

  • In memory data can be objects, list, hashmap, tree, etc. They are optimized for CPU operations.
  • When transferring data over the network or writing it to a file, it has to be encoded as a self-contained sequence of bytes, e.g. JSON.

The conversion between the two can be done with language-specific serialization libraries, e.g. Kryo for Java or pickle for Python. These have drawbacks:

  • The encoding is tied to one language, making it hard to encode and decode with different languages.
  • Decoding requires the class definitions to be available.
  • Versioning is often an afterthought, which introduces compatibility problems.
  • Efficiency is often poor, especially with Java's built-in serialization.

JSON, XML and CSV can be good ways to encode.

  • XML and CSV don't distinguish numbers from strings. JSON doesn't distinguish integers from floats.
  • JSON and XML do not support binary strings.
  • CSV has no schema.

JSON can be encoded into binary format to save space.

# Dataflows

There are three ways of data flows

  • Via database
  • Via RPC calls
  • Via async messaging

Benefits of using messaging comparing to RPC

  • Works as a buffer
  • Auto redelivery
  • Hides the address of the recipient, decoupling the sender and receiver
  • Allow one message sent to multiple recipients.

But a message sender doesn't necessarily expect a response.

A distributed actor framework scales the actor model across nodes: application logic lives in actors that communicate through asynchronous messages, essentially integrating a message broker with the actor programming model.

# Distributed Data

The shared-memory and shared-disk architectures have problems:

  • Shared-memory scaling has non-linear cost: a machine twice as powerful costs more than twice as much.
  • A machine twice as powerful cannot handle twice the load, due to bottlenecks.
  • Fault tolerance is poor because a single machine sits in one geographic location.
  • Shared-disk approach is limited by the network connection.

# Replication

We want to do replication because it can

  • provide better fault tolerance
  • serve with lower latency as data are closer to user
  • serve with more throughput

# Leader and Follower Replication

The process is as follows:

  • When a write happens, the client can only send the write request to the leader. The leader updates its local database.
  • The leader sends the change log to all followers, and the followers apply the write as well.
  • Clients can read from any replica, leader or follower.

Discussion of sync

  • Synchronous replication guarantees the follower has the same data as the leader, so the system can recover from a leader failure. But a follower may take a long time to respond, and the leader cannot process the next write until all synchronous writes are confirmed.
  • Usually only one follower is synchronous and the others are asynchronous. If the synchronous follower becomes slow to respond, an asynchronous follower is made synchronous instead. This configuration is called semi-synchronous.
  • Sometimes all followers are asynchronous. The pro is that the leader can keep writing without waiting for followers; the con is that writes not yet replicated are lost if the leader fails.

How to set up a new follower (i.e. copy the data)? A simple file copy doesn't work because the data keeps changing.

  1. Take a snapshot of the leader's database at some point in time.
  2. Copy the snapshot to the follower.
  3. The follower requests all the changes (the log) since the snapshot.
  4. The follower processes the backlog of changes and catches up with the leader.

Handling outage

  • For followers: keep a local log of data changes received from the leader. If the follower crashes, it checks the last transaction in the log and catches up from there.
  • For the leader: confirm the leader has failed, choose a new leader, and reconfigure the system to use the new leader (failover).
  • If async replication is used, the new leader may not be up to date. This can generate conflicts with other followers that have more recent data.
  • The timeout for declaring a leader dead is tricky: if it's long, recovery takes longer; if it's short, there are unnecessary failovers.

Implementation of replication logs

  • A naive way is to just store the SQL statements in the log.
    • non-deterministic functions such as NOW() or RAND() become tricky
    • order matters if a new value depends on existing values
  • Ship the write-ahead log: send followers the same log the storage engine writes.
    • The log describes the data at a very low level (exact bytes in disk blocks), which constrains the leader and followers to run the same version of the database software.
    • That makes rolling database software upgrades very hard.
  • A modified approach is to use a logical log representation rather than a physical one. Then the leader and followers can run different database software versions.
    • A logical log format is also easier for external applications to parse.

Read-after-write consistency means that after a user writes to the database, they will read what they just wrote. This is challenging because the user may not read from the same node they wrote to. There are a few ways to achieve it:

  • When reading something the user may have modified, read from the leader; otherwise read from a follower.
  • Track the replication lag of followers and avoid reading from a follower that hasn't yet received the user's modification.
  • If the replicas are distributed across multiple datacenters, requests that must be served by the leader have to be routed to the datacenter that contains the leader.

Monotonic reads means that when a user reads multiple times, they will not see results going backwards in time. It can be achieved by always serving a given user's reads from the same replica.
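
A sketch of the "same replica per user" routing, with a hypothetical list of follower addresses:

```python
import hashlib

replicas = ["replica-0", "replica-1", "replica-2"]   # hypothetical follower addresses

def replica_for(user_id):
    """Route a given user's reads to the same follower every time, so repeated
    reads never jump to a replica that is further behind (monotonic reads)."""
    h = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    return replicas[h % len(replicas)]

print(replica_for("user-123"))   # same answer on every call for this user
```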

Consistent prefix reads means that if a sequence of writes happens in a certain order, anyone reading them sees them in that same order. This is harder to guarantee in a partitioned database. One solution is to write causally related records to the same partition.

# Multi-leader replication

Where there are multiple datacenters, it makes sense to have multiple leaders, one per datacenter. Each datacenter's leader replicates its changes to the leaders in the other datacenters.

The benefits of multi-leader replication:

  • Performance: write delay is minimized, as every write can be processed in the local datacenter and replicated asynchronously to the other datacenters.
  • Fault tolerance is better. In the case of a datacenter outage, it is not necessary to promote a new leader in another datacenter, since each datacenter already has its own leader.
  • Network problems: inter-datacenter traffic goes over the public internet, which is not necessarily reliable. Asynchronous replication between datacenters tolerates network problems - writes can still be processed during a temporary network interruption.

Downside of multi-leader.

  • The same data may be written concurrently on two leaders - a good conflict resolution strategy is required.
  • Auto-incrementing keys, triggers, and integrity constraints can be problematic.

In the case of offline apps, such as calendar, a database is maintained locally (e.g. on your phone) as leader. And it replicates once it's been connected to the network.

Handling conflict

  • Conflicts arise because writes can be accepted by more than one leader.
  • The simplest strategy: ensure all writes for a particular record go through the same leader.
  • Possible ways to resolve conflicts
    • Give each write a unique ID and let the write with the highest ID win
    • Give each replica a unique ID and let writes from the higher-numbered replica win
    • Try to merge conflicting values, e.g. by concatenating them
    • Preserve the conflict and ask the user to resolve it manually

Topology

  • Circular - leader always pass the data to the next leader on the circle.
  • Star - one designated root node receives and forwards data to all other nodes.
  • All-to-all - every node broadcast data to all the other nodes.

Star and circular topologies have a single point of failure. All-to-all has the problem that replication messages can arrive out of order when two leaders are updated around the same time; this is a causality problem similar to the consistent prefix reads issue, and version vectors can be used to order such writes correctly.

# Leaderless replication

Leaderless replication means there is no leader and the client writes to several replicas. It was used in Amazon's in-house Dynamo system and is sometimes called Dynamo-style.

What happens when writes occur while a node is down:

  • Suppose there are three replicas and one is offline.
  • Client writes are sent to all three replicas, but only two respond with OK.
  • When the failed node comes back online, clients may read stale data from it.
  • To solve this, data is versioned; client reads query multiple replicas and take the value with the most recent version.
  • When stale data is detected on a replica, a write is performed to bring that replica up to date (read repair).
  • There can also be a background process that constantly checks for and repairs missing data on all replicas (anti-entropy).

How to guarantee that the correct data is read? Use quorums for reads and writes.

  • If the DB has n replicas, send every write to all n replicas, but consider it successful only once at least w replicas have acknowledged it.
  • When reading, query at least r replicas. As long as w + r > n, at least one of the r replicas read must have the latest write.
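
A sketch of quorum writes and reads with in-memory objects standing in for nodes; per-key version numbers decide which response is newest (n = 3, w = r = 2 are example choices):

```python
class Replica:
    def __init__(self):
        self.data = {}                      # key -> (version, value)
        self.up = True

    def write(self, key, version, value):
        if not self.up:
            raise ConnectionError
        self.data[key] = (version, value)

    def read(self, key):
        if not self.up:
            raise ConnectionError
        return self.data.get(key, (0, None))

def quorum_write(replicas, w, key, version, value):
    acks = 0
    for rep in replicas:                    # send to all n replicas
        try:
            rep.write(key, version, value)
            acks += 1
        except ConnectionError:
            pass
    return acks >= w                        # success only once w replicas acknowledged

def quorum_read(replicas, r, key):
    responses = []
    for rep in replicas:
        try:
            responses.append(rep.read(key))
        except ConnectionError:
            pass
        if len(responses) == r:
            break
    return max(responses)[1]                # highest version wins

nodes = [Replica(), Replica(), Replica()]   # n = 3; choose w = 2, r = 2 so w + r > n
nodes[2].up = False                         # one node is down
assert quorum_write(nodes, 2, "x", version=1, value="hello")
nodes[2].up = True                          # it comes back, still missing the write
print(quorum_read(nodes, 2, "x"))           # "hello": some queried replica overlaps the write set
```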

With the quorum method,

  • with w + r ≤ n, stale data can possibly be read, but latency and availability are much better.
  • with w + r > n, stale data is still possible if:
    • a sloppy quorum was used
    • concurrent writes happened and the stale value won the conflict resolution
    • a read raced with a write and happened before the write had been applied on all replicas
    • a node carrying a new value failed and was restored from a replica carrying an old value, dropping the number of up-to-date replicas below w.

Sloppy quorum

  • In the case of a network failure, some of the designated replica nodes are unreachable.
  • A write then cannot reach its quorum, so instead of failing it, the write is accepted by other reachable nodes outside the designated n "home" nodes and handed back to them later (hinted handoff).
  • This increases write availability, but introduces a risk of stale reads until the handoff completes.

Resolving conflicts for eventual consistency, i.e. making all replicas converge to the same data:

  • Last write wins (LWW) - every write carries a timestamp and the one with the latest timestamp wins. This achieves convergence but is not durable: some concurrent writes are silently dropped.

How to detect concurrent writes? With the help of versioning, the database knows what the client saw as the current value, so it can determine whether a write is an overwrite or a concurrent write.

  • Concurrent writes are writes that are independent of each other, neither knowing about the other. If one write depends on another, they're not concurrent - they have a causal relationship.
  • To detect concurrent writes
    • Every time a write happens, the server sends the client a version number along with what the server thinks the current state is
    • The next time the client writes, it also sends the server the highest version number it has seen
    • On a write, the server overwrites all values with version numbers at or below the received version number (the client had seen those), but keeps values with higher versions untouched, because this client didn't know about them
    • This ensures no data is silently dropped, but the client has to do the extra work of merging the siblings
    • Merging siblings (concurrent writes) with the naive LWW approach may lose data; a smarter merge can take the union of the values
      • For removals, a union only works if deletions leave a tombstone on the item
    • If multiple replicas are used, then instead of a single version number, each replica keeps its own version number, and the collection of them - a version vector - is used to distinguish concurrent writes from overwrites.
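
A sketch of the single-replica version-number scheme described above (the shopping-cart values are just illustrative):

```python
class VersionedKey:
    """Server-side state for one key: keeps every value ("sibling") the client
    has not yet acknowledged; a write quoting version v replaces only values <= v."""

    def __init__(self):
        self.version = 0
        self.values = {}                    # version -> value

    def read(self):
        return self.version, list(self.values.values())

    def write(self, value, seen_version=0):
        self.version += 1
        # Drop everything the client had already seen...
        self.values = {v: val for v, val in self.values.items() if v > seen_version}
        # ...but keep concurrent siblings the client did not know about.
        self.values[self.version] = value
        return self.version

cart = VersionedKey()
cart.write(["milk"])                                   # client A, had seen nothing
cart.write(["eggs"])                                   # client B, also nothing -> concurrent
print(cart.read())                                     # (2, [['milk'], ['eggs']]): two siblings
cart.write(["milk", "eggs", "ham"], seen_version=2)    # a client merged the siblings
print(cart.read())                                     # (3, [['milk', 'eggs', 'ham']])
```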

# Partitioning

Partitioning, a.k.a sharding.

# Partitioning of k-v data

To evenly distribute the data and avoid hotspot,

  • We can hash the key to decide which partition it goes to. But this breaks sequential keys, so a range query has to be sent to all partitions.
  • A mitigation is a compound key where only the first part is hashed for partitioning and the second part stays in sorted order, e.g. (user_id, timestamp_of_post). Then we can do a range query over the posts of a specific user.
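
A sketch of the compound-key idea, hashing only the first component to choose the partition (the partition count and MD5 hash are arbitrary choices):

```python
import hashlib

NUM_PARTITIONS = 4
partitions = [dict() for _ in range(NUM_PARTITIONS)]   # each dict stands in for a partition

def partition_for(user_id):
    """Hash only the first column of the compound key (user_id, timestamp)."""
    h = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
    return h % NUM_PARTITIONS

def write_post(user_id, timestamp, post):
    partitions[partition_for(user_id)][(user_id, timestamp)] = post

def posts_by_user(user_id, start_ts, end_ts):
    """A timestamp range query only touches the one partition that owns this user."""
    p = partitions[partition_for(user_id)]
    return [post for (uid, ts), post in sorted(p.items())
            if uid == user_id and start_ts <= ts <= end_ts]

write_post(42, 1000, "hello")
write_post(42, 1005, "world")
write_post(7, 1002, "other user")
print(posts_by_user(42, 999, 1010))   # ['hello', 'world'] - one partition scanned
```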

What if one key is super hot?

  • Split the key into multiple sub-keys, e.g. by appending a two-digit number to the key, so writes spread evenly across multiple partitions.
  • This requires reads to fetch from all of those partitions and merge the results.

# Partition secondary index

For documents, the primary index is the document ID and the database is partitioned by that ID, e.g. IDs 0-499 in one partition and 500-999 in another. Inside each partition, a secondary index is built on other fields, e.g. color. Such a secondary index is called a local (document-partitioned) index. For example, a car can have a primary index on VIN and a secondary index on color: the entry color:red holds a list of document IDs, and another entry such as make:toyota holds its own array of IDs.

Local indexing makes a read such as "all cars in red" fan out over many partitions. A way to fix this is to partition the index by term: store all the document IDs for color:red in partition 0, all the IDs for color:yellow in partition 1, and so on. A single document write may then touch multiple partitions, but reads become easier (you still have to fetch the documents themselves from multiple partitions). Updating a document requires updating the index on multiple partitions, and in practice this is often asynchronous, so consistency cannot always be guaranteed.

# Rebalancing

Rebalancing happens when

  • One machine fails
  • A partition needs more resources as throughput/load increases
  • A partition needs more storage

Rebalancing requires

  • Data are not moved unless necessary
  • New R/W loads are balanced

Rebalancing strategies

  • Hash mod N moves more data than necessary - every time the number of partitions changes, most keys have to move to a different partition (see the sketch below).
  • Fixed number of partitions - create many more partitions than nodes from the start, with many partitions per node. When rebalancing, whole partitions are copied to the new node; writes keep going to the old assignment until the copy finishes.
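
A small simulation contrasting the two strategies (the key count, 1024 partitions, and the 4-to-5-node change are arbitrary):

```python
import hashlib

def h(key):
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16)

keys = range(10_000)

# Hash mod N: going from 4 to 5 nodes changes the node of most keys.
moved = sum(1 for k in keys if h(k) % 4 != h(k) % 5)
print(f"hash mod N: {moved / 10_000:.0%} of keys move")          # roughly 80%

# Fixed number of partitions (1024, many per node): a key's partition never changes;
# the new node only takes over some whole partitions.
partition_of = {k: h(k) % 1024 for k in keys}
old_node = {p: p % 4 for p in range(1024)}
new_node = {p: (4 if p % 5 == 4 else old_node[p]) for p in range(1024)}
moved = sum(1 for k in keys if old_node[partition_of[k]] != new_node[partition_of[k]])
print(f"fixed partitions: {moved / 10_000:.0%} of keys move")    # roughly 20%, the minimum
```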

Dynamic partitioning

  • With key-range partitioning, if one partition grows too large, it just splits into two smaller partitions.
  • If a partition shrinks too much, it merges with an adjacent partition.
  • The number of partitions therefore grows with the volume of data.

TIP

With a fixed number of partitions, the size of each partition is proportional to the size of the dataset; with dynamic partitioning, the number of partitions is proportional to the size of the dataset. In both cases the number of partitions is independent of the number of nodes, and each node can hold multiple partitions.

Another rebalancing strategy is to have a fixed number of partitions per node; Cassandra uses this strategy. The size of each partition then grows with the dataset size while the number of nodes stays unchanged. When a new node is added, it picks a number of existing partitions to split, and takes over one half of each split partition.

Manual or auto rebalancing?

  • Automatic rebalancing is faster than manual rebalancing and requires less operational work.
  • But it can bring trouble: if a node is overloaded and slow to respond, rebalancing it automatically may add even more load and cause a cascading failure.

# Discovery

There are three ways to discover a partition:

  • Send request to any one node and that node reroute to the correct node
  • Send request to a routing tier
    • A coordination service such as ZooKeeper can work together with the routing tier to track partition assignments.
  • Client knows where to send the request

# Transactions

Databases either provide the ACID guarantees or, if they don't, are loosely described as BASE.

  • ACID means atomicity, consistency, isolation, and durability. The terms are fairly vague about implementation, and today ACID is partly a marketing term.
    • Atomicity - if a series of writes fails in the middle, the transaction can be aborted and all of its writes rolled back. Atomicity can be achieved by using a log for crash recovery.
    • Consistency - the application's state stays reasonable, e.g. in a banking system a customer's credits and debits are always balanced.
    • Isolation - concurrent operations don't interfere with each other, e.g. when two processes write to the database concurrently, both writes should take effect. Isolation can be implemented by adding locks on the objects.
    • Durability - use a write-ahead log so a crash during a write doesn't lose data, take frequent snapshots of the database, and keep the same data on multiple nodes.
  • BASE means basically available, soft state, and eventual consistency.

Single-object transaction and multi-object transaction.

  • A single-object transaction updates one row / document at a time.
  • Multi-object transactions are needed in cases such as:
    • An object references other normalized objects; when those objects are updated, the references must be updated for consistency.
    • In documents with denormalized data, all copies of the repeated data have to be updated.
    • For databases with secondary indexes, all the indexes have to be updated when the object is updated.

Handling errors and aborts. Retrying an aborted transaction is a simple and effective error-handling mechanism, but it isn't perfect:

  • If the transaction actually succeeded but the server's acknowledgement was lost, a retry performs the transaction twice.
  • If the error is due to overload, retrying makes the problem worse.
  • There can be unwanted side effects outside the database, e.g. sending an email multiple times is annoying.
  • The client may itself fail while retrying.

# Weak isolation levels

Concurrency issues arise when two clients try to access the same piece of data at the same time. Databases have tried to hide concurrency issues from application developers by providing transaction isolation. Isolation should help by letting you pretend that no concurrency is happening: serializable isolation means that the database guarantees transactions have the same effect as if they had run serially. But serializable isolation has a performance cost, and many databases don't want to pay that price. It's therefore common for systems to use weaker levels of isolation, which protect against some concurrency issues but not all.

There are multiple ways of weak isolation:

  • Read committed
    • when reading from or writing to the database, only see / overwrite data that has been committed. Uncommitted state is called 'dirty'.
    • No dirty reads guarantees that a transaction won't see another transaction's in-progress update. A partially updated state is confusing to users and may cause other transactions to make wrong decisions.
    • Partially updated data may also get rolled back if the transaction fails, so a dirty read can return data that was never actually committed.

Implementation

  • Row level locks.
    • During a write, the transaction holds a lock on the row to prevent other transactions from modifying it
    • Reads could take the lock too

But a long-running write would then block reads for a long time. So instead, the database keeps both the old committed value and the new dirty value during the write, and all reads return the old value until the change is committed.

Snapshot isolation and repeatable read

With read committed, a transaction moving a bank balance from one account to another can be observed at a moment when one account's balance has decreased but the other's has not yet increased. This transient state is called a nonrepeatable read or read skew.

The read skew may be unacceptable in the cases of:

  • Backups - the database is backed up while writes keep coming in; some rows in the backup have old values and others new values. The backup is then not in a consistent state, and restoring from it makes the inconsistency permanent.
  • Analytical queries may read inconsistent data.

Snapshot isolation solves this by having every read within a transaction see one consistent snapshot. To implement it, the database keeps several versions of each object; this is called multi-version concurrency control (MVCC). Each row version has a created_by field and a deleted_by field. For a transaction (say tx 13) decreasing a balance from $500 to $400, there are two row versions: the $500 row marked deleted_by tx 13, and a $400 row created_by tx 13.

Reading snapshot follows these rules:

  • At the start of a read transaction, the database makes a list of other transactions in progress; data modified by those transactions is not visible.
  • Any writes made by aborted transactions are not visible.
  • Any writes made by transactions with a later transaction ID (i.e. that started after the current transaction began) are ignored.
  • Garbage collection can remove old versions once no snapshot can see them anymore.
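
A sketch of these visibility rules over the created_by / deleted_by versions from the bank-balance example (txids and the simplified committed check are illustrative):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class RowVersion:
    value: int
    created_by: int                   # txid that inserted this version
    deleted_by: Optional[int] = None  # txid that deleted/overwrote it, if any

def visible(row, snapshot_txid, in_progress, aborted):
    """A version is visible if its creator committed before the snapshot
    started, and it wasn't deleted by such a transaction."""
    def committed_before(txid):
        return (txid is not None and txid <= snapshot_txid
                and txid not in in_progress and txid not in aborted)
    return committed_before(row.created_by) and not committed_before(row.deleted_by)

# Account balance going from 500 to 400 in transaction 13.
versions = [
    RowVersion(value=500, created_by=3, deleted_by=13),
    RowVersion(value=400, created_by=13),
]

# A read that started while tx 13 was still in progress sees 500.
print([v.value for v in versions if visible(v, snapshot_txid=12, in_progress={13}, aborted=set())])
# A snapshot taken after tx 13 committed sees 400.
print([v.value for v in versions if visible(v, snapshot_txid=14, in_progress=set(), aborted=set())])
```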

Snapshot isolation goes by different names: it is called serializable in Oracle and repeatable read in PostgreSQL and MySQL.

Lost Updates

Lost updates can happen when two clients concurrently do a read-modify-write on the same object: both read the current value and write back an updated value, e.g. incrementing a counter or adding a field to a JSON document, and one of the updates is silently lost.

To prevent lost updates, the following methods work:

  • Atomic write operations: only one process can do the read-modify-write at a time; the next process has to read the updated value. This is usually implemented with a lock on the object.
  • Explicit locking: the client explicitly tells the database to lock the object for update.
  • Automatically detecting lost updates and retrying: the database allows concurrent read-modify-write, but at the end of the transaction it checks whether an update was lost; if so, it aborts the transaction and forces a retry.
  • Compare-and-set: the database only applies the update if the value is still the same as when the client previously read it (sketched below).
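
A sketch of the compare-and-set idea using SQLite so it is self-contained; the table and column names are made up for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (id INTEGER PRIMARY KEY, content TEXT)")
conn.execute("INSERT INTO pages VALUES (1, 'old text')")

def cas_update(conn, page_id, expected, new):
    """Compare-and-set: the UPDATE only applies if the content is still what
    this client read earlier; a concurrent update makes it affect 0 rows."""
    cur = conn.execute(
        "UPDATE pages SET content = ? WHERE id = ? AND content = ?",
        (new, page_id, expected))
    conn.commit()
    return cur.rowcount == 1        # False -> a lost update was prevented; re-read and retry

print(cas_update(conn, 1, expected="old text", new="edit by A"))   # True
print(cas_update(conn, 1, expected="old text", new="edit by B"))   # False: A already changed it
```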

Write skews and phantoms:

  • Sometimes consistency is hard to maintain, e.g. two doctors concurrently take themselves off the on-call schedule, leaving nobody on call at a certain time. This indirectly violates consistency even though different objects are being modified.
  • To solve the problem, a constraint spanning multiple objects is needed - e.g. one lock covering all the on-call doctors' schedule rows.
  • Double spending, meeting-room booking, and multiplayer games are all examples of write skew.

Serial Execution

The simplest way to solve concurrency problems is to remove concurrency: handle all transactions serially on one thread. This used to make the database too slow, but as RAM has become cheaper, more databases fit entirely in memory, which makes serial execution feasible. To make transactions fast enough, they should be submitted as a whole rather than interactively: for the doctor on-call example, instead of checking availability and updating the schedule in two separate round trips, bundle them into one unit (check, and update if allowed). This largely removes network overhead. Because the entire transaction code is sent to the database ahead of time, it's called a stored procedure.

Cons of stored procedure:

  • Each database has different language for stored procedure.
  • Debugging a stored procedure in the database is harder
  • Databases are more performance-sensitive than application code; a badly written stored procedure can cause a lot of damage to the database.

How these cons are overcome, and the pros:

  • Modern stored procedures use general-purpose languages such as Java or Lua
  • Single-threaded execution is fast
  • Stored procedures can also be used for replication: replicas run the stored procedure instead of replaying the raw statements.

To boost the performance of a single-threaded database further, the data can be partitioned so each partition has its own execution thread. However, transactions that read or write data on multiple partitions require coordination and are much slower than single-partition transactions.

Two-Phase Locking (2PL)

2PL means that a transaction wanting to write must wait until all readers and writers of the object are done, and readers must wait for any writer; unlike snapshot isolation, readers and writers block each other. It is a serialization technique.

  • Reads acquire a shared lock on the object, so multiple reads can happen at once.
  • Writes acquire an exclusive lock on the object, so only one write can happen.
  • The two phases are acquiring the locks (while the transaction executes) and releasing them all (at the end of the transaction).
  • In the case of a deadlock, the system aborts one of the transactions to break it.
  • Performance concerns
    • 2PL serializes access, so latencies are unstable - some transactions can be really slow.
    • If deadlocks happen often, a lot of transactions get retried and efficiency is low.

Predicate locks and index-range locks are how 2PL is extended to cover objects that don't exist yet (phantoms).

Predicate Lock

A predicate lock is a lock on all objects matching a query condition. For example, if someone is querying calendar events within a time window, no one else can modify events within that window.

Index-range locks

Predicate locks are not very performant, since checking which locks match a query takes time linear in the number of locks. An approximation is to lock broader ranges covered by an index, e.g. a whole room or all time slots overlapping 1:30pm-2pm.

Serializable Snapshot Isolation (SSI)

2PL does not perform well and serial execution does not scale well. How can a database both perform well and scale well? Serializable snapshot isolation is one answer.

  • In short, SSI means: don't block transactions, but check for conflicts before committing.
  • Both 2PL and serial execution are pessimistic: they assume something will go wrong and prevent it up front, at a cost in performance. SSI is optimistic.
  • SSI is implemented by
    • Detecting reads of a stale MVCC object version: before a transaction commits, check whether any of the data it read has since been changed by a committed write.
    • Detecting writes that affect prior reads: the database records which transactions have read each indexed object; when a write transaction commits, it notifies the transactions that read that data, and they must abort if affected.

# Trouble with distributed systems

# Faults and partial failures

Some nodes in a distributed system may suffer partial or total failures. How do we keep the system running while some nodes go wrong?

# Unreliable networks

An unreliable network can manifest as:

  • a lost request
  • a request queued for too long
  • a remote node that has failed or stopped responding
  • a lost response

The usual way of handling network issues is a timeout plus retry.
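
A minimal sketch of the timeout-and-retry pattern with exponential backoff; send_request is a hypothetical stand-in for a real network call (note that a retry may duplicate a request that actually succeeded):

```python
import random
import time

def send_request(payload, timeout):
    """Hypothetical stand-in for a network call; raises TimeoutError on failure."""
    if random.random() < 0.3:
        raise TimeoutError
    return "ok"

def call_with_retries(payload, attempts=5, timeout=1.0):
    for attempt in range(attempts):
        try:
            return send_request(payload, timeout=timeout)
        except TimeoutError:
            # A lost request, a lost response and a slow node all look the same from here;
            # back off (with jitter) so retries don't add to the congestion.
            time.sleep((2 ** attempt) * 0.1 + random.random() * 0.1)
    raise TimeoutError("node unreachable after retries")

print(call_with_retries({"op": "ping"}))
```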

Network congestion and queueing

Packet delays on networks are most often due to queueing:

  • several nodes try to send packets to the same destination at once
  • the destination machine is busy and has no CPU cycles to spare for processing the packet

The landline telephone network establishes a circuit between two parties with a fixed, guaranteed bandwidth. Computer networks can't do this because their traffic is dynamic and bursty: sending a message, making a phone call, uploading a file and sending an email all have very different bandwidth needs.

# Unreliable clock

In a distributed system, clocks are important for measuring durations and recording timestamps. Computers adjust their time against a group of dedicated time servers; this mechanism is called the Network Time Protocol (NTP). Those dedicated servers use GPS signals for accurate time.

An unreliable clock may put timestamps out of order, so some commits may get lost because a later commit carries an earlier timestamp. In this case, version vectors can help solve the problem.

Also in the case of same timestamp, a tie breaker (e.g. random number) is needed but may break causality.

A timestamp is really a confidence interval with a range. To guarantee causality, Spanner waits out the length of the confidence interval before committing a read-write transaction; this requires keeping the clock uncertainty as small as possible.

Process Pauses

A partition leader (i.e. the node that accepts writes) has to hold a lease granted by the other nodes to prove it's still the leader. The lease has a timeout and the leader has to renew it before it expires. However, the leader may be so busy with other work that it cannot renew the lease in time. Possible reasons are:

  • Java's garbage collector process
  • suspended virtual environment
  • other process occupies CPU
  • slow and synchronous disk I/O

Because of this, a node in a distributed system must assume that its execution can be paused for a significant length of time at any point.

When writing multi-threaded code on a single machine, we have fairly good tools for making it thread safe: mutexes, semaphores, atomic counters, lock-free data structures, blocking queues, etc. But these tools don't directly translate to distributed systems, because a distributed system has no shared memory - only messages sent over an unreliable network with variable delays, and system may suffer from partial failures, unreliable clocks, and processing pauses.

Response time guarantees

TIP

In embedded systems, real-time means that a system is carefully designed and tested to meet specified timing guarantees in all circumstances.

Limiting the impact of garbage collection

  • treat a GC pause as a brief planned outage, so no more requests are sent to the node during it.
  • another idea is to use the GC only for short-lived objects, which are fast to collect, and restart the process periodically before long-lived objects would require a full GC.

# Knowledge, truth and lies

In distributed system, a node cannot necessarily trust its own judgment of a situation:

  1. if a node can receive and process messages but cannot send replies, other nodes will declare it dead.
  2. if a node can send messages out but never receives acknowledgements back, other nodes may consider it dead, and it can do nothing about it.
  3. a node doing a long GC pause may be declared dead by the others, and then come back as if nothing happened.

So a node cannot decide on its own whether it is alive or dead; a majority (quorum) of the nodes makes that decision.

Byzantine fault

The Byzantine generals problem: a group of generals must agree on a plan, but some are traitors who send confusing, contradictory messages, and no one knows who is loyal and who is a traitor.

A Byzantine fault means that some nodes respond with incorrect or fake data - perhaps because of a broken disk, a compromise by an attacker, or a bug in the software.