// automerge/automerge-wasm/test/helpers/sync.js

/**
 * Implementation of the data synchronisation protocol that brings a local and a remote document
 * into the same state. This is typically used when two nodes have been disconnected for some time,
 * and need to exchange any changes that happened while they were disconnected. The two nodes that
 * are syncing could be client and server, or server and client, or two peers with symmetric roles.
 *
 * The protocol is based on this paper: Martin Kleppmann and Heidi Howard. Byzantine Eventual
 * Consistency and the Fundamental Limits of Peer-to-Peer Databases. https://arxiv.org/abs/2012.00472
 *
 * The protocol assumes that every time a node successfully syncs with another node, it remembers
 * the current heads (as returned by `Backend.getHeads()`) after the last sync with that node. The
 * next time we try to sync with the same node, we start from the assumption that the other node's
 * document version is no older than the outcome of the last sync, so we only need to exchange any
 * changes that are more recent than the last sync. This assumption may not be true if the other
 * node did not correctly persist its state (perhaps it crashed before writing the result of the
 * last sync to disk), and we fall back to sending the entire document in this case.
 */
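
// Illustrative usage sketch (not part of this module): two peers drive these helpers in a loop,
// exchanging encoded messages over some transport until generateSyncMessage() returns a null
// message on both sides. The `send`/`receive` functions below are hypothetical placeholders:
//
//   let syncState = initSyncState()
//   let msg
//   ;[syncState, msg] = generateSyncMessage(backend, syncState)
//   if (msg !== null) send(msg) // null means we believe we are already in sync
//   const incoming = receive()
//   if (incoming) {
//     let patch
//     ;[backend, syncState, patch] = receiveSyncMessage(backend, syncState, incoming)
//     // a non-null patch is forwarded to the frontend; then loop back to generateSyncMessage()
//   }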
//const Backend = require('./backend')
const Backend = {} //require('./backend')
const { hexStringToBytes, bytesToHexString, Encoder, Decoder } = require('./encoding')
const { decodeChangeMeta } = require('./columnar')
const { copyObject } = require('./common')
const HASH_SIZE = 32 // 256 bits = 32 bytes
const MESSAGE_TYPE_SYNC = 0x42 // first byte of a sync message, for identification
const PEER_STATE_TYPE = 0x43 // first byte of an encoded peer state, for identification
// These constants correspond to a 1% false positive rate. The values can be changed without
// breaking compatibility of the network protocol, since the parameters used for a particular
// Bloom filter are encoded in the wire format.
const BITS_PER_ENTRY = 10, NUM_PROBES = 7
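
// For reference, the standard approximation for a Bloom filter's false positive rate is
// p = (1 - e^(-k*n/m))^k, where n is the number of entries, m = BITS_PER_ENTRY * n is the
// number of bits, and k = NUM_PROBES. With m/n = 10 and k = 7 this gives p = 0.0082, i.e.
// roughly the 1% mentioned above.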

/**
 * A Bloom filter implementation that can be serialised to a byte array for transmission
 * over a network. The entries that are added are assumed to already be SHA-256 hashes,
 * so this implementation does not perform its own hashing.
 */
class BloomFilter {
  constructor (arg) {
    if (Array.isArray(arg)) {
      // arg is an array of SHA256 hashes in hexadecimal encoding
      this.numEntries = arg.length
      this.numBitsPerEntry = BITS_PER_ENTRY
      this.numProbes = NUM_PROBES
      this.bits = new Uint8Array(Math.ceil(this.numEntries * this.numBitsPerEntry / 8))
      for (let hash of arg) this.addHash(hash)
    } else if (arg instanceof Uint8Array) {
      if (arg.byteLength === 0) {
        this.numEntries = 0
        this.numBitsPerEntry = 0
        this.numProbes = 0
        this.bits = arg
      } else {
        const decoder = new Decoder(arg)
        this.numEntries = decoder.readUint32()
        this.numBitsPerEntry = decoder.readUint32()
        this.numProbes = decoder.readUint32()
        this.bits = decoder.readRawBytes(Math.ceil(this.numEntries * this.numBitsPerEntry / 8))
      }
    } else {
      throw new TypeError('invalid argument')
    }
  }

  /**
   * Returns the Bloom filter state, encoded as a byte array.
   */
  get bytes() {
    if (this.numEntries === 0) return new Uint8Array(0)
    const encoder = new Encoder()
    encoder.appendUint32(this.numEntries)
    encoder.appendUint32(this.numBitsPerEntry)
    encoder.appendUint32(this.numProbes)
    encoder.appendRawBytes(this.bits)
    return encoder.buffer
  }

  /**
   * Given a SHA-256 hash (as hex string), returns an array of probe indexes indicating which bits
   * in the Bloom filter need to be tested or set for this particular entry. We do this by
   * interpreting the first 12 bytes of the hash as three little-endian 32-bit unsigned integers,
   * and then using triple hashing to compute the probe indexes. The algorithm comes from:
   *
   * Peter C. Dillinger and Panagiotis Manolios. Bloom Filters in Probabilistic Verification.
   * 5th International Conference on Formal Methods in Computer-Aided Design (FMCAD), November 2004.
   * http://www.ccis.northeastern.edu/home/pete/pub/bloom-filters-verification.pdf
   */
  getProbes(hash) {
    const hashBytes = hexStringToBytes(hash), modulo = 8 * this.bits.byteLength
    if (hashBytes.byteLength !== 32) throw new RangeError(`Not a 256-bit hash: ${hash}`)
    // on the next three lines, the right shift means interpret value as unsigned
    let x = ((hashBytes[0] | hashBytes[1] << 8 | hashBytes[2] << 16 | hashBytes[3] << 24) >>> 0) % modulo
    let y = ((hashBytes[4] | hashBytes[5] << 8 | hashBytes[6] << 16 | hashBytes[7] << 24) >>> 0) % modulo
    let z = ((hashBytes[8] | hashBytes[9] << 8 | hashBytes[10] << 16 | hashBytes[11] << 24) >>> 0) % modulo
    const probes = [x]
    for (let i = 1; i < this.numProbes; i++) {
      x = (x + y) % modulo
      y = (y + z) % modulo
      probes.push(x)
    }
    return probes
  }
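
  // Unrolling the recurrence above: probe i works out to (x + i*y + (i*(i-1)/2)*z) mod m,
  // i.e. the triple-hashing scheme that the comment on getProbes() refers to.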

  /**
   * Sets the Bloom filter bits corresponding to a given SHA-256 hash (given as hex string).
   */
  addHash(hash) {
    for (let probe of this.getProbes(hash)) {
      this.bits[probe >>> 3] |= 1 << (probe & 7)
    }
  }

  /**
   * Tests whether a given SHA-256 hash (given as hex string) is contained in the Bloom filter.
   */
  containsHash(hash) {
    if (this.numEntries === 0) return false
    for (let probe of this.getProbes(hash)) {
      if ((this.bits[probe >>> 3] & (1 << (probe & 7))) === 0) {
        return false
      }
    }
    return true
  }
}
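
// Illustrative round trip (the 64-hex-digit hash below is a made-up placeholder):
//
//   const hash = 'ab'.repeat(32)                // any SHA-256 hash in hex
//   const filter = new BloomFilter([hash])      // build from an array of hashes
//   const wire = filter.bytes                   // serialise for transmission
//   new BloomFilter(wire).containsHash(hash)    // => true (no false negatives)
//   // containsHash() may also return true for hashes never added (~1% false positives)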

/**
 * Encodes a sorted array of SHA-256 hashes (as hexadecimal strings) into a byte array.
 */
function encodeHashes(encoder, hashes) {
  if (!Array.isArray(hashes)) throw new TypeError('hashes must be an array')
  encoder.appendUint32(hashes.length)
  for (let i = 0; i < hashes.length; i++) {
    if (i > 0 && hashes[i - 1] >= hashes[i]) throw new RangeError('hashes must be sorted')
    const bytes = hexStringToBytes(hashes[i])
    if (bytes.byteLength !== HASH_SIZE) throw new TypeError('heads hashes must be 256 bits')
    encoder.appendRawBytes(bytes)
  }
}

/**
 * Decodes a byte array in the format returned by encodeHashes(), and returns its content as an
 * array of hex strings.
 */
function decodeHashes(decoder) {
  let length = decoder.readUint32(), hashes = []
  for (let i = 0; i < length; i++) {
    hashes.push(bytesToHexString(decoder.readRawBytes(HASH_SIZE)))
  }
  return hashes
}

/**
 * Takes a sync message of the form `{heads, need, have, changes}` and encodes it as a byte array
 * for transmission.
 */
function encodeSyncMessage(message) {
  const encoder = new Encoder()
  encoder.appendByte(MESSAGE_TYPE_SYNC)
  encodeHashes(encoder, message.heads)
  encodeHashes(encoder, message.need)
  encoder.appendUint32(message.have.length)
  for (let have of message.have) {
    encodeHashes(encoder, have.lastSync)
    encoder.appendPrefixedBytes(have.bloom)
  }
  encoder.appendUint32(message.changes.length)
  for (let change of message.changes) {
    encoder.appendPrefixedBytes(change)
  }
  return encoder.buffer
}
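
// Sketch of the resulting wire layout, reading off the encoder calls above in order ("uint32"
// means whatever Encoder.appendUint32 emits; "hashes" is the encodeHashes() format, i.e. a
// uint32 count followed by count * 32 raw bytes):
//
//   0x42 (MESSAGE_TYPE_SYNC)
//   hashes: heads
//   hashes: need
//   uint32: have count, then per entry: hashes lastSync + length-prefixed Bloom filter bytes
//   uint32: change count, then per change: length-prefixed change bytes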

/**
 * Takes a binary-encoded sync message and decodes it into the form `{heads, need, have, changes}`.
 */
function decodeSyncMessage(bytes) {
  const decoder = new Decoder(bytes)
  const messageType = decoder.readByte()
  if (messageType !== MESSAGE_TYPE_SYNC) {
    throw new RangeError(`Unexpected message type: ${messageType}`)
  }
  const heads = decodeHashes(decoder)
  const need = decodeHashes(decoder)
  const haveCount = decoder.readUint32()
  let message = {heads, need, have: [], changes: []}
  for (let i = 0; i < haveCount; i++) {
    const lastSync = decodeHashes(decoder)
    const bloom = decoder.readPrefixedBytes()
    message.have.push({lastSync, bloom})
  }
  const changeCount = decoder.readUint32()
  for (let i = 0; i < changeCount; i++) {
    const change = decoder.readPrefixedBytes()
    message.changes.push(change)
  }
  // Ignore any trailing bytes -- they can be used for extensions by future versions of the protocol
  return message
}
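
// By construction, decoding inverts encoding, so a message survives a round trip. For example
// (with `someHash` standing in for a real 64-character hex hash):
//
//   const m = {heads: [someHash], need: [], have: [], changes: []}
//   decodeSyncMessage(encodeSyncMessage(m)) // => deep-equals m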

/**
 * Takes a SyncState and encodes as a byte array those parts of the state that should persist across
 * an application restart or disconnect and reconnect. The ephemeral parts of the state that should
 * be cleared on reconnect are not encoded.
 */
function encodeSyncState(syncState) {
  const encoder = new Encoder()
  encoder.appendByte(PEER_STATE_TYPE)
  encodeHashes(encoder, syncState.sharedHeads)
  return encoder.buffer
}

/**
 * Takes a persisted peer state as encoded by `encodeSyncState` and decodes it into a SyncState
 * object. The parts of the peer state that were not encoded are initialised with default values.
 */
function decodeSyncState(bytes) {
  const decoder = new Decoder(bytes)
  const recordType = decoder.readByte()
  if (recordType !== PEER_STATE_TYPE) {
    throw new RangeError(`Unexpected record type: ${recordType}`)
  }
  const sharedHeads = decodeHashes(decoder)
  return Object.assign(initSyncState(), { sharedHeads })
}
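
// Illustrative persistence flow (the `storage` API is a hypothetical key-value store):
//
//   storage.put('peer-123', encodeSyncState(syncState))       // before shutdown
//   const restored = decodeSyncState(storage.get('peer-123')) // after restart
//
// Only sharedHeads survives the round trip; the ephemeral fields (lastSentHeads, theirHeads,
// theirNeed, theirHave, sentHashes) are reset to their initSyncState() defaults, since the peer
// may have changed while we were offline.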

/**
 * Constructs a Bloom filter containing all changes that are not one of the hashes in
 * `lastSync` or its transitive dependencies. In other words, the filter contains those
 * changes that have been applied since the version identified by `lastSync`. Returns
 * an object of the form `{lastSync, bloom}` as required for the `have` field of a sync
 * message.
 */
function makeBloomFilter(backend, lastSync) {
  const newChanges = Backend.getChanges(backend, lastSync)
  const hashes = newChanges.map(change => decodeChangeMeta(change, true).hash)
  return {lastSync, bloom: new BloomFilter(hashes).bytes}
}

/**
 * Call this function when a sync message is received from another node. The `have` and `need`
 * arguments are the corresponding fields of that message, already decoded using
 * `decodeSyncMessage()`. This function determines the changes that we need to send to the other
 * node in response. Returns an array of changes (as byte arrays).
 */
function getChangesToSend(backend, have, need) {
  if (have.length === 0) {
    return need.map(hash => Backend.getChangeByHash(backend, hash)).filter(change => change !== undefined)
  }
  let lastSyncHashes = {}, bloomFilters = []
  for (let h of have) {
    for (let hash of h.lastSync) lastSyncHashes[hash] = true
    bloomFilters.push(new BloomFilter(h.bloom))
  }

  // Get all changes that were added since the last sync
  const changes = Backend.getChanges(backend, Object.keys(lastSyncHashes))
    .map(change => decodeChangeMeta(change, true))

  let changeHashes = {}, dependents = {}, hashesToSend = {}
  for (let change of changes) {
    changeHashes[change.hash] = true

    // For each change, make a list of changes that depend on it
    for (let dep of change.deps) {
      if (!dependents[dep]) dependents[dep] = []
      dependents[dep].push(change.hash)
    }

    // Exclude any change hashes contained in one or more Bloom filters
    if (bloomFilters.every(bloom => !bloom.containsHash(change.hash))) {
      hashesToSend[change.hash] = true
    }
  }

  // Include any changes that depend on a Bloom-negative change
  let stack = Object.keys(hashesToSend)
  while (stack.length > 0) {
    const hash = stack.pop()
    if (dependents[hash]) {
      for (let dep of dependents[hash]) {
        if (!hashesToSend[dep]) {
          hashesToSend[dep] = true
          stack.push(dep)
        }
      }
    }
  }

  // Include any explicitly requested changes
  let changesToSend = []
  for (let hash of need) {
    hashesToSend[hash] = true
    if (!changeHashes[hash]) { // Change is not among those returned by getChanges() above?
      const change = Backend.getChangeByHash(backend, hash)
      if (change) changesToSend.push(change)
    }
  }

  // Return changes in the order they were returned by getChanges() above
  for (let change of changes) {
    if (hashesToSend[change.hash]) changesToSend.push(change.change)
  }
  return changesToSend
}
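
// Worked example of the Bloom-negative dependency rule above: suppose change B depends on
// change A, and the peer's filter reports A as absent (Bloom-negative) but B as present. B's
// "present" may be a false positive, and B is useless to the receiver without A anyway, so once
// A is queued for sending, B is queued too via the `dependents` traversal.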

/**
 * Returns an empty sync state, representing a peer about which we know nothing yet.
 */
function initSyncState() {
  return {
    sharedHeads: [],   // heads we know the peer to have (the basis for our Bloom filter)
    lastSentHeads: [], // our heads as of the last message we sent
    theirHeads: null,  // the peer's heads from their last message, if any
    theirNeed: null,   // hashes the peer explicitly requested
    theirHave: null,   // the peer's {lastSync, bloom} entries
    sentHashes: {},    // hashes of changes we have already sent, to avoid resending them
  }
}

/**
 * Returns true if the two arrays contain the same elements (compared with ===) in the same order.
 */
function compareArrays(a, b) {
  return (a.length === b.length) && a.every((v, i) => v === b[i])
}

/**
 * Given a backend and what we believe to be the state of our peer, generate a message which tells
 * them what we have, and includes any changes we believe they need.
 */
function generateSyncMessage(backend, syncState) {
  if (!backend) {
    throw new Error("generateSyncMessage called with no Automerge document")
  }
  if (!syncState) {
    throw new Error("generateSyncMessage requires a syncState, which can be created with initSyncState()")
  }

  let { sharedHeads, lastSentHeads, theirHeads, theirNeed, theirHave, sentHashes } = syncState
  const ourHeads = Backend.getHeads(backend)

  // Hashes to explicitly request from the remote peer: any missing dependencies of unapplied
  // changes, and any of the remote peer's heads that we don't know about
  const ourNeed = Backend.getMissingDeps(backend, theirHeads || [])

  // There are two reasons why ourNeed may be nonempty: 1. we might be missing dependencies due to
  // Bloom filter false positives; 2. we might be missing heads that the other peer mentioned
  // because they (intentionally) only sent us a subset of changes. In case 1, we leave the `have`
  // field of the message empty because we just want to fill in the missing dependencies for now.
  // In case 2, or if ourNeed is empty, we send a Bloom filter to request any unsent changes.
  let ourHave = []
  if (!theirHeads || ourNeed.every(hash => theirHeads.includes(hash))) {
    ourHave = [makeBloomFilter(backend, sharedHeads)]
  }

  // Fall back to a full re-sync if the sender's last sync state includes hashes
  // that we don't know. This could happen if we crashed after the last sync and
  // failed to persist changes that the other node already sent us.
  if (theirHave && theirHave.length > 0) {
    const lastSync = theirHave[0].lastSync
    if (!lastSync.every(hash => Backend.getChangeByHash(backend, hash))) {
      // Prompt the other node to send us a fresh sync message: the one they sent is
      // unintelligible to us, so we don't know what they need
      const resetMsg = {heads: ourHeads, need: [], have: [{ lastSync: [], bloom: new Uint8Array(0) }], changes: []}
      return [syncState, encodeSyncMessage(resetMsg)]
    }
  }

  // XXX: we should limit ourselves to sending only a subset of all the changes, probably capped
  // at a total message size; these changes should ideally be RLE encoded, but we haven't
  // implemented that yet.
  let changesToSend = Array.isArray(theirHave) && Array.isArray(theirNeed) ? getChangesToSend(backend, theirHave, theirNeed) : []

  // If the heads are equal, we're in sync and don't need to do anything further
  const headsUnchanged = Array.isArray(lastSentHeads) && compareArrays(ourHeads, lastSentHeads)
  const headsEqual = Array.isArray(theirHeads) && compareArrays(ourHeads, theirHeads)
  if (headsUnchanged && headsEqual && changesToSend.length === 0) {
    // no need to send a sync message if we know we're synced!
    return [syncState, null]
  }

  // TODO: this recomputes the SHA-256 hash of each change; we should restructure this to avoid the
  // unnecessary recomputation
  changesToSend = changesToSend.filter(change => !sentHashes[decodeChangeMeta(change, true).hash])

  // Regular response to a sync message: send any changes that the other node doesn't have, along
  // with our heads, our Bloom filter, and any hashes we still need
  const syncMessage = {heads: ourHeads, have: ourHave, need: ourNeed, changes: changesToSend}
  if (changesToSend.length > 0) {
    sentHashes = copyObject(sentHashes)
    for (const change of changesToSend) {
      sentHashes[decodeChangeMeta(change, true).hash] = true
    }
  }

  syncState = Object.assign({}, syncState, {lastSentHeads: ourHeads, sentHashes})
  return [syncState, encodeSyncMessage(syncMessage)]
}

/**
 * Computes the heads that we share with a peer after we have just received some changes from that
 * peer and applied them. This may not be sufficient to bring our heads in sync with the other
 * peer's heads, since they may have only sent us a subset of their outstanding changes.
 *
 * `myOldHeads` are the local heads before the most recent changes were applied, `myNewHeads` are
 * the local heads after those changes were applied, and `ourOldSharedHeads` is the previous set of
 * shared heads. Applying the changes will have replaced some heads with others, but some heads may
 * have remained unchanged (because they are for branches on which no changes have been added). Any
 * such unchanged heads remain in the shared heads, while any shared heads that were superseded by
 * applying changes are replaced with their successors. This is safe because if we received some
 * changes from another peer, that peer must have had those changes, and therefore we now both know
 * about them.
 */
function advanceHeads(myOldHeads, myNewHeads, ourOldSharedHeads) {
  const newHeads = myNewHeads.filter((head) => !myOldHeads.includes(head))
  const commonHeads = ourOldSharedHeads.filter((head) => myNewHeads.includes(head))
  const advancedHeads = [...new Set([...newHeads, ...commonHeads])].sort()
  return advancedHeads
}
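
// Worked example: with myOldHeads = [a, b], myNewHeads = [b, c] and ourOldSharedHeads = [a, b]
// (where a, b, c stand for hash strings), the newly-appeared head is c and the surviving shared
// head is b, so the result is [b, c]: head a was superseded by the changes we just applied, and
// is therefore dropped from the shared heads.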

/**
 * Given a backend, a binary-encoded message, and the state of our peer, apply any changes, update
 * what we believe about the peer, and (if there were applied changes) produce a patch for the
 * frontend.
 */
function receiveSyncMessage(backend, oldSyncState, binaryMessage) {
  if (!backend) {
    throw new Error("receiveSyncMessage called with no Automerge document")
  }
  if (!oldSyncState) {
    throw new Error("receiveSyncMessage requires a syncState, which can be created with initSyncState()")
  }

  let { sharedHeads, lastSentHeads, sentHashes } = oldSyncState, patch = null
  const message = decodeSyncMessage(binaryMessage)
  const beforeHeads = Backend.getHeads(backend)

  // If we received changes, we try to apply them to the document. There may still be missing
  // dependencies due to Bloom filter false positives, in which case the backend will enqueue the
  // changes without applying them. The set of changes may also be incomplete if the sender decided
  // to break a large set of changes into chunks.
  if (message.changes.length > 0) {
    [backend, patch] = Backend.applyChanges(backend, message.changes)
    sharedHeads = advanceHeads(beforeHeads, Backend.getHeads(backend), sharedHeads)
  }

  // If heads are equal, indicate we don't need to send a response message
  if (message.changes.length === 0 && compareArrays(message.heads, beforeHeads)) {
    lastSentHeads = message.heads
  }

  // If all of the remote heads are known to us, that means either our heads are equal, or we are
  // ahead of the remote peer. In this case, take the remote heads to be our shared heads.
  const knownHeads = message.heads.filter(head => Backend.getChangeByHash(backend, head))
  if (knownHeads.length === message.heads.length) {
    sharedHeads = message.heads
    // If the remote peer has lost all its data, reset our state to perform a full resync
    if (message.heads.length === 0) {
      lastSentHeads = []
      sentHashes = {}
    }
  } else {
    // If some remote heads are unknown to us, we add all the remote heads we know to
    // sharedHeads, but don't remove anything from sharedHeads. This might cause sharedHeads to
    // contain some redundant hashes (where one hash is actually a transitive dependency of
    // another), but this will be cleared up as soon as we know all the remote heads.
    sharedHeads = [...new Set(knownHeads.concat(sharedHeads))].sort()
  }

  const syncState = {
    sharedHeads, // what we have in common, used to generate an efficient Bloom filter
    lastSentHeads,
    theirHave: message.have, // the information we need to calculate the changes they need
    theirHeads: message.heads,
    theirNeed: message.need,
    sentHashes
  }
  return [backend, syncState, patch]
}

module.exports = {
  receiveSyncMessage, generateSyncMessage,
  encodeSyncMessage, decodeSyncMessage,
  initSyncState, encodeSyncState, decodeSyncState,
  BloomFilter // BloomFilter is a private API, exported only for testing purposes
}