/**
 * Implementation of the data synchronisation protocol that brings a local and a remote document
 * into the same state. This is typically used when two nodes have been disconnected for some time,
 * and need to exchange any changes that happened while they were disconnected. The two nodes that
 * are syncing could be client and server, or server and client, or two peers with symmetric roles.
 *
 * The protocol is based on this paper: Martin Kleppmann and Heidi Howard. Byzantine Eventual
 * Consistency and the Fundamental Limits of Peer-to-Peer Databases. https://arxiv.org/abs/2012.00472
 *
 * The protocol assumes that every time a node successfully syncs with another node, it remembers
 * the current heads (as returned by `Backend.getHeads()`) after the last sync with that node. The
 * next time we try to sync with the same node, we start from the assumption that the other node's
 * document version is no older than the outcome of the last sync, so we only need to exchange any
 * changes that are more recent than the last sync. This assumption may not be true if the other
 * node did not correctly persist its state (perhaps it crashed before writing the result of the
 * last sync to disk), and we fall back to sending the entire document in this case.
 */
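
// A minimal usage sketch (not a normative example): two peers repeatedly exchange sync messages
// until generateSyncMessage() returns a null message for both, at which point their documents
// have converged. The `network` object and `onPatch` callback are hypothetical stand-ins for an
// application's transport and frontend integration.
//
//   let syncState = initSyncState(), message, patch
//   ;[syncState, message] = generateSyncMessage(backend, syncState)
//   if (message) network.send(message) // byte array, already encoded
//   network.onReceive(bytes => {
//     ;[backend, syncState, patch] = receiveSyncMessage(backend, syncState, bytes)
//     if (patch) onPatch(patch) // apply the patch to the frontend
//   })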

const Backend = {} //require('./backend')
const { hexStringToBytes, bytesToHexString, Encoder, Decoder } = require('./encoding')
const { decodeChangeMeta } = require('./columnar')
const { copyObject } = require('./common')

const HASH_SIZE = 32 // 256 bits = 32 bytes
const MESSAGE_TYPE_SYNC = 0x42 // first byte of a sync message, for identification
const PEER_STATE_TYPE = 0x43 // first byte of an encoded peer state, for identification

// These constants correspond to a 1% false positive rate. The values can be changed without
// breaking compatibility of the network protocol, since the parameters used for a particular
// Bloom filter are encoded in the wire format.
const BITS_PER_ENTRY = 10, NUM_PROBES = 7

/**
 * A Bloom filter implementation that can be serialised to a byte array for transmission
 * over a network. The entries that are added are assumed to already be SHA-256 hashes,
 * so this implementation does not perform its own hashing.
 */
class BloomFilter {
  constructor (arg) {
    if (Array.isArray(arg)) {
      // arg is an array of SHA256 hashes in hexadecimal encoding
      this.numEntries = arg.length
      this.numBitsPerEntry = BITS_PER_ENTRY
      this.numProbes = NUM_PROBES
      this.bits = new Uint8Array(Math.ceil(this.numEntries * this.numBitsPerEntry / 8))
      for (let hash of arg) this.addHash(hash)
    } else if (arg instanceof Uint8Array) {
      if (arg.byteLength === 0) {
        this.numEntries = 0
        this.numBitsPerEntry = 0
        this.numProbes = 0
        this.bits = arg
      } else {
        const decoder = new Decoder(arg)
        this.numEntries = decoder.readUint32()
        this.numBitsPerEntry = decoder.readUint32()
        this.numProbes = decoder.readUint32()
        this.bits = decoder.readRawBytes(Math.ceil(this.numEntries * this.numBitsPerEntry / 8))
      }
    } else {
      throw new TypeError('invalid argument')
    }
  }

  /**
   * Returns the Bloom filter state, encoded as a byte array.
   */
  get bytes() {
    if (this.numEntries === 0) return new Uint8Array(0)
    const encoder = new Encoder()
    encoder.appendUint32(this.numEntries)
    encoder.appendUint32(this.numBitsPerEntry)
    encoder.appendUint32(this.numProbes)
    encoder.appendRawBytes(this.bits)
    return encoder.buffer
  }

  /**
   * Given a SHA-256 hash (as hex string), returns an array of probe indexes indicating which bits
   * in the Bloom filter need to be tested or set for this particular entry. We do this by
   * interpreting the first 12 bytes of the hash as three little-endian 32-bit unsigned integers,
   * and then using triple hashing to compute the probe indexes. The algorithm comes from:
   *
   * Peter C. Dillinger and Panagiotis Manolios. Bloom Filters in Probabilistic Verification.
   * 5th International Conference on Formal Methods in Computer-Aided Design (FMCAD), November 2004.
   * http://www.ccis.northeastern.edu/home/pete/pub/bloom-filters-verification.pdf
   */
  getProbes(hash) {
    const hashBytes = hexStringToBytes(hash), modulo = 8 * this.bits.byteLength
    if (hashBytes.byteLength !== 32) throw new RangeError(`Not a 256-bit hash: ${hash}`)
    // on the next three lines, the right shift means interpret value as unsigned
    let x = ((hashBytes[0] | hashBytes[1] << 8 | hashBytes[2] << 16 | hashBytes[3] << 24) >>> 0) % modulo
    let y = ((hashBytes[4] | hashBytes[5] << 8 | hashBytes[6] << 16 | hashBytes[7] << 24) >>> 0) % modulo
    let z = ((hashBytes[8] | hashBytes[9] << 8 | hashBytes[10] << 16 | hashBytes[11] << 24) >>> 0) % modulo
    const probes = [x]
    for (let i = 1; i < this.numProbes; i++) {
      x = (x + y) % modulo
      y = (y + z) % modulo
      probes.push(x)
    }
    return probes
  }

  /**
   * Sets the Bloom filter bits corresponding to a given SHA-256 hash (given as hex string).
   */
  addHash(hash) {
    for (let probe of this.getProbes(hash)) {
      this.bits[probe >>> 3] |= 1 << (probe & 7)
    }
  }

  /**
   * Tests whether a given SHA-256 hash (given as hex string) is contained in the Bloom filter.
   */
  containsHash(hash) {
    if (this.numEntries === 0) return false
    for (let probe of this.getProbes(hash)) {
      if ((this.bits[probe >>> 3] & (1 << (probe & 7))) === 0) {
        return false
      }
    }
    return true
  }
}
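
// A usage sketch for the filter above (assuming `h1`, `h2`, `h3` are hex-encoded SHA-256 hashes):
// the serialised form can be sent over the network and reconstructed losslessly.
//
//   const filter = new BloomFilter([h1, h2])       // build from a list of hashes
//   const received = new BloomFilter(filter.bytes) // decode from the wire encoding
//   received.containsHash(h1) // true (Bloom filters have no false negatives)
//   received.containsHash(h3) // usually false; true with ~1% probability (false positive)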

/**
 * Encodes a sorted array of SHA-256 hashes (as hexadecimal strings) into a byte array.
 */
function encodeHashes(encoder, hashes) {
  if (!Array.isArray(hashes)) throw new TypeError('hashes must be an array')
  encoder.appendUint32(hashes.length)
  for (let i = 0; i < hashes.length; i++) {
    if (i > 0 && hashes[i - 1] >= hashes[i]) throw new RangeError('hashes must be sorted')
    const bytes = hexStringToBytes(hashes[i])
    if (bytes.byteLength !== HASH_SIZE) throw new TypeError('heads hashes must be 256 bits')
    encoder.appendRawBytes(bytes)
  }
}

/**
 * Decodes a byte array in the format returned by encodeHashes(), and returns its content as an
 * array of hex strings.
 */
function decodeHashes(decoder) {
  let length = decoder.readUint32(), hashes = []
  for (let i = 0; i < length; i++) {
    hashes.push(bytesToHexString(decoder.readRawBytes(HASH_SIZE)))
  }
  return hashes
}

/**
 * Takes a sync message of the form `{heads, need, have, changes}` and encodes it as a byte array
 * for transmission.
 */
function encodeSyncMessage(message) {
  const encoder = new Encoder()
  encoder.appendByte(MESSAGE_TYPE_SYNC)
  encodeHashes(encoder, message.heads)
  encodeHashes(encoder, message.need)
  encoder.appendUint32(message.have.length)
  for (let have of message.have) {
    encodeHashes(encoder, have.lastSync)
    encoder.appendPrefixedBytes(have.bloom)
  }
  encoder.appendUint32(message.changes.length)
  for (let change of message.changes) {
    encoder.appendPrefixedBytes(change)
  }
  return encoder.buffer
}

/**
 * Takes a binary-encoded sync message and decodes it into the form `{heads, need, have, changes}`.
 */
function decodeSyncMessage(bytes) {
  const decoder = new Decoder(bytes)
  const messageType = decoder.readByte()
  if (messageType !== MESSAGE_TYPE_SYNC) {
    throw new RangeError(`Unexpected message type: ${messageType}`)
  }
  const heads = decodeHashes(decoder)
  const need = decodeHashes(decoder)
  const haveCount = decoder.readUint32()
  let message = {heads, need, have: [], changes: []}
  for (let i = 0; i < haveCount; i++) {
    const lastSync = decodeHashes(decoder)
    const bloom = decoder.readPrefixedBytes()
    message.have.push({lastSync, bloom})
  }
  const changeCount = decoder.readUint32()
  for (let i = 0; i < changeCount; i++) {
    const change = decoder.readPrefixedBytes()
    message.changes.push(change)
  }
  // Ignore any trailing bytes -- they can be used for extensions by future versions of the protocol
  return message
}
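
// The wire format implied by the encoder calls above, summarised: one type byte (0x42), the
// `heads` and `need` hash lists, a uint32 count of `have` entries (each a `lastSync` hash list
// plus a length-prefixed Bloom filter), and a uint32 count of length-prefixed changes. Encoding
// and decoding are inverses, e.g.:
//
//   const msg = {heads: [], need: [], have: [], changes: []}
//   decodeSyncMessage(encodeSyncMessage(msg)) // deep-equals msg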

/**
 * Takes a SyncState and encodes as a byte array those parts of the state that should persist across
 * an application restart or disconnect and reconnect. The ephemeral parts of the state that should
 * be cleared on reconnect are not encoded.
 */
function encodeSyncState(syncState) {
  const encoder = new Encoder()
  encoder.appendByte(PEER_STATE_TYPE)
  encodeHashes(encoder, syncState.sharedHeads)
  return encoder.buffer
}

/**
 * Takes a persisted peer state as encoded by `encodeSyncState` and decodes it into a SyncState
 * object. The parts of the peer state that were not encoded are initialised with default values.
 */
function decodeSyncState(bytes) {
  const decoder = new Decoder(bytes)
  const recordType = decoder.readByte()
  if (recordType !== PEER_STATE_TYPE) {
    throw new RangeError(`Unexpected record type: ${recordType}`)
  }
  const sharedHeads = decodeHashes(decoder)
  return Object.assign(initSyncState(), { sharedHeads })
}
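
// A persistence sketch (assuming a Node.js environment and a hypothetical `statePath`): only
// `sharedHeads` survives the round trip; decodeSyncState() resets the ephemeral fields via
// initSyncState().
//
//   const fs = require('fs')
//   fs.writeFileSync(statePath, encodeSyncState(syncState))      // before shutdown
//   const restored = decodeSyncState(fs.readFileSync(statePath)) // after restart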

/**
 * Constructs a Bloom filter containing all changes that are not one of the hashes in
 * `lastSync` or its transitive dependencies. In other words, the filter contains those
 * changes that have been applied since the version identified by `lastSync`. Returns
 * an object of the form `{lastSync, bloom}` as required for the `have` field of a sync
 * message.
 */
function makeBloomFilter(backend, lastSync) {
  const newChanges = Backend.getChanges(backend, lastSync)
  const hashes = newChanges.map(change => decodeChangeMeta(change, true).hash)
  return {lastSync, bloom: new BloomFilter(hashes).bytes}
}
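
// E.g., `makeBloomFilter(backend, syncState.sharedHeads)` summarises every change the local
// document has applied since the shared heads; the peer then tests its own recent changes
// against this filter (see getChangesToSend below) and sends back those the filter does not
// contain, i.e. the changes we definitely lack.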

/**
 * Call this function when a sync message is received from another node. `have` and `need` are the
 * corresponding fields of the message, which needs to have been decoded using `decodeSyncMessage()`
 * first. This function determines the changes that we need to send to the other node in response.
 * Returns an array of changes (as byte arrays).
 */
function getChangesToSend(backend, have, need) {
  if (have.length === 0) {
    return need.map(hash => Backend.getChangeByHash(backend, hash)).filter(change => change !== undefined)
  }

  let lastSyncHashes = {}, bloomFilters = []
  for (let h of have) {
    for (let hash of h.lastSync) lastSyncHashes[hash] = true
    bloomFilters.push(new BloomFilter(h.bloom))
  }

  // Get all changes that were added since the last sync
  const changes = Backend.getChanges(backend, Object.keys(lastSyncHashes))
    .map(change => decodeChangeMeta(change, true))

  let changeHashes = {}, dependents = {}, hashesToSend = {}
  for (let change of changes) {
    changeHashes[change.hash] = true

    // For each change, make a list of changes that depend on it
    for (let dep of change.deps) {
      if (!dependents[dep]) dependents[dep] = []
      dependents[dep].push(change.hash)
    }

    // Exclude any change hashes contained in one or more Bloom filters
    if (bloomFilters.every(bloom => !bloom.containsHash(change.hash))) {
      hashesToSend[change.hash] = true
    }
  }

  // Include any changes that depend on a Bloom-negative change
  let stack = Object.keys(hashesToSend)
  while (stack.length > 0) {
    const hash = stack.pop()
    if (dependents[hash]) {
      for (let dep of dependents[hash]) {
        if (!hashesToSend[dep]) {
          hashesToSend[dep] = true
          stack.push(dep)
        }
      }
    }
  }

  // Include any explicitly requested changes
  let changesToSend = []
  for (let hash of need) {
    hashesToSend[hash] = true
    if (!changeHashes[hash]) { // Change is not among those returned by getChanges() above?
      const change = Backend.getChangeByHash(backend, hash)
      if (change) changesToSend.push(change)
    }
  }

  // Return changes in the order they were returned by getChanges() above
  for (let change of changes) {
    if (hashesToSend[change.hash]) changesToSend.push(change.change)
  }
  return changesToSend
}
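
// A worked example of the dependency pass above (a sketch): suppose the local changes since the
// last sync are c1 <- c2 (c2 depends on c1). If c1 tests Bloom-negative (the remote definitely
// lacks it) but c2 is a false positive, the `dependents` traversal still marks c2 for sending,
// since c2 would be unusable by the remote without first applying c1.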

/**
 * Returns a fresh SyncState object, tracking what we know about a single peer. Only `sharedHeads`
 * is persisted by encodeSyncState(); the remaining fields are ephemeral.
 */
function initSyncState() {
  return {
    sharedHeads: [],
    lastSentHeads: [],
    theirHeads: null,
    theirNeed: null,
    theirHave: null,
    sentHashes: {},
  }
}

/**
 * Returns true if the two arrays contain equal elements in equal order.
 */
function compareArrays(a, b) {
  return (a.length === b.length) && a.every((v, i) => v === b[i])
}

/**
 * Given a backend and what we believe to be the state of our peer, generate a message which tells
 * them what we have and includes any changes we believe they need.
 */
function generateSyncMessage(backend, syncState) {
  if (!backend) {
    throw new Error("generateSyncMessage called with no Automerge document")
  }
  if (!syncState) {
    throw new Error("generateSyncMessage requires a syncState, which can be created with initSyncState()")
  }

  let { sharedHeads, lastSentHeads, theirHeads, theirNeed, theirHave, sentHashes } = syncState
  const ourHeads = Backend.getHeads(backend)

  // Hashes to explicitly request from the remote peer: any missing dependencies of unapplied
  // changes, and any of the remote peer's heads that we don't know about
  const ourNeed = Backend.getMissingDeps(backend, theirHeads || [])

  // There are two reasons why ourNeed may be nonempty: 1. we might be missing dependencies due to
  // Bloom filter false positives; 2. we might be missing heads that the other peer mentioned
  // because they (intentionally) only sent us a subset of changes. In case 1, we leave the `have`
  // field of the message empty because we just want to fill in the missing dependencies for now.
  // In case 2, or if ourNeed is empty, we send a Bloom filter to request any unsent changes.
  let ourHave = []
  if (!theirHeads || ourNeed.every(hash => theirHeads.includes(hash))) {
    ourHave = [makeBloomFilter(backend, sharedHeads)]
  }

  // Fall back to a full re-sync if the sender's last sync state includes hashes
  // that we don't know. This could happen if we crashed after the last sync and
  // failed to persist changes that the other node already sent us.
  if (theirHave && theirHave.length > 0) {
    const lastSync = theirHave[0].lastSync
    if (!lastSync.every(hash => Backend.getChangeByHash(backend, hash))) {
      // We need to prompt them to send us a fresh sync message: the one they sent is
      // unintelligible to us, so we don't know which changes they need
      const resetMsg = {heads: ourHeads, need: [], have: [{ lastSync: [], bloom: new Uint8Array(0) }], changes: []}
      return [syncState, encodeSyncMessage(resetMsg)]
    }
  }

  // XXX: we should limit ourselves to only sending a subset of all the changes, probably limited
  // by a total message size. These changes should ideally be RLE encoded, but we haven't
  // implemented that yet.
  let changesToSend = Array.isArray(theirHave) && Array.isArray(theirNeed) ? getChangesToSend(backend, theirHave, theirNeed) : []

  // If the heads are equal, we're in sync and don't need to do anything further
  const headsUnchanged = Array.isArray(lastSentHeads) && compareArrays(ourHeads, lastSentHeads)
  const headsEqual = Array.isArray(theirHeads) && compareArrays(ourHeads, theirHeads)
  if (headsUnchanged && headsEqual && changesToSend.length === 0) {
    // No need to send a sync message if we know we're already in sync!
    return [syncState, null]
  }

  // TODO: this recomputes the SHA-256 hash of each change; we should restructure this to avoid the
  // unnecessary recomputation
  changesToSend = changesToSend.filter(change => !sentHashes[decodeChangeMeta(change, true).hash])

  // Regular response to a sync message: send our heads and needs, the `have` computed above, and
  // any changes that we believe the other node is missing
  const syncMessage = {heads: ourHeads, have: ourHave, need: ourNeed, changes: changesToSend}
  if (changesToSend.length > 0) {
    sentHashes = copyObject(sentHashes)
    for (const change of changesToSend) {
      sentHashes[decodeChangeMeta(change, true).hash] = true
    }
  }

  syncState = Object.assign({}, syncState, {lastSentHeads: ourHeads, sentHashes})
  return [syncState, encodeSyncMessage(syncMessage)]
}

/**
 * Computes the heads that we share with a peer after we have just received some changes from that
 * peer and applied them. This may not be sufficient to bring our heads in sync with the other
 * peer's heads, since they may have only sent us a subset of their outstanding changes.
 *
 * `myOldHeads` are the local heads before the most recent changes were applied, `myNewHeads` are
 * the local heads after those changes were applied, and `ourOldSharedHeads` is the previous set of
 * shared heads. Applying the changes will have replaced some heads with others, but some heads may
 * have remained unchanged (because they are for branches on which no changes have been added). Any
 * such unchanged heads remain in the sharedHeads. Any sharedHeads that were replaced by applying
 * changes are replaced by the new heads. This is safe because if we received some changes from
 * another peer, that means that peer had those changes, and therefore we now both know about them.
 */
function advanceHeads(myOldHeads, myNewHeads, ourOldSharedHeads) {
  const newHeads = myNewHeads.filter((head) => !myOldHeads.includes(head))
  const commonHeads = ourOldSharedHeads.filter((head) => myNewHeads.includes(head))
  const advancedHeads = [...new Set([...newHeads, ...commonHeads])].sort()
  return advancedHeads
}
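
// A worked example: with myOldHeads = ['a', 'b'], myNewHeads = ['b', 'c'] (the received changes
// replaced head 'a' with 'c'), and ourOldSharedHeads = ['a', 'b'], the newly created heads are
// ['c'] and the shared heads still present are ['b'], so advanceHeads(...) returns ['b', 'c'].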

/**
 * Given a backend, a binary-encoded sync message, and the state of our peer, apply any changes,
 * update what we believe about the peer, and (if any changes were applied) produce a patch for
 * the frontend.
 */
function receiveSyncMessage(backend, oldSyncState, binaryMessage) {
  if (!backend) {
    throw new Error("receiveSyncMessage called with no Automerge document")
  }
  if (!oldSyncState) {
    throw new Error("receiveSyncMessage requires a syncState, which can be created with initSyncState()")
  }

  let { sharedHeads, lastSentHeads, sentHashes } = oldSyncState, patch = null
  const message = decodeSyncMessage(binaryMessage)
  const beforeHeads = Backend.getHeads(backend)

  // If we received changes, we try to apply them to the document. There may still be missing
  // dependencies due to Bloom filter false positives, in which case the backend will enqueue the
  // changes without applying them. The set of changes may also be incomplete if the sender decided
  // to break a large set of changes into chunks.
  if (message.changes.length > 0) {
    [backend, patch] = Backend.applyChanges(backend, message.changes)
    sharedHeads = advanceHeads(beforeHeads, Backend.getHeads(backend), sharedHeads)
  }

  // If heads are equal, indicate we don't need to send a response message
  if (message.changes.length === 0 && compareArrays(message.heads, beforeHeads)) {
    lastSentHeads = message.heads
  }

  // If all of the remote heads are known to us, that means either our heads are equal, or we are
  // ahead of the remote peer. In this case, take the remote heads to be our shared heads.
  const knownHeads = message.heads.filter(head => Backend.getChangeByHash(backend, head))
  if (knownHeads.length === message.heads.length) {
    sharedHeads = message.heads
    // If the remote peer has lost all its data, reset our state to perform a full resync
    if (message.heads.length === 0) {
      lastSentHeads = []
      sentHashes = {}
    }
  } else {
    // If some remote heads are unknown to us, we add all the remote heads we know to
    // sharedHeads, but don't remove anything from sharedHeads. This might cause sharedHeads to
    // contain some redundant hashes (where one hash is actually a transitive dependency of
    // another), but this will be cleared up as soon as we know all the remote heads.
    sharedHeads = [...new Set(knownHeads.concat(sharedHeads))].sort()
  }

  const syncState = {
    sharedHeads, // what we have in common to generate an efficient bloom filter
    lastSentHeads,
    theirHave: message.have, // the information we need to calculate the changes they need
    theirHeads: message.heads,
    theirNeed: message.need,
    sentHashes
  }
  return [backend, syncState, patch]
}

module.exports = {
  receiveSyncMessage, generateSyncMessage,
  encodeSyncMessage, decodeSyncMessage,
  initSyncState, encodeSyncState, decodeSyncState,
  BloomFilter // BloomFilter is a private API, exported only for testing purposes
}