dd3c6d1303
After some discussion with PVH I realise that the repo structure in the last reorg was very rust-centric. In an attempt to put each language on a level footing, move the Rust code and project files into ./rust
1415 lines
55 KiB
JavaScript
const pako = require('pako')
const { copyObject, parseOpId, equalBytes } = require('./common')
const {
  utf8ToString, hexStringToBytes, bytesToHexString,
  Encoder, Decoder, RLEEncoder, RLEDecoder, DeltaEncoder, DeltaDecoder, BooleanEncoder, BooleanDecoder
} = require('./encoding')

// Maybe we should be using the platform's built-in hash implementation?
// Node has the crypto module: https://nodejs.org/api/crypto.html and browsers have
// https://developer.mozilla.org/en-US/docs/Web/API/SubtleCrypto/digest
// However, the WebCrypto API is asynchronous (returns promises), which would
// force all our APIs to become asynchronous as well, which would be annoying.
//
// I think on balance, it's safe enough to use a random library off npm:
// - We only need one hash function (not a full suite of crypto algorithms);
// - SHA256 is quite simple and has fairly few opportunities for subtle bugs
//   (compared to asymmetric cryptography anyway);
// - It does not need a secure source of random bits and does not need to be
//   constant-time;
// - I have reviewed the source code and it seems pretty reasonable.
const { Hash } = require('fast-sha256')

// These bytes don't mean anything, they were generated randomly
const MAGIC_BYTES = new Uint8Array([0x85, 0x6f, 0x4a, 0x83])

const CHUNK_TYPE_DOCUMENT = 0
const CHUNK_TYPE_CHANGE = 1
const CHUNK_TYPE_DEFLATE = 2 // like CHUNK_TYPE_CHANGE but with DEFLATE compression

// Minimum number of bytes in a value before we enable DEFLATE compression (there is no point
// compressing very short values since compression may actually make them bigger)
const DEFLATE_MIN_SIZE = 256

// The least-significant 3 bits of a columnId indicate its datatype
const COLUMN_TYPE = {
  GROUP_CARD: 0, ACTOR_ID: 1, INT_RLE: 2, INT_DELTA: 3, BOOLEAN: 4,
  STRING_RLE: 5, VALUE_LEN: 6, VALUE_RAW: 7
}

// The 4th-least-significant bit of a columnId is set if the column is DEFLATE-compressed
const COLUMN_TYPE_DEFLATE = 8
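
// Worked example (illustrative): a columnId packs a column-group number in its
// high bits and one of the COLUMN_TYPE values in its low 3 bits. The `keyStr`
// column defined below is group 1 holding RLE-encoded strings:
//   1 << 4 | COLUMN_TYPE.STRING_RLE === 21  // 0b10101
//   (21 & 7) === COLUMN_TYPE.STRING_RLE     // datatype bits
//   (21 >> 4) === 1                         // column group
// Setting the deflate bit gives 21 | COLUMN_TYPE_DEFLATE === 29.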

// In the values in a column of type VALUE_LEN, the bottom four bits indicate the type of the value,
// one of the following types in VALUE_TYPE. The higher bits indicate the length of the value in the
// associated VALUE_RAW column (in bytes).
const VALUE_TYPE = {
  NULL: 0, FALSE: 1, TRUE: 2, LEB128_UINT: 3, LEB128_INT: 4, IEEE754: 5,
  UTF8: 6, BYTES: 7, COUNTER: 8, TIMESTAMP: 9, MIN_UNKNOWN: 10, MAX_UNKNOWN: 15
}
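
// Worked example (illustrative): the three-byte UTF-8 string 'abc' is stored as
// the bytes 0x61 0x62 0x63 in the VALUE_RAW column, and its VALUE_LEN entry is
//   3 << 4 | VALUE_TYPE.UTF8 === 54
// A decoder recovers the type as 54 % 16 === VALUE_TYPE.UTF8 and the length as
// 54 >> 4 === 3 bytes.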

// make* actions must be at even-numbered indexes in this list
const ACTIONS = ['makeMap', 'set', 'makeList', 'del', 'makeText', 'inc', 'makeTable', 'link']

const OBJECT_TYPE = {makeMap: 'map', makeList: 'list', makeText: 'text', makeTable: 'table'}

const COMMON_COLUMNS = [
  {columnName: 'objActor', columnId: 0 << 4 | COLUMN_TYPE.ACTOR_ID},
  {columnName: 'objCtr', columnId: 0 << 4 | COLUMN_TYPE.INT_RLE},
  {columnName: 'keyActor', columnId: 1 << 4 | COLUMN_TYPE.ACTOR_ID},
  {columnName: 'keyCtr', columnId: 1 << 4 | COLUMN_TYPE.INT_DELTA},
  {columnName: 'keyStr', columnId: 1 << 4 | COLUMN_TYPE.STRING_RLE},
  {columnName: 'idActor', columnId: 2 << 4 | COLUMN_TYPE.ACTOR_ID},
  {columnName: 'idCtr', columnId: 2 << 4 | COLUMN_TYPE.INT_DELTA},
  {columnName: 'insert', columnId: 3 << 4 | COLUMN_TYPE.BOOLEAN},
  {columnName: 'action', columnId: 4 << 4 | COLUMN_TYPE.INT_RLE},
  {columnName: 'valLen', columnId: 5 << 4 | COLUMN_TYPE.VALUE_LEN},
  {columnName: 'valRaw', columnId: 5 << 4 | COLUMN_TYPE.VALUE_RAW},
  {columnName: 'chldActor', columnId: 6 << 4 | COLUMN_TYPE.ACTOR_ID},
  {columnName: 'chldCtr', columnId: 6 << 4 | COLUMN_TYPE.INT_DELTA}
]

const CHANGE_COLUMNS = COMMON_COLUMNS.concat([
  {columnName: 'predNum', columnId: 7 << 4 | COLUMN_TYPE.GROUP_CARD},
  {columnName: 'predActor', columnId: 7 << 4 | COLUMN_TYPE.ACTOR_ID},
  {columnName: 'predCtr', columnId: 7 << 4 | COLUMN_TYPE.INT_DELTA}
])

const DOC_OPS_COLUMNS = COMMON_COLUMNS.concat([
  {columnName: 'succNum', columnId: 8 << 4 | COLUMN_TYPE.GROUP_CARD},
  {columnName: 'succActor', columnId: 8 << 4 | COLUMN_TYPE.ACTOR_ID},
  {columnName: 'succCtr', columnId: 8 << 4 | COLUMN_TYPE.INT_DELTA}
])

const DOCUMENT_COLUMNS = [
  {columnName: 'actor', columnId: 0 << 4 | COLUMN_TYPE.ACTOR_ID},
  {columnName: 'seq', columnId: 0 << 4 | COLUMN_TYPE.INT_DELTA},
  {columnName: 'maxOp', columnId: 1 << 4 | COLUMN_TYPE.INT_DELTA},
  {columnName: 'time', columnId: 2 << 4 | COLUMN_TYPE.INT_DELTA},
  {columnName: 'message', columnId: 3 << 4 | COLUMN_TYPE.STRING_RLE},
  {columnName: 'depsNum', columnId: 4 << 4 | COLUMN_TYPE.GROUP_CARD},
  {columnName: 'depsIndex', columnId: 4 << 4 | COLUMN_TYPE.INT_DELTA},
  {columnName: 'extraLen', columnId: 5 << 4 | COLUMN_TYPE.VALUE_LEN},
  {columnName: 'extraRaw', columnId: 5 << 4 | COLUMN_TYPE.VALUE_RAW}
]

/**
 * Maps an opId of the form {counter: 12345, actorId: 'someActorId'} to the form
 * {counter: 12345, actorNum: 123, actorId: 'someActorId'}, where the actorNum
 * is the index into the `actorIds` array.
 */
function actorIdToActorNum(opId, actorIds) {
  if (!opId || !opId.actorId) return opId
  const counter = opId.counter
  const actorNum = actorIds.indexOf(opId.actorId)
  if (actorNum < 0) throw new RangeError('missing actorId') // should not happen
  return {counter, actorNum, actorId: opId.actorId}
}
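
// Example (hypothetical actor IDs): with actorIds = ['aaaa', 'bbbb'],
//   actorIdToActorNum({counter: 7, actorId: 'bbbb'}, actorIds)
// returns {counter: 7, actorNum: 1, actorId: 'bbbb'}.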

/**
 * Comparison function to pass to Array.sort(), which compares two opIds in the
 * form produced by `actorIdToActorNum` so that they are sorted in increasing
 * Lamport timestamp order (sorted first by counter, then by actorId).
 */
function compareParsedOpIds(id1, id2) {
  if (id1.counter < id2.counter) return -1
  if (id1.counter > id2.counter) return +1
  if (id1.actorId < id2.actorId) return -1
  if (id1.actorId > id2.actorId) return +1
  return 0
}
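
// Example (hypothetical IDs): the counter dominates, with actorId as tiebreaker:
//   {counter: 3, actorId: 'aaaa'} sorts before {counter: 3, actorId: 'bbbb'}
//   {counter: 3, actorId: 'bbbb'} sorts before {counter: 4, actorId: 'aaaa'}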

/**
 * Takes `changes`, an array of changes (represented as JS objects). Returns an
 * object `{changes, actorIds}`, where `changes` is a copy of the argument in
 * which all string opIds have been replaced with `{counter, actorNum}` objects,
 * and where `actorIds` is a lexicographically sorted array of actor IDs occurring
 * in any of the operations. `actorNum` is an index into that array of actorIds.
 * If `single` is true, the actorId of the author of the change is moved to the
 * beginning of the array of actorIds, so that `actorNum` is zero when referencing
 * the author of the change itself. This special-casing is omitted if `single` is
 * false.
 */
function parseAllOpIds(changes, single) {
  const actors = {}, newChanges = []
  for (let change of changes) {
    change = copyObject(change)
    actors[change.actor] = true
    change.ops = expandMultiOps(change.ops, change.startOp, change.actor)
    change.ops = change.ops.map(op => {
      op = copyObject(op)
      if (op.obj !== '_root') op.obj = parseOpId(op.obj)
      if (op.elemId && op.elemId !== '_head') op.elemId = parseOpId(op.elemId)
      if (op.child) op.child = parseOpId(op.child)
      if (op.pred) op.pred = op.pred.map(parseOpId)
      if (op.obj.actorId) actors[op.obj.actorId] = true
      if (op.elemId && op.elemId.actorId) actors[op.elemId.actorId] = true
      if (op.child && op.child.actorId) actors[op.child.actorId] = true
      for (let pred of op.pred) actors[pred.actorId] = true
      return op
    })
    newChanges.push(change)
  }

  let actorIds = Object.keys(actors).sort()
  if (single) {
    actorIds = [changes[0].actor].concat(actorIds.filter(actor => actor !== changes[0].actor))
  }
  for (let change of newChanges) {
    change.actorNum = actorIds.indexOf(change.actor)
    for (let i = 0; i < change.ops.length; i++) {
      let op = change.ops[i]
      op.id = {counter: change.startOp + i, actorNum: change.actorNum, actorId: change.actor}
      op.obj = actorIdToActorNum(op.obj, actorIds)
      op.elemId = actorIdToActorNum(op.elemId, actorIds)
      op.child = actorIdToActorNum(op.child, actorIds)
      op.pred = op.pred.map(pred => actorIdToActorNum(pred, actorIds))
    }
  }
  return {changes: newChanges, actorIds}
}

/**
 * Encodes the `obj` property of operation `op` into the two columns
 * `objActor` and `objCtr`.
 */
function encodeObjectId(op, columns) {
  if (op.obj === '_root') {
    columns.objActor.appendValue(null)
    columns.objCtr.appendValue(null)
  } else if (op.obj.actorNum >= 0 && op.obj.counter > 0) {
    columns.objActor.appendValue(op.obj.actorNum)
    columns.objCtr.appendValue(op.obj.counter)
  } else {
    throw new RangeError(`Unexpected objectId reference: ${JSON.stringify(op.obj)}`)
  }
}

/**
 * Encodes the `key` and `elemId` properties of operation `op` into the three
 * columns `keyActor`, `keyCtr`, and `keyStr`.
 */
function encodeOperationKey(op, columns) {
  if (op.key) {
    columns.keyActor.appendValue(null)
    columns.keyCtr.appendValue(null)
    columns.keyStr.appendValue(op.key)
  } else if (op.elemId === '_head' && op.insert) {
    columns.keyActor.appendValue(null)
    columns.keyCtr.appendValue(0)
    columns.keyStr.appendValue(null)
  } else if (op.elemId && op.elemId.actorNum >= 0 && op.elemId.counter > 0) {
    columns.keyActor.appendValue(op.elemId.actorNum)
    columns.keyCtr.appendValue(op.elemId.counter)
    columns.keyStr.appendValue(null)
  } else {
    throw new RangeError(`Unexpected operation key: ${JSON.stringify(op)}`)
  }
}

/**
 * Encodes the `action` property of operation `op` into the `action` column.
 */
function encodeOperationAction(op, columns) {
  const actionCode = ACTIONS.indexOf(op.action)
  if (actionCode >= 0) {
    columns.action.appendValue(actionCode)
  } else if (typeof op.action === 'number') {
    columns.action.appendValue(op.action)
  } else {
    throw new RangeError(`Unexpected operation action: ${op.action}`)
  }
}

/**
 * Encodes the integer `value` into the two columns `valLen` and `valRaw`,
 * with the datatype tag set to `typeTag`. If `typeTag` is zero, it is set
 * automatically to signed or unsigned depending on the sign of the value.
 * Values with non-zero type tags are always encoded as signed integers.
 */
function encodeInteger(value, typeTag, columns) {
  let numBytes
  if (value < 0 || typeTag > 0) {
    numBytes = columns.valRaw.appendInt53(value)
    if (!typeTag) typeTag = VALUE_TYPE.LEB128_INT
  } else {
    numBytes = columns.valRaw.appendUint53(value)
    typeTag = VALUE_TYPE.LEB128_UINT
  }
  columns.valLen.appendValue(numBytes << 4 | typeTag)
}
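
// Worked example (illustrative): encodeInteger(300, 0, columns) writes the
// unsigned LEB128 bytes 0xac 0x02 (2 bytes) to valRaw and appends
//   2 << 4 | VALUE_TYPE.LEB128_UINT === 35
// to valLen. encodeInteger(-1, 0, columns) writes the single signed LEB128
// byte 0x7f and appends 1 << 4 | VALUE_TYPE.LEB128_INT === 20.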

/**
 * Encodes the `value` property of operation `op` into the two columns
 * `valLen` and `valRaw`.
 */
function encodeValue(op, columns) {
  if ((op.action !== 'set' && op.action !== 'inc') || op.value === null) {
    columns.valLen.appendValue(VALUE_TYPE.NULL)
  } else if (op.value === false) {
    columns.valLen.appendValue(VALUE_TYPE.FALSE)
  } else if (op.value === true) {
    columns.valLen.appendValue(VALUE_TYPE.TRUE)
  } else if (typeof op.value === 'string') {
    const numBytes = columns.valRaw.appendRawString(op.value)
    columns.valLen.appendValue(numBytes << 4 | VALUE_TYPE.UTF8)
  } else if (ArrayBuffer.isView(op.value)) {
    const numBytes = columns.valRaw.appendRawBytes(new Uint8Array(op.value.buffer))
    columns.valLen.appendValue(numBytes << 4 | VALUE_TYPE.BYTES)
  } else if (op.datatype === 'counter' && typeof op.value === 'number') {
    encodeInteger(op.value, VALUE_TYPE.COUNTER, columns)
  } else if (op.datatype === 'timestamp' && typeof op.value === 'number') {
    encodeInteger(op.value, VALUE_TYPE.TIMESTAMP, columns)
  } else if (typeof op.datatype === 'number' && op.datatype >= VALUE_TYPE.MIN_UNKNOWN &&
             op.datatype <= VALUE_TYPE.MAX_UNKNOWN && op.value instanceof Uint8Array) {
    const numBytes = columns.valRaw.appendRawBytes(op.value)
    columns.valLen.appendValue(numBytes << 4 | op.datatype)
  } else if (op.datatype) {
    throw new RangeError(`Unknown datatype ${op.datatype} for value ${op.value}`)
  } else if (typeof op.value === 'number') {
    if (Number.isInteger(op.value) && op.value <= Number.MAX_SAFE_INTEGER && op.value >= Number.MIN_SAFE_INTEGER) {
      encodeInteger(op.value, 0, columns)
    } else {
      // Encode number in 32-bit float if this can be done without loss of precision
      const buf32 = new ArrayBuffer(4), view32 = new DataView(buf32)
      view32.setFloat32(0, op.value, true) // true means little-endian
      if (view32.getFloat32(0, true) === op.value) {
        columns.valRaw.appendRawBytes(new Uint8Array(buf32))
        columns.valLen.appendValue(4 << 4 | VALUE_TYPE.IEEE754)
      } else {
        const buf64 = new ArrayBuffer(8), view64 = new DataView(buf64)
        view64.setFloat64(0, op.value, true) // true means little-endian
        columns.valRaw.appendRawBytes(new Uint8Array(buf64))
        columns.valLen.appendValue(8 << 4 | VALUE_TYPE.IEEE754)
      }
    }
  } else {
    throw new RangeError(`Unsupported value in operation: ${op.value}`)
  }
}

/**
 * Given `sizeTag` (an unsigned integer read from a VALUE_LEN column) and `bytes` (a Uint8Array
 * read from a VALUE_RAW column, with length `sizeTag >> 4`), this function returns an object of the
 * form `{value: value, datatype: datatypeTag}` where `value` is a JavaScript primitive datatype
 * corresponding to the value, and `datatypeTag` is a datatype annotation such as 'counter'.
 */
function decodeValue(sizeTag, bytes) {
  if (sizeTag === VALUE_TYPE.NULL) {
    return {value: null}
  } else if (sizeTag === VALUE_TYPE.FALSE) {
    return {value: false}
  } else if (sizeTag === VALUE_TYPE.TRUE) {
    return {value: true}
  } else if (sizeTag % 16 === VALUE_TYPE.UTF8) {
    return {value: utf8ToString(bytes)}
  } else {
    if (sizeTag % 16 === VALUE_TYPE.LEB128_UINT) {
      return {value: new Decoder(bytes).readUint53()}
    } else if (sizeTag % 16 === VALUE_TYPE.LEB128_INT) {
      return {value: new Decoder(bytes).readInt53()}
    } else if (sizeTag % 16 === VALUE_TYPE.IEEE754) {
      const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
      if (bytes.byteLength === 4) {
        return {value: view.getFloat32(0, true)} // true means little-endian
      } else if (bytes.byteLength === 8) {
        return {value: view.getFloat64(0, true)}
      } else {
        throw new RangeError(`Invalid length for floating point number: ${bytes.byteLength}`)
      }
    } else if (sizeTag % 16 === VALUE_TYPE.COUNTER) {
      return {value: new Decoder(bytes).readInt53(), datatype: 'counter'}
    } else if (sizeTag % 16 === VALUE_TYPE.TIMESTAMP) {
      return {value: new Decoder(bytes).readInt53(), datatype: 'timestamp'}
    } else {
      return {value: bytes, datatype: sizeTag % 16}
    }
  }
}
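
// Example (illustrative): decodeValue is the inverse of encodeValue:
//   decodeValue(VALUE_TYPE.TRUE, new Uint8Array(0))
//   // => {value: true}
//   decodeValue(3 << 4 | VALUE_TYPE.UTF8, new Uint8Array([0x61, 0x62, 0x63]))
//   // => {value: 'abc'}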

/**
 * Reads one value from the column `columns[colIndex]` and interprets it based
 * on the column type. `actorIds` is a list of actors that appear in the change;
 * `actorIds[0]` is the actorId of the change's author. Mutates the `result`
 * object with the value, and returns the number of columns processed (this is 2
 * in the case of a pair of VALUE_LEN and VALUE_RAW columns, which are processed
 * in one go).
 */
function decodeValueColumns(columns, colIndex, actorIds, result) {
  const { columnId, columnName, decoder } = columns[colIndex]
  if (columnId % 8 === COLUMN_TYPE.VALUE_LEN && colIndex + 1 < columns.length &&
      columns[colIndex + 1].columnId === columnId + 1) {
    const sizeTag = decoder.readValue()
    const rawValue = columns[colIndex + 1].decoder.readRawBytes(sizeTag >> 4)
    const { value, datatype } = decodeValue(sizeTag, rawValue)
    result[columnName] = value
    if (datatype) result[columnName + '_datatype'] = datatype
    return 2
  } else if (columnId % 8 === COLUMN_TYPE.ACTOR_ID) {
    const actorNum = decoder.readValue()
    if (actorNum === null) {
      result[columnName] = null
    } else {
      if (!actorIds[actorNum]) throw new RangeError(`No actor index ${actorNum}`)
      result[columnName] = actorIds[actorNum]
    }
  } else {
    result[columnName] = decoder.readValue()
  }
  return 1
}

/**
 * Encodes an array of operations in a set of columns. The operations need to
 * be parsed with `parseAllOpIds()` beforehand. If `forDocument` is true, we use
 * the column structure of a whole document, otherwise we use the column
 * structure for an individual change. Returns an array of `{id, name, encoder}`
 * objects.
 */
function encodeOps(ops, forDocument) {
  const columns = {
    objActor : new RLEEncoder('uint'),
    objCtr : new RLEEncoder('uint'),
    keyActor : new RLEEncoder('uint'),
    keyCtr : new DeltaEncoder(),
    keyStr : new RLEEncoder('utf8'),
    insert : new BooleanEncoder(),
    action : new RLEEncoder('uint'),
    valLen : new RLEEncoder('uint'),
    valRaw : new Encoder(),
    chldActor : new RLEEncoder('uint'),
    chldCtr : new DeltaEncoder()
  }

  if (forDocument) {
    columns.idActor = new RLEEncoder('uint')
    columns.idCtr = new DeltaEncoder()
    columns.succNum = new RLEEncoder('uint')
    columns.succActor = new RLEEncoder('uint')
    columns.succCtr = new DeltaEncoder()
  } else {
    columns.predNum = new RLEEncoder('uint')
    columns.predCtr = new DeltaEncoder()
    columns.predActor = new RLEEncoder('uint')
  }

  for (let op of ops) {
    encodeObjectId(op, columns)
    encodeOperationKey(op, columns)
    columns.insert.appendValue(!!op.insert)
    encodeOperationAction(op, columns)
    encodeValue(op, columns)

    if (op.child && op.child.counter) {
      columns.chldActor.appendValue(op.child.actorNum)
      columns.chldCtr.appendValue(op.child.counter)
    } else {
      columns.chldActor.appendValue(null)
      columns.chldCtr.appendValue(null)
    }

    if (forDocument) {
      columns.idActor.appendValue(op.id.actorNum)
      columns.idCtr.appendValue(op.id.counter)
      columns.succNum.appendValue(op.succ.length)
      op.succ.sort(compareParsedOpIds)
      for (let i = 0; i < op.succ.length; i++) {
        columns.succActor.appendValue(op.succ[i].actorNum)
        columns.succCtr.appendValue(op.succ[i].counter)
      }
    } else {
      columns.predNum.appendValue(op.pred.length)
      op.pred.sort(compareParsedOpIds)
      for (let i = 0; i < op.pred.length; i++) {
        columns.predActor.appendValue(op.pred[i].actorNum)
        columns.predCtr.appendValue(op.pred[i].counter)
      }
    }
  }

  let columnList = []
  for (let {columnName, columnId} of forDocument ? DOC_OPS_COLUMNS : CHANGE_COLUMNS) {
    if (columns[columnName]) columnList.push({id: columnId, name: columnName, encoder: columns[columnName]})
  }
  return columnList.sort((a, b) => a.id - b.id)
}

function expandMultiOps(ops, startOp, actor) {
  let opNum = startOp
  let expandedOps = []
  for (const op of ops) {
    if (op.action === 'set' && op.values && op.insert) {
      if (op.pred.length !== 0) throw new RangeError('multi-insert pred must be empty')
      let lastElemId = op.elemId
      for (const value of op.values) {
        expandedOps.push({action: 'set', obj: op.obj, elemId: lastElemId, value, pred: [], insert: true})
        lastElemId = `${opNum}@${actor}`
        opNum += 1
      }
    } else if (op.action === 'del' && op.multiOp > 1) {
      if (op.pred.length !== 1) throw new RangeError('multiOp deletion must have exactly one pred')
      const startElemId = parseOpId(op.elemId), startPred = parseOpId(op.pred[0])
      for (let i = 0; i < op.multiOp; i++) {
        const elemId = `${startElemId.counter + i}@${startElemId.actorId}`
        const pred = [`${startPred.counter + i}@${startPred.actorId}`]
        expandedOps.push({action: 'del', obj: op.obj, elemId, pred})
        opNum += 1
      }
    } else {
      expandedOps.push(op)
      opNum += 1
    }
  }
  return expandedOps
}
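
// Worked example (hypothetical actor 'abcd'): with startOp = 2, the multi-insert
//   {action: 'set', obj: '1@abcd', elemId: '_head', insert: true, values: ['a', 'b'], pred: []}
// expands into two single-value inserts, each inserted after the element the
// previous op created (whose ID is `${opNum}@${actor}`):
//   {action: 'set', obj: '1@abcd', elemId: '_head', value: 'a', pred: [], insert: true}
//   {action: 'set', obj: '1@abcd', elemId: '2@abcd', value: 'b', pred: [], insert: true}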

/**
 * Takes a change as decoded by `decodeColumns`, and changes it into the form
 * expected by the rest of the backend. If `forDocument` is true, we use the op
 * structure of a whole document, otherwise we use the op structure for an
 * individual change.
 */
function decodeOps(ops, forDocument) {
  const newOps = []
  for (let op of ops) {
    const obj = (op.objCtr === null) ? '_root' : `${op.objCtr}@${op.objActor}`
    const elemId = op.keyStr ? undefined : (op.keyCtr === 0 ? '_head' : `${op.keyCtr}@${op.keyActor}`)
    const action = ACTIONS[op.action] || op.action
    const newOp = elemId ? {obj, elemId, action} : {obj, key: op.keyStr, action}
    newOp.insert = !!op.insert
    if (ACTIONS[op.action] === 'set' || ACTIONS[op.action] === 'inc') {
      newOp.value = op.valLen
      if (op.valLen_datatype) newOp.datatype = op.valLen_datatype
    }
    if (!!op.chldCtr !== !!op.chldActor) {
      throw new RangeError(`Mismatched child columns: ${op.chldCtr} and ${op.chldActor}`)
    }
    if (op.chldCtr !== null) newOp.child = `${op.chldCtr}@${op.chldActor}`
    if (forDocument) {
      newOp.id = `${op.idCtr}@${op.idActor}`
      newOp.succ = op.succNum.map(succ => `${succ.succCtr}@${succ.succActor}`)
      checkSortedOpIds(op.succNum.map(succ => ({counter: succ.succCtr, actorId: succ.succActor})))
    } else {
      newOp.pred = op.predNum.map(pred => `${pred.predCtr}@${pred.predActor}`)
      checkSortedOpIds(op.predNum.map(pred => ({counter: pred.predCtr, actorId: pred.predActor})))
    }
    newOps.push(newOp)
  }
  return newOps
}

/**
 * Throws an exception if the opIds in the given array are not in sorted order.
 */
function checkSortedOpIds(opIds) {
  let last = null
  for (let opId of opIds) {
    if (last && compareParsedOpIds(last, opId) !== -1) {
      throw new RangeError('operation IDs are not in ascending order')
    }
    last = opId
  }
}

function encoderByColumnId(columnId) {
  if ((columnId & 7) === COLUMN_TYPE.INT_DELTA) {
    return new DeltaEncoder()
  } else if ((columnId & 7) === COLUMN_TYPE.BOOLEAN) {
    return new BooleanEncoder()
  } else if ((columnId & 7) === COLUMN_TYPE.STRING_RLE) {
    return new RLEEncoder('utf8')
  } else if ((columnId & 7) === COLUMN_TYPE.VALUE_RAW) {
    return new Encoder()
  } else {
    return new RLEEncoder('uint')
  }
}

function decoderByColumnId(columnId, buffer) {
  if ((columnId & 7) === COLUMN_TYPE.INT_DELTA) {
    return new DeltaDecoder(buffer)
  } else if ((columnId & 7) === COLUMN_TYPE.BOOLEAN) {
    return new BooleanDecoder(buffer)
  } else if ((columnId & 7) === COLUMN_TYPE.STRING_RLE) {
    return new RLEDecoder('utf8', buffer)
  } else if ((columnId & 7) === COLUMN_TYPE.VALUE_RAW) {
    return new Decoder(buffer)
  } else {
    return new RLEDecoder('uint', buffer)
  }
}

function makeDecoders(columns, columnSpec) {
  const emptyBuf = new Uint8Array(0)
  let decoders = [], columnIndex = 0, specIndex = 0

  while (columnIndex < columns.length || specIndex < columnSpec.length) {
    if (columnIndex === columns.length ||
        (specIndex < columnSpec.length && columnSpec[specIndex].columnId < columns[columnIndex].columnId)) {
      const {columnId, columnName} = columnSpec[specIndex]
      decoders.push({columnId, columnName, decoder: decoderByColumnId(columnId, emptyBuf)})
      specIndex++
    } else if (specIndex === columnSpec.length || columns[columnIndex].columnId < columnSpec[specIndex].columnId) {
      const {columnId, buffer} = columns[columnIndex]
      decoders.push({columnId, decoder: decoderByColumnId(columnId, buffer)})
      columnIndex++
    } else { // columns[columnIndex].columnId === columnSpec[specIndex].columnId
      const {columnId, buffer} = columns[columnIndex], {columnName} = columnSpec[specIndex]
      decoders.push({columnId, columnName, decoder: decoderByColumnId(columnId, buffer)})
      columnIndex++
      specIndex++
    }
  }
  return decoders
}

function decodeColumns(columns, actorIds, columnSpec) {
  columns = makeDecoders(columns, columnSpec)
  let parsedRows = []
  while (columns.some(col => !col.decoder.done)) {
    let row = {}, col = 0
    while (col < columns.length) {
      const columnId = columns[col].columnId
      let groupId = columnId >> 4, groupCols = 1
      while (col + groupCols < columns.length && columns[col + groupCols].columnId >> 4 === groupId) {
        groupCols++
      }

      if (columnId % 8 === COLUMN_TYPE.GROUP_CARD) {
        const values = [], count = columns[col].decoder.readValue()
        for (let i = 0; i < count; i++) {
          let value = {}
          for (let colOffset = 1; colOffset < groupCols; colOffset++) {
            decodeValueColumns(columns, col + colOffset, actorIds, value)
          }
          values.push(value)
        }
        row[columns[col].columnName] = values
        col += groupCols
      } else {
        col += decodeValueColumns(columns, col, actorIds, row)
      }
    }
    parsedRows.push(row)
  }
  return parsedRows
}

function decodeColumnInfo(decoder) {
  // A number that is all 1 bits except for the bit that indicates whether a column is
  // deflate-compressed. We ignore this bit when checking whether columns are sorted by ID.
  const COLUMN_ID_MASK = (-1 ^ COLUMN_TYPE_DEFLATE) >>> 0

  let lastColumnId = -1, columns = [], numColumns = decoder.readUint53()
  for (let i = 0; i < numColumns; i++) {
    const columnId = decoder.readUint53(), bufferLen = decoder.readUint53()
    if ((columnId & COLUMN_ID_MASK) <= (lastColumnId & COLUMN_ID_MASK)) {
      throw new RangeError('Columns must be in ascending order')
    }
    lastColumnId = columnId
    columns.push({columnId, bufferLen})
  }
  return columns
}

function encodeColumnInfo(encoder, columns) {
  const nonEmptyColumns = columns.filter(column => column.encoder.buffer.byteLength > 0)
  encoder.appendUint53(nonEmptyColumns.length)
  for (let column of nonEmptyColumns) {
    encoder.appendUint53(column.id)
    encoder.appendUint53(column.encoder.buffer.byteLength)
  }
}

function decodeChangeHeader(decoder) {
  const numDeps = decoder.readUint53(), deps = []
  for (let i = 0; i < numDeps; i++) {
    deps.push(bytesToHexString(decoder.readRawBytes(32)))
  }
  let change = {
    actor: decoder.readHexString(),
    seq: decoder.readUint53(),
    startOp: decoder.readUint53(),
    time: decoder.readInt53(),
    message: decoder.readPrefixedString(),
    deps
  }
  const actorIds = [change.actor], numActorIds = decoder.readUint53()
  for (let i = 0; i < numActorIds; i++) actorIds.push(decoder.readHexString())
  change.actorIds = actorIds
  return change
}

/**
 * Assembles a chunk of encoded data containing a checksum, headers, and a
 * series of encoded columns. Calls `encodeContentsCallback` with an encoder
 * that should be used to write the chunk contents.
 */
function encodeContainer(chunkType, encodeContentsCallback) {
  const CHECKSUM_SIZE = 4 // checksum is first 4 bytes of SHA-256 hash of the rest of the data
  const HEADER_SPACE = MAGIC_BYTES.byteLength + CHECKSUM_SIZE + 1 + 5 // 1 byte type + 5 bytes length
  const body = new Encoder()
  // Make space for the header at the beginning of the body buffer. We will
  // copy the header in here later. This is cheaper than copying the body since
  // the body is likely to be much larger than the header.
  body.appendRawBytes(new Uint8Array(HEADER_SPACE))
  encodeContentsCallback(body)

  const bodyBuf = body.buffer
  const header = new Encoder()
  header.appendByte(chunkType)
  header.appendUint53(bodyBuf.byteLength - HEADER_SPACE)

  // Compute the hash over chunkType, length, and body
  const headerBuf = header.buffer
  const sha256 = new Hash()
  sha256.update(headerBuf)
  sha256.update(bodyBuf.subarray(HEADER_SPACE))
  const hash = sha256.digest(), checksum = hash.subarray(0, CHECKSUM_SIZE)

  // Copy header into the body buffer so that they are contiguous
  bodyBuf.set(MAGIC_BYTES, HEADER_SPACE - headerBuf.byteLength - CHECKSUM_SIZE - MAGIC_BYTES.byteLength)
  bodyBuf.set(checksum, HEADER_SPACE - headerBuf.byteLength - CHECKSUM_SIZE)
  bodyBuf.set(headerBuf, HEADER_SPACE - headerBuf.byteLength)
  return {hash, bytes: bodyBuf.subarray(HEADER_SPACE - headerBuf.byteLength - CHECKSUM_SIZE - MAGIC_BYTES.byteLength)}
}
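
// Resulting chunk layout (derived from the code above):
//   magic bytes (4) | checksum (4) | chunk type (1) | body length (LEB128) | body
// where the checksum is the first 4 bytes of SHA-256(chunk type || length || body).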

function decodeContainerHeader(decoder, computeHash) {
  if (!equalBytes(decoder.readRawBytes(MAGIC_BYTES.byteLength), MAGIC_BYTES)) {
    throw new RangeError('Data does not begin with magic bytes 85 6f 4a 83')
  }
  const expectedHash = decoder.readRawBytes(4)
  const hashStartOffset = decoder.offset
  const chunkType = decoder.readByte()
  const chunkLength = decoder.readUint53()
  const header = {chunkType, chunkLength, chunkData: decoder.readRawBytes(chunkLength)}

  if (computeHash) {
    const sha256 = new Hash()
    sha256.update(decoder.buf.subarray(hashStartOffset, decoder.offset))
    const binaryHash = sha256.digest()
    if (!equalBytes(binaryHash.subarray(0, 4), expectedHash)) {
      throw new RangeError('checksum does not match data')
    }
    header.hash = bytesToHexString(binaryHash)
  }
  return header
}

/**
 * Returns the checksum of a change (bytes 4 to 7) as a 32-bit unsigned integer.
 */
function getChangeChecksum(change) {
  if (change[0] !== MAGIC_BYTES[0] || change[1] !== MAGIC_BYTES[1] ||
      change[2] !== MAGIC_BYTES[2] || change[3] !== MAGIC_BYTES[3]) {
    throw new RangeError('Data does not begin with magic bytes 85 6f 4a 83')
  }
  return ((change[4] << 24) | (change[5] << 16) | (change[6] << 8) | change[7]) >>> 0
}
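
// Example (hypothetical bytes): for a change beginning 85 6f 4a 83 1a 2b 3c 4d ...,
// getChangeChecksum returns 0x1a2b3c4d. The final `>>> 0` reinterprets the
// result as unsigned, since `<<` otherwise yields a signed 32-bit value when
// the top bit is set.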

function encodeChange(changeObj) {
  const { changes, actorIds } = parseAllOpIds([changeObj], true)
  const change = changes[0]

  const { hash, bytes } = encodeContainer(CHUNK_TYPE_CHANGE, encoder => {
    if (!Array.isArray(change.deps)) throw new TypeError('deps is not an array')
    encoder.appendUint53(change.deps.length)
    for (let hash of change.deps.slice().sort()) {
      encoder.appendRawBytes(hexStringToBytes(hash))
    }
    encoder.appendHexString(change.actor)
    encoder.appendUint53(change.seq)
    encoder.appendUint53(change.startOp)
    encoder.appendInt53(change.time)
    encoder.appendPrefixedString(change.message || '')
    encoder.appendUint53(actorIds.length - 1)
    for (let actor of actorIds.slice(1)) encoder.appendHexString(actor)

    const columns = encodeOps(change.ops, false)
    encodeColumnInfo(encoder, columns)
    for (let column of columns) encoder.appendRawBytes(column.encoder.buffer)
    if (change.extraBytes) encoder.appendRawBytes(change.extraBytes)
  })

  const hexHash = bytesToHexString(hash)
  if (changeObj.hash && changeObj.hash !== hexHash) {
    throw new RangeError(`Change hash does not match encoding: ${changeObj.hash} != ${hexHash}`)
  }
  return (bytes.byteLength >= DEFLATE_MIN_SIZE) ? deflateChange(bytes) : bytes
}

function decodeChangeColumns(buffer) {
  if (buffer[8] === CHUNK_TYPE_DEFLATE) buffer = inflateChange(buffer)
  const decoder = new Decoder(buffer)
  const header = decodeContainerHeader(decoder, true)
  const chunkDecoder = new Decoder(header.chunkData)
  if (!decoder.done) throw new RangeError('Encoded change has trailing data')
  if (header.chunkType !== CHUNK_TYPE_CHANGE) throw new RangeError(`Unexpected chunk type: ${header.chunkType}`)

  const change = decodeChangeHeader(chunkDecoder)
  const columns = decodeColumnInfo(chunkDecoder)
  for (let i = 0; i < columns.length; i++) {
    if ((columns[i].columnId & COLUMN_TYPE_DEFLATE) !== 0) {
      throw new RangeError('change must not contain deflated columns')
    }
    columns[i].buffer = chunkDecoder.readRawBytes(columns[i].bufferLen)
  }
  if (!chunkDecoder.done) {
    const restLen = chunkDecoder.buf.byteLength - chunkDecoder.offset
    change.extraBytes = chunkDecoder.readRawBytes(restLen)
  }

  change.columns = columns
  change.hash = header.hash
  return change
}

/**
 * Decodes one change in binary format into its JS object representation.
 */
function decodeChange(buffer) {
  const change = decodeChangeColumns(buffer)
  change.ops = decodeOps(decodeColumns(change.columns, change.actorIds, CHANGE_COLUMNS), false)
  delete change.actorIds
  delete change.columns
  return change
}
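
// Round-trip sketch (hypothetical change contents): decodeChange inverts
// encodeChange and fills in the change's hash:
//   const bin = encodeChange({actor: 'abcd1234', seq: 1, startOp: 1, time: 0,
//     message: '', deps: [], ops: [
//       {action: 'set', obj: '_root', key: 'title', value: 'hello', pred: [], insert: false}
//     ]})
//   const change = decodeChange(bin)
//   // change.hash is the hex SHA-256 computed over the chunk contents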

/**
 * Decodes the header fields of a change in binary format, but does not decode
 * the operations. Saves work when we only need to inspect the headers. Only
 * computes the hash of the change if `computeHash` is true.
 */
function decodeChangeMeta(buffer, computeHash) {
  if (buffer[8] === CHUNK_TYPE_DEFLATE) buffer = inflateChange(buffer)
  const header = decodeContainerHeader(new Decoder(buffer), computeHash)
  if (header.chunkType !== CHUNK_TYPE_CHANGE) {
    throw new RangeError('Buffer chunk type is not a change')
  }
  const meta = decodeChangeHeader(new Decoder(header.chunkData))
  meta.change = buffer
  if (computeHash) meta.hash = header.hash
  return meta
}

/**
 * Compresses a binary change using DEFLATE.
 */
function deflateChange(buffer) {
  const header = decodeContainerHeader(new Decoder(buffer), false)
  if (header.chunkType !== CHUNK_TYPE_CHANGE) throw new RangeError(`Unexpected chunk type: ${header.chunkType}`)
  const compressed = pako.deflateRaw(header.chunkData)
  const encoder = new Encoder()
  encoder.appendRawBytes(buffer.subarray(0, 8)) // copy MAGIC_BYTES and checksum
  encoder.appendByte(CHUNK_TYPE_DEFLATE)
  encoder.appendUint53(compressed.byteLength)
  encoder.appendRawBytes(compressed)
  return encoder.buffer
}

/**
 * Decompresses a binary change that has been compressed with DEFLATE.
 */
function inflateChange(buffer) {
  const header = decodeContainerHeader(new Decoder(buffer), false)
  if (header.chunkType !== CHUNK_TYPE_DEFLATE) throw new RangeError(`Unexpected chunk type: ${header.chunkType}`)
  const decompressed = pako.inflateRaw(header.chunkData)
  const encoder = new Encoder()
  encoder.appendRawBytes(buffer.subarray(0, 8)) // copy MAGIC_BYTES and checksum
  encoder.appendByte(CHUNK_TYPE_CHANGE)
  encoder.appendUint53(decompressed.byteLength)
  encoder.appendRawBytes(decompressed)
  return encoder.buffer
}

/**
 * Takes an Uint8Array that may contain multiple concatenated changes, and
 * returns an array of subarrays, each subarray containing one change.
 */
function splitContainers(buffer) {
  let decoder = new Decoder(buffer), chunks = [], startOffset = 0
  while (!decoder.done) {
    decodeContainerHeader(decoder, false)
    chunks.push(buffer.subarray(startOffset, decoder.offset))
    startOffset = decoder.offset
  }
  return chunks
}
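
// Usage sketch (hypothetical buffers): given two encoded changes concatenated
// into a single Uint8Array, recover the individual chunks:
//   const combined = new Uint8Array(change1.byteLength + change2.byteLength)
//   combined.set(change1, 0)
//   combined.set(change2, change1.byteLength)
//   const [first, second] = splitContainers(combined)
//   // first and second are subarrays equal to change1 and change2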

/**
 * Decodes a list of changes from the binary format into JS objects.
 * `binaryChanges` is an array of `Uint8Array` objects.
 */
function decodeChanges(binaryChanges) {
  let decoded = []
  for (let binaryChange of binaryChanges) {
    for (let chunk of splitContainers(binaryChange)) {
      if (chunk[8] === CHUNK_TYPE_DOCUMENT) {
        decoded = decoded.concat(decodeDocument(chunk))
      } else if (chunk[8] === CHUNK_TYPE_CHANGE || chunk[8] === CHUNK_TYPE_DEFLATE) {
        decoded.push(decodeChange(chunk))
      } else {
        // ignoring chunk of unknown type
      }
    }
  }
  return decoded
}

function sortOpIds(a, b) {
  if (a === b) return 0
  if (a === '_root') return -1
  if (b === '_root') return +1
  const a_ = parseOpId(a), b_ = parseOpId(b)
  if (a_.counter < b_.counter) return -1
  if (a_.counter > b_.counter) return +1
  if (a_.actorId < b_.actorId) return -1
  if (a_.actorId > b_.actorId) return +1
  return 0
}
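
// Example (hypothetical IDs): counters are compared numerically, not as strings:
//   ['2@bb', '_root', '10@aa', '2@aa'].sort(sortOpIds)
//   // => ['_root', '2@aa', '2@bb', '10@aa']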

function groupDocumentOps(changes) {
  let byObjectId = {}, byReference = {}, objectType = {}
  for (let change of changes) {
    for (let i = 0; i < change.ops.length; i++) {
      const op = change.ops[i], opId = `${op.id.counter}@${op.id.actorId}`
      const objectId = (op.obj === '_root') ? '_root' : `${op.obj.counter}@${op.obj.actorId}`
      if (op.action.startsWith('make')) {
        objectType[opId] = op.action
        if (op.action === 'makeList' || op.action === 'makeText') {
          byReference[opId] = {'_head': []}
        }
      }

      let key
      if (objectId === '_root' || objectType[objectId] === 'makeMap' || objectType[objectId] === 'makeTable') {
        key = op.key
      } else if (objectType[objectId] === 'makeList' || objectType[objectId] === 'makeText') {
        if (op.insert) {
          key = opId
          const ref = (op.elemId === '_head') ? '_head' : `${op.elemId.counter}@${op.elemId.actorId}`
          byReference[objectId][ref].push(opId)
          byReference[objectId][opId] = []
        } else {
          key = `${op.elemId.counter}@${op.elemId.actorId}`
        }
      } else {
        throw new RangeError(`Unknown object type for object ${objectId}`)
      }

      if (!byObjectId[objectId]) byObjectId[objectId] = {}
      if (!byObjectId[objectId][key]) byObjectId[objectId][key] = {}
      byObjectId[objectId][key][opId] = op
      op.succ = []

      for (let pred of op.pred) {
        const predId = `${pred.counter}@${pred.actorId}`
        if (!byObjectId[objectId][key][predId]) {
          throw new RangeError(`No predecessor operation ${predId}`)
        }
        byObjectId[objectId][key][predId].succ.push(op.id)
      }
    }
  }

  let ops = []
  for (let objectId of Object.keys(byObjectId).sort(sortOpIds)) {
    let keys = []
    if (objectType[objectId] === 'makeList' || objectType[objectId] === 'makeText') {
      let stack = ['_head']
      while (stack.length > 0) {
        const key = stack.pop()
        if (key !== '_head') keys.push(key)
        for (let opId of byReference[objectId][key].sort(sortOpIds)) stack.push(opId)
      }
    } else {
      // FIXME JavaScript sorts based on UTF-16 encoding. We should change this to use the UTF-8
      // encoding instead (the sort order will be different beyond the basic multilingual plane)
      keys = Object.keys(byObjectId[objectId]).sort()
    }

    for (let key of keys) {
      for (let opId of Object.keys(byObjectId[objectId][key]).sort(sortOpIds)) {
        const op = byObjectId[objectId][key][opId]
        if (op.action !== 'del') ops.push(op)
      }
    }
  }
  return ops
}

/**
 * Takes a set of operations `ops` loaded from an encoded document, and
 * reconstructs the changes that they originally came from.
 * Does not return anything, only mutates `changes`.
 */
function groupChangeOps(changes, ops) {
  let changesByActor = {} // map from actorId to array of changes by that actor
  for (let change of changes) {
    change.ops = []
    if (!changesByActor[change.actor]) changesByActor[change.actor] = []
    if (change.seq !== changesByActor[change.actor].length + 1) {
      throw new RangeError(`Expected seq = ${changesByActor[change.actor].length + 1}, got ${change.seq}`)
    }
    if (change.seq > 1 && changesByActor[change.actor][change.seq - 2].maxOp > change.maxOp) {
      throw new RangeError('maxOp must increase monotonically per actor')
    }
    changesByActor[change.actor].push(change)
  }

  let opsById = {}
  for (let op of ops) {
    if (op.action === 'del') throw new RangeError('document should not contain del operations')
    op.pred = opsById[op.id] ? opsById[op.id].pred : []
    opsById[op.id] = op
    for (let succ of op.succ) {
      if (!opsById[succ]) {
        if (op.elemId) {
          const elemId = op.insert ? op.id : op.elemId
          opsById[succ] = {id: succ, action: 'del', obj: op.obj, elemId, pred: []}
        } else {
          opsById[succ] = {id: succ, action: 'del', obj: op.obj, key: op.key, pred: []}
        }
      }
      opsById[succ].pred.push(op.id)
    }
    delete op.succ
  }
  for (let op of Object.values(opsById)) {
    if (op.action === 'del') ops.push(op)
  }

  for (let op of ops) {
    const { counter, actorId } = parseOpId(op.id)
    const actorChanges = changesByActor[actorId]
    // Binary search to find the change that should contain this operation
    let left = 0, right = actorChanges.length
    while (left < right) {
      const index = Math.floor((left + right) / 2)
      if (actorChanges[index].maxOp < counter) {
        left = index + 1
      } else {
        right = index
      }
    }
    if (left >= actorChanges.length) {
      throw new RangeError(`Operation ID ${op.id} outside of allowed range`)
    }
    actorChanges[left].ops.push(op)
  }

  for (let change of changes) {
    change.ops.sort((op1, op2) => sortOpIds(op1.id, op2.id))
    change.startOp = change.maxOp - change.ops.length + 1
    delete change.maxOp
    for (let i = 0; i < change.ops.length; i++) {
      const op = change.ops[i], expectedId = `${change.startOp + i}@${change.actor}`
      if (op.id !== expectedId) {
        throw new RangeError(`Expected opId ${expectedId}, got ${op.id}`)
      }
      delete op.id
    }
  }
}

function encodeDocumentChanges(changes) {
  const columns = { // see DOCUMENT_COLUMNS
    actor : new RLEEncoder('uint'),
    seq : new DeltaEncoder(),
    maxOp : new DeltaEncoder(),
    time : new DeltaEncoder(),
    message : new RLEEncoder('utf8'),
    depsNum : new RLEEncoder('uint'),
    depsIndex : new DeltaEncoder(),
    extraLen : new RLEEncoder('uint'),
    extraRaw : new Encoder()
  }
  let indexByHash = {} // map from change hash to its index in the changes array
  let heads = {} // change hashes that are not a dependency of any other change

  for (let i = 0; i < changes.length; i++) {
    const change = changes[i]
    indexByHash[change.hash] = i
    heads[change.hash] = true

    columns.actor.appendValue(change.actorNum)
    columns.seq.appendValue(change.seq)
    columns.maxOp.appendValue(change.startOp + change.ops.length - 1)
    columns.time.appendValue(change.time)
    columns.message.appendValue(change.message)
    columns.depsNum.appendValue(change.deps.length)

    for (let dep of change.deps) {
      if (typeof indexByHash[dep] !== 'number') {
        throw new RangeError(`Unknown dependency hash: ${dep}`)
      }
      columns.depsIndex.appendValue(indexByHash[dep])
      if (heads[dep]) delete heads[dep]
    }

    if (change.extraBytes) {
      columns.extraLen.appendValue(change.extraBytes.byteLength << 4 | VALUE_TYPE.BYTES)
      columns.extraRaw.appendRawBytes(change.extraBytes)
    } else {
      columns.extraLen.appendValue(VALUE_TYPE.BYTES) // zero-length byte array
    }
  }

  let changesColumns = []
  for (let {columnName, columnId} of DOCUMENT_COLUMNS) {
    changesColumns.push({id: columnId, name: columnName, encoder: columns[columnName]})
  }
  changesColumns.sort((a, b) => a.id - b.id)
  return { changesColumns, heads: Object.keys(heads).sort() }
}

function decodeDocumentChanges(changes, expectedHeads) {
  let heads = {} // change hashes that are not a dependency of any other change
  for (let i = 0; i < changes.length; i++) {
    let change = changes[i]
    change.deps = []
    for (let index of change.depsNum.map(d => d.depsIndex)) {
      if (!changes[index] || !changes[index].hash) {
        throw new RangeError(`No hash for index ${index} while processing index ${i}`)
      }
      const hash = changes[index].hash
      change.deps.push(hash)
      if (heads[hash]) delete heads[hash]
    }
    change.deps.sort()
    delete change.depsNum

    if (change.extraLen_datatype !== VALUE_TYPE.BYTES) {
      throw new RangeError(`Bad datatype for extra bytes: ${change.extraLen_datatype}`)
    }
    change.extraBytes = change.extraLen
    delete change.extraLen_datatype

    // Encoding and decoding again to compute the hash of the change
    changes[i] = decodeChange(encodeChange(change))
    heads[changes[i].hash] = true
  }

  const actualHeads = Object.keys(heads).sort()
  let headsEqual = (actualHeads.length === expectedHeads.length), i = 0
  while (headsEqual && i < actualHeads.length) {
    headsEqual = (actualHeads[i] === expectedHeads[i])
    i++
  }
  if (!headsEqual) {
    throw new RangeError(`Mismatched heads hashes: expected ${expectedHeads.join(', ')}, got ${actualHeads.join(', ')}`)
  }
}

/**
 * Transforms a list of changes into a binary representation of the document state.
 */
function encodeDocument(binaryChanges) {
  const { changes, actorIds } = parseAllOpIds(decodeChanges(binaryChanges), false)
  const { changesColumns, heads } = encodeDocumentChanges(changes)
  const opsColumns = encodeOps(groupDocumentOps(changes), true)
  for (let column of changesColumns) deflateColumn(column)
  for (let column of opsColumns) deflateColumn(column)

  return encodeContainer(CHUNK_TYPE_DOCUMENT, encoder => {
    encoder.appendUint53(actorIds.length)
    for (let actor of actorIds) {
      encoder.appendHexString(actor)
    }
    encoder.appendUint53(heads.length)
    for (let head of heads.sort()) {
      encoder.appendRawBytes(hexStringToBytes(head))
    }
    encodeColumnInfo(encoder, changesColumns)
    encodeColumnInfo(encoder, opsColumns)
    for (let column of changesColumns) encoder.appendRawBytes(column.encoder.buffer)
    for (let column of opsColumns) encoder.appendRawBytes(column.encoder.buffer)
  }).bytes
}

function decodeDocumentHeader(buffer) {
  const documentDecoder = new Decoder(buffer)
  const header = decodeContainerHeader(documentDecoder, true)
  const decoder = new Decoder(header.chunkData)
  if (!documentDecoder.done) throw new RangeError('Encoded document has trailing data')
  if (header.chunkType !== CHUNK_TYPE_DOCUMENT) throw new RangeError(`Unexpected chunk type: ${header.chunkType}`)

  const actorIds = [], numActors = decoder.readUint53()
  for (let i = 0; i < numActors; i++) {
    actorIds.push(decoder.readHexString())
  }
  const heads = [], numHeads = decoder.readUint53()
  for (let i = 0; i < numHeads; i++) {
    heads.push(bytesToHexString(decoder.readRawBytes(32)))
  }

  const changesColumns = decodeColumnInfo(decoder)
  const opsColumns = decodeColumnInfo(decoder)
  for (let i = 0; i < changesColumns.length; i++) {
    changesColumns[i].buffer = decoder.readRawBytes(changesColumns[i].bufferLen)
    inflateColumn(changesColumns[i])
  }
  for (let i = 0; i < opsColumns.length; i++) {
    opsColumns[i].buffer = decoder.readRawBytes(opsColumns[i].bufferLen)
    inflateColumn(opsColumns[i])
  }

  const extraBytes = decoder.readRawBytes(decoder.buf.byteLength - decoder.offset)
  return { changesColumns, opsColumns, actorIds, heads, extraBytes }
}

function decodeDocument(buffer) {
  const { changesColumns, opsColumns, actorIds, heads } = decodeDocumentHeader(buffer)
  const changes = decodeColumns(changesColumns, actorIds, DOCUMENT_COLUMNS)
  const ops = decodeOps(decodeColumns(opsColumns, actorIds, DOC_OPS_COLUMNS), true)
  groupChangeOps(changes, ops)
  decodeDocumentChanges(changes, heads)
  return changes
}

/**
 * DEFLATE-compresses the given column if it is large enough to make the compression worthwhile.
 */
function deflateColumn(column) {
  if (column.encoder.buffer.byteLength >= DEFLATE_MIN_SIZE) {
    column.encoder = {buffer: pako.deflateRaw(column.encoder.buffer)}
    column.id |= COLUMN_TYPE_DEFLATE
  }
}

/**
 * Decompresses the given column if it is DEFLATE-compressed.
 */
function inflateColumn(column) {
  if ((column.columnId & COLUMN_TYPE_DEFLATE) !== 0) {
    column.buffer = pako.inflateRaw(column.buffer)
    column.columnId ^= COLUMN_TYPE_DEFLATE
  }
}

/**
 * Takes all the operations for the same property (i.e. the same key in a map, or the same list
 * element) and mutates the object patch to reflect the current value(s) of that property. There
 * might be multiple values in the case of a conflict. `objects` is a map from objectId to the
 * patch for that object. `property` contains `objId`, `key`, a list of `ops`, and `index` (the
 * current list index if the object is a list). Returns true if one or more values are present,
 * or false if the property has been deleted.
 */
function addPatchProperty(objects, property) {
  let values = {}, counter = null
  for (let op of property.ops) {
    // Apply counters and their increments regardless of the number of successor operations
    if (op.actionName === 'set' && op.value.datatype === 'counter') {
      if (!counter) counter = {opId: op.opId, value: 0, succ: {}}
      counter.value += op.value.value
      for (let succId of op.succ) counter.succ[succId] = true
    } else if (op.actionName === 'inc') {
      if (!counter) throw new RangeError(`inc operation ${op.opId} without a counter`)
      counter.value += op.value.value
      delete counter.succ[op.opId]
      for (let succId of op.succ) counter.succ[succId] = true

    } else if (op.succ.length === 0) { // Ignore any ops that have been overwritten
      if (op.actionName.startsWith('make')) {
        values[op.opId] = objects[op.opId]
      } else if (op.actionName === 'set') {
        values[op.opId] = {value: op.value.value, type: 'value'}
        if (op.value.datatype) {
          values[op.opId].datatype = op.value.datatype
        }
      } else if (op.actionName === 'link') {
        // NB. This assumes that the ID of the child object is greater than the ID of the current
        // object. This is true as long as link operations are only used to redo undone make*
        // operations, but it will cease to be true once subtree moves are allowed.
        if (!op.childId) throw new RangeError(`link operation ${op.opId} without a childId`)
        values[op.opId] = objects[op.childId]
      } else {
        throw new RangeError(`Unexpected action type: ${op.actionName}`)
      }
    }
  }

  // If the counter had any successor operation that was not an increment, that means the counter
  // must have been deleted, so we omit it from the patch.
  if (counter && Object.keys(counter.succ).length === 0) {
    values[counter.opId] = {type: 'value', value: counter.value, datatype: 'counter'}
  }

  if (Object.keys(values).length > 0) {
    let obj = objects[property.objId]
    if (obj.type === 'map' || obj.type === 'table') {
      obj.props[property.key] = values
    } else if (obj.type === 'list' || obj.type === 'text') {
      makeListEdits(obj, values, property.key, property.index)
    }
    return true
  } else {
    return false
  }
}

/**
 * When constructing a patch to instantiate a loaded document, this function adds the edits to
 * insert one list element. Usually there is one value, but in the case of a conflict there may be
 * several values. `elemId` is the ID of the list element, and `index` is the list index at which
 * the value(s) should be placed.
 */
function makeListEdits(list, values, elemId, index) {
  let firstValue = true
  const opIds = Object.keys(values).sort((id1, id2) => compareParsedOpIds(parseOpId(id1), parseOpId(id2)))
  for (const opId of opIds) {
    if (firstValue) {
      list.edits.push({action: 'insert', value: values[opId], elemId, opId, index})
    } else {
      list.edits.push({action: 'update', value: values[opId], opId, index})
    }
    firstValue = false
  }
}

/**
 * Recursively walks the patch tree, calling appendEdit on every list edit in order to condense
 * consecutive sequences of insertions into multi-inserts.
 */
function condenseEdits(diff) {
  if (diff.type === 'list' || diff.type === 'text') {
    diff.edits.forEach(e => condenseEdits(e.value))
    let newEdits = diff.edits
    diff.edits = []
    for (const edit of newEdits) appendEdit(diff.edits, edit)
  } else if (diff.type === 'map' || diff.type === 'table') {
    for (const prop of Object.keys(diff.props)) {
      for (const opId of Object.keys(diff.props[prop])) {
        condenseEdits(diff.props[prop][opId])
      }
    }
  }
}

/**
 * Appends a list edit operation (insert, update, remove) to an array of existing operations. If the
 * last existing operation can be extended (as a multi-op), we do that.
 */
function appendEdit(existingEdits, nextEdit) {
  if (existingEdits.length === 0) {
    existingEdits.push(nextEdit)
    return
  }

  let lastEdit = existingEdits[existingEdits.length - 1]
  if (lastEdit.action === 'insert' && nextEdit.action === 'insert' &&
      lastEdit.index === nextEdit.index - 1 &&
      lastEdit.value.type === 'value' && nextEdit.value.type === 'value' &&
      lastEdit.elemId === lastEdit.opId && nextEdit.elemId === nextEdit.opId &&
      opIdDelta(lastEdit.elemId, nextEdit.elemId, 1)) {
    lastEdit.action = 'multi-insert'
    lastEdit.values = [lastEdit.value.value, nextEdit.value.value]
    delete lastEdit.value
    delete lastEdit.opId

  } else if (lastEdit.action === 'multi-insert' && nextEdit.action === 'insert' &&
             lastEdit.index + lastEdit.values.length === nextEdit.index &&
             nextEdit.value.type === 'value' && nextEdit.elemId === nextEdit.opId &&
             opIdDelta(lastEdit.elemId, nextEdit.elemId, lastEdit.values.length)) {
    lastEdit.values.push(nextEdit.value.value)

  } else if (lastEdit.action === 'remove' && nextEdit.action === 'remove' &&
             lastEdit.index === nextEdit.index) {
    lastEdit.count += nextEdit.count

  } else {
    existingEdits.push(nextEdit)
  }
}
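
// Worked example (hypothetical actor 'aa'): appending these two edits in turn
//   {action: 'insert', index: 0, elemId: '1@aa', opId: '1@aa', value: {type: 'value', value: 'x'}}
//   {action: 'insert', index: 1, elemId: '2@aa', opId: '2@aa', value: {type: 'value', value: 'y'}}
// condenses them into a single edit:
//   {action: 'multi-insert', index: 0, elemId: '1@aa', values: ['x', 'y']}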

/**
 * Returns true if the two given operation IDs have the same actor ID, and the counter of `id2` is
 * exactly `delta` greater than the counter of `id1`.
 */
function opIdDelta(id1, id2, delta = 1) {
  const parsed1 = parseOpId(id1), parsed2 = parseOpId(id2)
  return parsed1.actorId === parsed2.actorId && parsed1.counter + delta === parsed2.counter
}
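
// Example (hypothetical IDs):
//   opIdDelta('3@aa', '4@aa', 1) // => true
//   opIdDelta('3@aa', '4@bb', 1) // => false (different actor)
//   opIdDelta('3@aa', '6@aa', 3) // => true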

/**
 * Parses the document (in compressed binary format) given as `documentBuffer`
 * and returns a patch that can be sent to the frontend to instantiate the
 * current state of that document.
 */
function constructPatch(documentBuffer) {
  const { opsColumns, actorIds } = decodeDocumentHeader(documentBuffer)
  const col = makeDecoders(opsColumns, DOC_OPS_COLUMNS).reduce(
    (acc, col) => Object.assign(acc, {[col.columnName]: col.decoder}), {})

  let objects = {_root: {objectId: '_root', type: 'map', props: {}}}
  let property = null

  while (!col.idActor.done) {
    const opId = `${col.idCtr.readValue()}@${actorIds[col.idActor.readValue()]}`
    const action = col.action.readValue(), actionName = ACTIONS[action]
    if (action % 2 === 0) { // even-numbered actions are object creation
      const type = OBJECT_TYPE[actionName] || 'unknown'
      if (type === 'list' || type === 'text') {
        objects[opId] = {objectId: opId, type, edits: []}
      } else {
        objects[opId] = {objectId: opId, type, props: {}}
      }
    }

    const objActor = col.objActor.readValue(), objCtr = col.objCtr.readValue()
    const objId = objActor === null ? '_root' : `${objCtr}@${actorIds[objActor]}`
    let obj = objects[objId]
    if (!obj) throw new RangeError(`Operation for nonexistent object: ${objId}`)

    const keyActor = col.keyActor.readValue(), keyCtr = col.keyCtr.readValue()
    const keyStr = col.keyStr.readValue(), insert = !!col.insert.readValue()
    const chldActor = col.chldActor.readValue(), chldCtr = col.chldCtr.readValue()
    const childId = chldActor === null ? null : `${chldCtr}@${actorIds[chldActor]}`
    const sizeTag = col.valLen.readValue()
    const rawValue = col.valRaw.readRawBytes(sizeTag >> 4)
    const value = decodeValue(sizeTag, rawValue)
    const succNum = col.succNum.readValue()
    let succ = []
    for (let i = 0; i < succNum; i++) {
      succ.push(`${col.succCtr.readValue()}@${actorIds[col.succActor.readValue()]}`)
    }

    if (!actionName || obj.type === 'unknown') continue

    let key
    if (obj.type === 'list' || obj.type === 'text') {
      if (keyCtr === null || (keyCtr === 0 && !insert)) {
        throw new RangeError(`Operation ${opId} on ${obj.type} object has no key`)
      }
      key = insert ? opId : `${keyCtr}@${actorIds[keyActor]}`
    } else {
      if (keyStr === null) {
        throw new RangeError(`Operation ${opId} on ${obj.type} object has no key`)
      }
      key = keyStr
    }

    if (!property || property.objId !== objId || property.key !== key) {
      let index = 0
      if (property) {
        index = property.index
        if (addPatchProperty(objects, property)) index += 1
        if (property.objId !== objId) index = 0
      }
      property = {objId, key, index, ops: []}
    }
    property.ops.push({opId, actionName, value, childId, succ})
  }

  if (property) addPatchProperty(objects, property)
  condenseEdits(objects._root)
  return objects._root
}

module.exports = {
  COLUMN_TYPE, VALUE_TYPE, ACTIONS, OBJECT_TYPE, DOC_OPS_COLUMNS, CHANGE_COLUMNS,
  encoderByColumnId, decoderByColumnId, makeDecoders, decodeValue,
  splitContainers, encodeChange, decodeChangeColumns, decodeChange, decodeChangeMeta, decodeChanges,
  decodeDocumentHeader, encodeDocument, decodeDocument,
  getChangeChecksum, appendEdit, constructPatch
}