automerge/automerge-backend/src/columnar.rs
2021-06-18 11:13:46 -04:00

1365 lines
42 KiB
Rust

use core::fmt::Debug;
use std::{
borrow::Cow,
cmp::Ordering,
collections::HashMap,
io,
io::{Read, Write},
ops::Range,
str,
};
use automerge_protocol as amp;
use flate2::bufread::DeflateDecoder;
use tracing::instrument;
use crate::{
decoding::{BooleanDecoder, Decodable, Decoder, DeltaDecoder, RleDecoder},
encoding::{BooleanEncoder, ColData, DeltaEncoder, Encodable, RleEncoder},
expanded_op::ExpandedOp,
internal::InternalOpType,
};
// An `Action` is written as its numeric `u32` discriminant.
impl Encodable for Action {
    fn encode<R: Write>(&self, buf: &mut R) -> io::Result<usize> {
        let discriminant = *self as u32;
        discriminant.encode(buf)
    }
}
// A slice of actor ids is written as its element count followed by the
// encoding of each id's `to_bytes()` value, in order.
impl Encodable for [amp::ActorId] {
    /// Returns the total number of bytes written to `buf`.
    fn encode<R: Write>(&self, buf: &mut R) -> io::Result<usize> {
        let mut written = self.len().encode(buf)?;
        for actor in self.iter() {
            let actor_bytes = actor.to_bytes();
            written += actor_bytes.encode(buf)?;
        }
        Ok(written)
    }
}
/// Returns the index of `actor` inside `actors`.
///
/// When the actor is not present yet, a clone of it is pushed onto the end of
/// `actors` and the index of that new entry is returned.
fn map_actor(actor: &amp::ActorId, actors: &mut Vec<amp::ActorId>) -> usize {
    match actors.iter().position(|known| known == actor) {
        Some(index) => index,
        None => {
            actors.push(actor.clone());
            actors.len() - 1
        }
    }
}
// Actor ids are never written inline; they are written as an index into a
// shared actor list.
impl Encodable for amp::ActorId {
    /// Writes nothing and reports zero bytes: actors are encoded by their
    /// position in a sequence via `encode_with_actors` instead.
    fn encode<R: Write>(&self, _buf: &mut R) -> io::Result<usize> {
        Ok(0)
    }

    /// Writes the index of this actor within `actors` (registering the actor
    /// in `actors` if it is missing) and returns the number of bytes written.
    fn encode_with_actors<R: Write>(
        &self,
        buf: &mut R,
        actors: &mut Vec<amp::ActorId>,
    ) -> io::Result<usize> {
        let index = map_actor(self, actors);
        index.encode(buf)
    }
}
// A byte vector is encoded exactly like the equivalent byte slice.
impl Encodable for Vec<u8> {
    fn encode<R: Write>(&self, buf: &mut R) -> io::Result<usize> {
        let bytes: &[u8] = self.as_slice();
        bytes.encode(buf)
    }
}
// A byte slice is written as its encoded length followed by the raw bytes.
impl Encodable for &[u8] {
    /// Returns the size of the length prefix plus the number of payload bytes.
    fn encode<R: Write>(&self, buf: &mut R) -> io::Result<usize> {
        let payload_len = self.len();
        let prefix_len = payload_len.encode(buf)?;
        buf.write_all(self)?;
        Ok(prefix_len + payload_len)
    }
}
/// Iterator that decodes the operation columns of a change into `ExpandedOp`s.
///
/// Each field is a decoder over one column (or group of columns); `next`
/// pulls one entry from every decoder to assemble a single operation.
pub struct OperationIterator<'a> {
/// Decoder for the action column.
pub(crate) action: RleDecoder<'a, Action>,
/// Decoder for the object id columns.
pub(crate) objs: ObjIterator<'a>,
/// Decoder for the key columns.
pub(crate) keys: KeyIterator<'a>,
/// Decoder for the insert flag column.
pub(crate) insert: BooleanDecoder<'a>,
/// Decoder for the value columns.
pub(crate) value: ValueIterator<'a>,
/// Decoder for the predecessor columns.
pub(crate) pred: PredIterator<'a>,
}
impl<'a> OperationIterator<'a> {
    /// Builds an operation iterator over `bytes`.
    ///
    /// `ops` maps column ids to the byte range each column occupies inside
    /// `bytes`; `actors` is the table used to resolve actor indices.
    pub(crate) fn new(
        bytes: &'a [u8],
        actors: &'a [amp::ActorId],
        ops: &'a HashMap<u32, Range<usize>>,
    ) -> OperationIterator<'a> {
        let objs = ObjIterator {
            actors,
            actor: col_iter(bytes, ops, COL_OBJ_ACTOR),
            ctr: col_iter(bytes, ops, COL_OBJ_CTR),
        };
        let keys = KeyIterator {
            actors,
            actor: col_iter(bytes, ops, COL_KEY_ACTOR),
            ctr: col_iter(bytes, ops, COL_KEY_CTR),
            str: col_iter(bytes, ops, COL_KEY_STR),
        };
        let value = ValueIterator {
            val_len: col_iter(bytes, ops, COL_VAL_LEN),
            val_raw: col_iter(bytes, ops, COL_VAL_RAW),
            actors,
            actor: col_iter(bytes, ops, COL_REF_ACTOR),
            ctr: col_iter(bytes, ops, COL_REF_CTR),
        };
        let pred = PredIterator {
            actors,
            pred_num: col_iter(bytes, ops, COL_PRED_NUM),
            pred_actor: col_iter(bytes, ops, COL_PRED_ACTOR),
            pred_ctr: col_iter(bytes, ops, COL_PRED_CTR),
        };
        let insert = col_iter(bytes, ops, COL_INSERT);
        let action = col_iter(bytes, ops, COL_ACTION);

        OperationIterator {
            action,
            objs,
            keys,
            insert,
            value,
            pred,
        }
    }
}
impl<'a> Iterator for OperationIterator<'a> {
    type Item = ExpandedOp<'a>;

    /// Decodes the next operation, or returns `None` as soon as any of the
    /// underlying column decoders cannot produce a value.
    fn next(&mut self) -> Option<Self::Item> {
        // The columns are read in a fixed order: action, insert, obj, key,
        // pred and finally value.
        let raw_action = self.action.next()??;
        let insert = self.insert.next()?;
        let obj = self.objs.next()?;
        let key = self.keys.next()?;
        let pred = self.pred.next()?;

        #[cfg(debug_assertions)]
        {
            let mut expected = pred.clone();
            expected.sort();
            debug_assert_eq!(pred, expected, "pred should be sorted");
        }

        let value = self.value.next()?;
        let action = match raw_action {
            Action::Set => InternalOpType::Set(value),
            Action::Inc => InternalOpType::Inc(value.to_i64()?),
            Action::Del => InternalOpType::Del,
            Action::MakeMap => InternalOpType::Make(amp::ObjType::Map),
            Action::MakeTable => InternalOpType::Make(amp::ObjType::Table),
            Action::MakeList => InternalOpType::Make(amp::ObjType::List),
            Action::MakeText => InternalOpType::Make(amp::ObjType::Text),
        };

        let op = ExpandedOp {
            action,
            obj: Cow::Owned(obj),
            key: Cow::Owned(key),
            pred: Cow::Owned(pred),
            insert,
        };
        Some(op)
    }
}
/// Iterator that decodes the operation columns of a document into `DocOp`s.
///
/// Unlike `OperationIterator` it also reads the op id columns (`actor`,
/// `ctr`) and reads successors instead of predecessors.
pub(crate) struct DocOpIterator<'a> {
/// Decoder for the op id actor-index column.
pub(crate) actor: RleDecoder<'a, usize>,
/// Decoder for the op id counter column.
pub(crate) ctr: DeltaDecoder<'a>,
/// Decoder for the action column.
pub(crate) action: RleDecoder<'a, Action>,
/// Decoder for the object id columns.
pub(crate) objs: ObjIterator<'a>,
/// Decoder for the key columns.
pub(crate) keys: KeyIterator<'a>,
/// Decoder for the insert flag column.
pub(crate) insert: BooleanDecoder<'a>,
/// Decoder for the value columns.
pub(crate) value: ValueIterator<'a>,
/// Decoder for the successor columns.
pub(crate) succ: SuccIterator<'a>,
}
impl<'a> Iterator for DocOpIterator<'a> {
    type Item = DocOp;

    /// Decodes the next document operation, or returns `None` as soon as any
    /// of the underlying column decoders cannot produce a value.
    ///
    /// The returned op always has an empty `pred` list.
    fn next(&mut self) -> Option<DocOp> {
        // Fixed read order: action, actor, ctr, insert, obj, key, succ, value.
        let raw_action = self.action.next()??;
        let actor = self.actor.next()??;
        let ctr = self.ctr.next()??;
        let insert = self.insert.next()?;
        let obj = self.objs.next()?;
        let key = self.keys.next()?;
        let succ = self.succ.next()?;
        let value = self.value.next()?;

        let action = match raw_action {
            Action::Set => InternalOpType::Set(value),
            Action::Inc => InternalOpType::Inc(value.to_i64()?),
            Action::Del => InternalOpType::Del,
            Action::MakeMap => InternalOpType::Make(amp::ObjType::Map),
            Action::MakeTable => InternalOpType::Make(amp::ObjType::Table),
            Action::MakeList => InternalOpType::Make(amp::ObjType::List),
            Action::MakeText => InternalOpType::Make(amp::ObjType::Text),
        };

        let op = DocOp {
            actor,
            ctr,
            action,
            obj,
            key,
            succ,
            pred: Vec::new(),
            insert,
        };
        Some(op)
    }
}
impl<'a> DocOpIterator<'a> {
pub(crate) fn new(
bytes: &'a [u8],
actors: &'a [amp::ActorId],
ops: &'a HashMap<u32, Range<usize>>,
) -> DocOpIterator<'a> {
DocOpIterator {
actor: col_iter(bytes, ops, COL_ID_ACTOR),
ctr: col_iter(bytes, ops, COL_ID_CTR),
objs: ObjIterator {
actors,
actor: col_iter(bytes, ops, COL_OBJ_ACTOR),
ctr: col_iter(bytes, ops, COL_OBJ_CTR),
},
keys: KeyIterator {
actors,
actor: col_iter(bytes, ops, COL_KEY_ACTOR),
ctr: col_iter(bytes, ops, COL_KEY_CTR),
str: col_iter(bytes, ops, COL_KEY_STR),
},
value: ValueIterator {
val_len: col_iter(bytes, ops, COL_VAL_LEN),
val_raw: col_iter(bytes, ops, COL_VAL_RAW),
actors,
actor: col_iter(bytes, ops, COL_REF_ACTOR),
ctr: col_iter(bytes, ops, COL_REF_CTR),
},
succ: SuccIterator {
succ_num: col_iter(bytes, ops, COL_SUCC_NUM),
succ_actor: col_iter(bytes, ops, COL_SUCC_ACTOR),
succ_ctr: col_iter(bytes, ops, COL_SUCC_CTR),
},
insert: col_iter(bytes, ops, COL_INSERT),
action: col_iter(bytes, ops, COL_ACTION),
}
}
}
/// Iterator that decodes the change-metadata columns of a document into
/// `DocChange`s. Each field decodes one column (or group of columns).
pub(crate) struct ChangeIterator<'a> {
/// Decoder for the actor-index column.
pub(crate) actor: RleDecoder<'a, usize>,
/// Decoder for the sequence number column.
pub(crate) seq: DeltaDecoder<'a>,
/// Decoder for the max-op column.
pub(crate) max_op: DeltaDecoder<'a>,
/// Decoder for the time column.
pub(crate) time: DeltaDecoder<'a>,
/// Decoder for the (optional) message column.
pub(crate) message: RleDecoder<'a, String>,
/// Decoder for the dependency columns.
pub(crate) deps: DepsIterator<'a>,
/// Decoder for the extra-bytes columns.
pub(crate) extra: ExtraIterator<'a>,
}
impl<'a> ChangeIterator<'a> {
    /// Builds a change iterator over `bytes`, where `ops` maps column ids to
    /// the byte range each column occupies inside `bytes`.
    pub(crate) fn new(bytes: &'a [u8], ops: &'a HashMap<u32, Range<usize>>) -> ChangeIterator<'a> {
        let actor = col_iter(bytes, ops, DOC_ACTOR);
        let seq = col_iter(bytes, ops, DOC_SEQ);
        let max_op = col_iter(bytes, ops, DOC_MAX_OP);
        let time = col_iter(bytes, ops, DOC_TIME);
        let message = col_iter(bytes, ops, DOC_MESSAGE);
        let deps = DepsIterator {
            num: col_iter(bytes, ops, DOC_DEPS_NUM),
            dep: col_iter(bytes, ops, DOC_DEPS_INDEX),
        };
        let extra = ExtraIterator {
            len: col_iter(bytes, ops, DOC_EXTRA_LEN),
            extra: col_iter(bytes, ops, DOC_EXTRA_RAW),
        };

        ChangeIterator {
            actor,
            seq,
            max_op,
            time,
            message,
            deps,
            extra,
        }
    }
}
impl<'a> Iterator for ChangeIterator<'a> {
    type Item = DocChange;

    /// Decodes the next change's metadata.
    ///
    /// Returns `None` when the actor, seq, max-op, time, message or deps
    /// decoder cannot produce a value. A missing extra-bytes entry is not an
    /// error; it yields an empty `extra_bytes`. `ops` is always left empty.
    fn next(&mut self) -> Option<DocChange> {
        let actor = self.actor.next()??;
        let seq = self.seq.next()??;
        let max_op = self.max_op.next()??;
        let raw_time = self.time.next()??;
        let message = self.message.next()?;
        let deps = self.deps.next()?;
        let extra_bytes = self.extra.next().unwrap_or_default();

        let change = DocChange {
            actor,
            seq,
            max_op,
            time: raw_time as i64,
            message,
            deps,
            extra_bytes,
            ops: Vec::new(),
        };
        Some(change)
    }
}
/// Decodes object ids from an actor-index column and a counter column.
pub struct ObjIterator<'a> {
//actors: &'a Vec<&'a [u8]>,
/// Actor table used to resolve decoded actor indices.
pub(crate) actors: &'a [amp::ActorId],
/// Decoder for the actor-index column.
pub(crate) actor: RleDecoder<'a, usize>,
/// Decoder for the counter column.
pub(crate) ctr: RleDecoder<'a, u64>,
}
/// Decodes, per change, a list of dependency indices.
pub struct DepsIterator<'a> {
/// Decoder for the number of dependencies of each change.
pub(crate) num: RleDecoder<'a, usize>,
/// Decoder for the individual dependency indices.
pub(crate) dep: DeltaDecoder<'a>,
}
/// Decodes, per change, a block of extra bytes.
pub struct ExtraIterator<'a> {
/// Decoder for the length/type column (the byte length is the value shifted right by 4).
pub(crate) len: RleDecoder<'a, usize>,
/// Decoder over the raw bytes column.
pub(crate) extra: Decoder<'a>,
}
/// Decodes, per operation, the list of predecessor op ids.
pub struct PredIterator<'a> {
/// Actor table used to resolve decoded actor indices.
pub(crate) actors: &'a [amp::ActorId],
/// Decoder for the number of predecessors of each operation.
pub(crate) pred_num: RleDecoder<'a, usize>,
/// Decoder for the actor index of each predecessor.
pub(crate) pred_actor: RleDecoder<'a, usize>,
/// Decoder for the counter of each predecessor.
pub(crate) pred_ctr: DeltaDecoder<'a>,
}
/// Decodes, per operation, the list of successors as `(counter, actor index)` pairs.
pub struct SuccIterator<'a> {
/// Decoder for the number of successors of each operation.
pub(crate) succ_num: RleDecoder<'a, usize>,
/// Decoder for the actor index of each successor.
pub(crate) succ_actor: RleDecoder<'a, usize>,
/// Decoder for the counter of each successor.
pub(crate) succ_ctr: DeltaDecoder<'a>,
}
/// Decodes operation keys from an actor-index, a counter and a string column.
pub struct KeyIterator<'a> {
/// Actor table used to resolve decoded actor indices.
pub(crate) actors: &'a [amp::ActorId],
/// Decoder for the actor-index column.
pub(crate) actor: RleDecoder<'a, usize>,
/// Decoder for the counter column.
pub(crate) ctr: DeltaDecoder<'a>,
/// Decoder for the string (map key) column.
pub(crate) str: RleDecoder<'a, String>,
}
/// Decodes scalar values from the value length/type, raw value and cursor
/// reference columns.
pub struct ValueIterator<'a> {
/// Actor table used to resolve decoded actor indices (for cursor values).
pub(crate) actors: &'a [amp::ActorId],
/// Decoder for the combined length/type tag of each value.
pub(crate) val_len: RleDecoder<'a, usize>,
/// Decoder over the raw value bytes.
pub(crate) val_raw: Decoder<'a>,
/// Decoder for the actor index of a cursor's op id.
pub(crate) actor: RleDecoder<'a, usize>,
/// Decoder for the counter of a cursor's op id.
pub(crate) ctr: RleDecoder<'a, u64>,
}
impl<'a> Iterator for DepsIterator<'a> {
    type Item = Vec<usize>;

    /// Decodes the dependency indices of the next change.
    ///
    /// Reads the dependency count and then that many entries from the index
    /// column. Returns `None` if the count or any of the entries is missing.
    fn next(&mut self) -> Option<Vec<usize>> {
        let num = self.num.next()??;
        // `num` is decoded from the input bytes, so it must not be trusted as
        // an allocation size: `Vec::with_capacity` would panic or abort on a
        // corrupt, huge count. Collecting lets the vector grow only as far as
        // entries can actually be decoded, and stops at the first missing one.
        (0..num)
            .map(|_| self.dep.next()?.map(|dep| dep as usize))
            .collect()
    }
}
impl<'a> Iterator for ExtraIterator<'a> {
    type Item = Vec<u8>;

    /// Decodes the next block of extra bytes.
    ///
    /// The length column entry holds the byte length in its upper bits
    /// (everything above the low 4 bits); the low 4 bits are not inspected.
    /// Returns `None` if the length entry is missing or the raw column does
    /// not contain that many bytes.
    fn next(&mut self) -> Option<Vec<u8>> {
        let tagged_len = self.len.next()??;
        let byte_len = tagged_len >> 4;
        let raw = self.extra.read_bytes(byte_len).ok()?;
        Some(raw.to_vec())
    }
}
impl<'a> Iterator for PredIterator<'a> {
    type Item = Vec<amp::OpId>;

    /// Decodes the predecessor op ids of the next operation.
    ///
    /// Reads the predecessor count and then that many (actor index, counter)
    /// pairs. Returns `None` if the count or any entry is missing, or if an
    /// actor index is not present in `actors`.
    fn next(&mut self) -> Option<Vec<amp::OpId>> {
        let num = self.pred_num.next()??;
        // `num` is decoded from the input bytes, so it must not be trusted as
        // an allocation size: `Vec::with_capacity` would panic or abort on a
        // corrupt, huge count. Collecting grows the vector only as entries
        // are actually decoded and stops at the first failure.
        (0..num)
            .map(|_| {
                let actor = self.pred_actor.next()??;
                let ctr = self.pred_ctr.next()??;
                // `OpId::new` takes the actor by reference, so no clone is
                // needed here.
                let actor_id = self.actors.get(actor)?;
                Some(amp::OpId::new(ctr, actor_id))
            })
            .collect()
    }
}
impl<'a> Iterator for SuccIterator<'a> {
    type Item = Vec<(u64, usize)>;

    /// Decodes the successors of the next operation as `(counter, actor
    /// index)` pairs.
    ///
    /// Reads the successor count and then that many entries. Returns `None`
    /// if the count or any entry is missing.
    fn next(&mut self) -> Option<Vec<(u64, usize)>> {
        let num = self.succ_num.next()??;
        // `num` is decoded from the input bytes, so it must not be trusted as
        // an allocation size: `Vec::with_capacity` would panic or abort on a
        // corrupt, huge count. Collecting grows the vector only as entries
        // are actually decoded and stops at the first failure.
        (0..num)
            .map(|_| {
                let actor = self.succ_actor.next()??;
                let ctr = self.succ_ctr.next()??;
                Some((ctr, actor))
            })
            .collect()
    }
}
impl<'a> Iterator for ValueIterator<'a> {
    type Item = amp::ScalarValue;

    /// Decodes the next scalar value.
    ///
    /// One entry is read from the length/type column and from each of the
    /// cursor reference columns. The low 4 bits of the length/type entry
    /// (`v % 16`) select the value type and the remaining bits (`v >> 4`)
    /// give the byte length of the payload in the raw column.
    ///
    /// Returns `None` when a column is exhausted, the payload cannot be read,
    /// the payload length does not match the declared length, or the type tag
    /// is unknown or reserved.
    fn next(&mut self) -> Option<amp::ScalarValue> {
        let val_type = self.val_len.next()??;
        let actor = self.actor.next()?;
        let ctr = self.ctr.next()?;
        match val_type {
            // These three only match when the whole entry equals the tag,
            // i.e. when the declared length is zero.
            VALUE_TYPE_NULL => Some(amp::ScalarValue::Null),
            VALUE_TYPE_FALSE => Some(amp::ScalarValue::Boolean(false)),
            VALUE_TYPE_TRUE => Some(amp::ScalarValue::Boolean(true)),
            v if v % 16 == VALUE_TYPE_COUNTER => {
                let len = v >> 4;
                let val = self.val_raw.read().ok()?;
                // The number of bytes consumed must equal the declared length.
                if len != self.val_raw.last_read {
                    return None;
                }
                Some(amp::ScalarValue::Counter(val))
            }
            v if v % 16 == VALUE_TYPE_TIMESTAMP => {
                let len = v >> 4;
                let val = self.val_raw.read().ok()?;
                if len != self.val_raw.last_read {
                    return None;
                }
                Some(amp::ScalarValue::Timestamp(val))
            }
            v if v % 16 == VALUE_TYPE_LEB128_UINT => {
                let len = v >> 4;
                let val = self.val_raw.read().ok()?;
                if len != self.val_raw.last_read {
                    return None;
                }
                Some(amp::ScalarValue::Uint(val))
            }
            v if v % 16 == VALUE_TYPE_LEB128_INT => {
                let len = v >> 4;
                let val = self.val_raw.read().ok()?;
                if len != self.val_raw.last_read {
                    return None;
                }
                Some(amp::ScalarValue::Int(val))
            }
            v if v % 16 == VALUE_TYPE_UTF8 => {
                let len = v >> 4;
                let data = self.val_raw.read_bytes(len).ok()?;
                let s = str::from_utf8(data).ok()?;
                Some(amp::ScalarValue::Str(s.to_string()))
            }
            v if v % 16 == VALUE_TYPE_BYTES => {
                let len = v >> 4;
                let data = self.val_raw.read_bytes(len).ok()?;
                Some(amp::ScalarValue::Bytes(data.to_vec()))
            }
            v if v % 16 >= VALUE_TYPE_MIN_UNKNOWN && v % 16 <= VALUE_TYPE_MAX_UNKNOWN => {
                // Reserved type tags can appear in decoded (possibly foreign
                // or corrupt) data, so they must not panic. Consume the
                // payload to keep the raw column aligned and report the value
                // as undecodable, like every other decode failure here.
                let len = v >> 4;
                self.val_raw.read_bytes(len).ok()?;
                None
            }
            v if v % 16 == VALUE_TYPE_IEEE754 => {
                let len = v >> 4;
                if len == 8 {
                    // A float payload must be declared as exactly 8 bytes.
                    let num = self.val_raw.read().ok()?;
                    Some(amp::ScalarValue::F64(num))
                } else {
                    // bad size of float
                    None
                }
            }
            v if v % 16 == VALUE_TYPE_CURSOR => {
                // A cursor needs both halves of the referenced op id.
                if let (Some(actor), Some(ctr)) = (actor, ctr) {
                    let actor_id = self.actors.get(actor)?;
                    Some(amp::ScalarValue::Cursor(amp::OpId(ctr, actor_id.clone())))
                } else {
                    None
                }
            }
            _ => {
                // unknown command
                None
            }
        }
    }
}
impl<'a> Iterator for KeyIterator<'a> {
    type Item = amp::Key;

    /// Decodes the next key.
    ///
    /// * only a string present: a map key;
    /// * no actor, counter `0`, no string: the head element;
    /// * actor and counter present, no string: an element id built from the
    ///   op id (the actor index must exist in `actors`);
    /// * any other combination: `None`.
    fn next(&mut self) -> Option<amp::Key> {
        let actor = self.actor.next()?;
        let ctr = self.ctr.next()?;
        let name = self.str.next()?;
        match (actor, ctr, name) {
            (None, None, Some(name)) => Some(amp::Key::Map(name)),
            (None, Some(0), None) => Some(amp::Key::head()),
            (Some(actor), Some(ctr), None) => self
                .actors
                .get(actor)
                .map(|actor_id| amp::OpId::new(ctr, actor_id).into()),
            _ => None,
        }
    }
}
impl<'a> Iterator for ObjIterator<'a> {
    type Item = amp::ObjectId;

    /// Decodes the next object id.
    ///
    /// When both the actor index and the counter are present the result is
    /// the corresponding op id (or `None` if the actor index is not in
    /// `actors`); otherwise the root object id is returned.
    fn next(&mut self) -> Option<amp::ObjectId> {
        let actor = self.actor.next()?;
        let ctr = self.ctr.next()?;
        match (actor, ctr) {
            (Some(actor), Some(ctr)) => {
                let actor_id = self.actors.get(actor)?;
                Some(amp::ObjectId::Id(amp::OpId::new(ctr, actor_id)))
            }
            _ => Some(amp::ObjectId::Root),
        }
    }
}
/// Metadata of one change as stored in the document's change columns.
#[derive(PartialEq, Debug, Clone)]
pub(crate) struct DocChange {
/// Index of the change's actor in the actor table.
pub actor: usize,
/// Sequence number of the change.
pub seq: u64,
/// Value of the max-op column for this change.
pub max_op: u64,
/// Value of the time column for this change.
pub time: i64,
/// Optional change message.
pub message: Option<String>,
/// Dependencies, as decoded from the dependency index column.
pub deps: Vec<usize>,
/// Extra bytes attached to the change.
pub extra_bytes: Vec<u8>,
/// Operations of the change; `ChangeIterator` leaves this empty.
pub ops: Vec<DocOp>,
}
/// One operation as stored in the document's operation columns.
///
/// Note that the `Ord`/`PartialEq` implementations for this type compare
/// only `ctr`.
#[derive(Debug, Clone)]
pub(crate) struct DocOp {
/// Index of the op id's actor in the actor table.
pub actor: usize,
/// Counter of the op id.
pub ctr: u64,
/// The operation type (and its value, where applicable).
pub action: InternalOpType,
/// Object the operation applies to.
pub obj: amp::ObjectId,
/// Key within the object.
pub key: amp::Key,
/// Successors as `(counter, actor index)` pairs.
pub succ: Vec<(u64, usize)>,
/// Predecessors as `(counter, actor index)` pairs; `DocOpIterator` leaves this empty.
pub pred: Vec<(u64, usize)>,
/// Insert flag of the operation.
pub insert: bool,
}
// Ordering and equality of `DocOp`s are defined purely by the op counter;
// every other field is ignored.
impl Ord for DocOp {
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.ctr, &other.ctr)
    }
}

impl PartialOrd for DocOp {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = self.cmp(other);
        Some(ordering)
    }
}

impl PartialEq for DocOp {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.ctr, other.ctr);
        lhs == rhs
    }
}

impl Eq for DocOp {}
/// Encoder for the value columns: a length/type tag column, the raw payload
/// bytes, and the actor/counter reference columns used by cursor values.
struct ValEncoder {
    len: RleEncoder<usize>,
    ref_actor: RleEncoder<usize>,
    ref_counter: RleEncoder<u64>,
    raw: Vec<u8>,
}

impl ValEncoder {
    /// Creates an encoder with all columns empty.
    fn new() -> ValEncoder {
        ValEncoder {
            len: RleEncoder::new(),
            ref_actor: RleEncoder::new(),
            ref_counter: RleEncoder::new(),
            raw: Vec::new(),
        }
    }

    /// Appends one scalar value.
    ///
    /// The tag written to the length column is `payload_len << 4 | type`.
    /// Cursor values write their op id to the reference columns (registering
    /// the actor in `actors` if needed); every other value writes a null to
    /// both reference columns.
    fn append_value(&mut self, val: &amp::ScalarValue, actors: &mut Vec<amp::ActorId>) {
        // Reference columns first: real values for a cursor, nulls otherwise.
        if let amp::ScalarValue::Cursor(opid) = val {
            let actor_index = map_actor(&opid.1, actors);
            self.ref_actor.append_value(actor_index);
            self.ref_counter.append_value(opid.0);
        } else {
            self.ref_actor.append_null();
            self.ref_counter.append_null();
        }

        // Then the payload (if any) and the resulting length/type tag.
        let tag = match val {
            amp::ScalarValue::Null => VALUE_TYPE_NULL,
            amp::ScalarValue::Boolean(false) => VALUE_TYPE_FALSE,
            amp::ScalarValue::Boolean(true) => VALUE_TYPE_TRUE,
            amp::ScalarValue::Cursor(_) => VALUE_TYPE_CURSOR,
            amp::ScalarValue::Bytes(bytes) => {
                self.raw.extend(bytes);
                bytes.len() << 4 | VALUE_TYPE_BYTES
            }
            amp::ScalarValue::Str(s) => {
                let utf8 = s.as_bytes();
                self.raw.extend(utf8);
                utf8.len() << 4 | VALUE_TYPE_UTF8
            }
            amp::ScalarValue::Counter(count) => {
                let written = count.encode(&mut self.raw).unwrap();
                written << 4 | VALUE_TYPE_COUNTER
            }
            amp::ScalarValue::Timestamp(time) => {
                let written = time.encode(&mut self.raw).unwrap();
                written << 4 | VALUE_TYPE_TIMESTAMP
            }
            amp::ScalarValue::Int(n) => {
                let written = n.encode(&mut self.raw).unwrap();
                written << 4 | VALUE_TYPE_LEB128_INT
            }
            amp::ScalarValue::Uint(n) => {
                let written = n.encode(&mut self.raw).unwrap();
                written << 4 | VALUE_TYPE_LEB128_UINT
            }
            amp::ScalarValue::F64(n) => {
                let written = (*n).encode(&mut self.raw).unwrap();
                written << 4 | VALUE_TYPE_IEEE754
            }
        };
        self.len.append_value(tag)
    }

    /// Appends a null value: a null in both reference columns and the null
    /// tag in the length column.
    fn append_null(&mut self) {
        self.ref_counter.append_null();
        self.ref_actor.append_null();
        self.len.append_value(VALUE_TYPE_NULL)
    }

    /// Finishes all columns and returns them tagged with their column ids.
    fn finish(self) -> Vec<ColData> {
        let ref_counter = self.ref_counter.finish(COL_REF_CTR);
        let ref_actor = self.ref_actor.finish(COL_REF_ACTOR);
        let len = self.len.finish(COL_VAL_LEN);
        let raw = ColData::new(COL_VAL_RAW, self.raw);
        vec![ref_counter, ref_actor, len, raw]
    }
}
/// Encoder for the key columns (actor index, counter, string).
struct KeyEncoder {
    actor: RleEncoder<usize>,
    ctr: DeltaEncoder,
    str: RleEncoder<String>,
}

impl KeyEncoder {
    /// Creates an encoder with all three columns empty.
    fn new() -> KeyEncoder {
        let actor = RleEncoder::new();
        let ctr = DeltaEncoder::new();
        let str = RleEncoder::new();
        KeyEncoder { actor, ctr, str }
    }

    /// Appends one key; every call adds exactly one entry to each column.
    fn append(&mut self, key: &amp::Key, actors: &mut Vec<amp::ActorId>) {
        match key {
            // Map key: only the string column carries a value.
            amp::Key::Map(name) => {
                self.str.append_value(name.clone());
                self.actor.append_null();
                self.ctr.append_null();
            }
            // Head of a sequence: counter 0 and no actor.
            amp::Key::Seq(amp::ElementId::Head) => {
                self.str.append_null();
                self.actor.append_null();
                self.ctr.append_value(0);
            }
            // Element id: actor index (registered on demand) and counter.
            amp::Key::Seq(amp::ElementId::Id(amp::OpId(ctr, actor))) => {
                self.str.append_null();
                let actor_index = map_actor(actor, actors);
                self.actor.append_value(actor_index);
                self.ctr.append_value(*ctr);
            }
        }
    }

    /// Finishes the columns and returns them tagged with their column ids.
    fn finish(self) -> Vec<ColData> {
        let actor = self.actor.finish(COL_KEY_ACTOR);
        let ctr = self.ctr.finish(COL_KEY_CTR);
        let str = self.str.finish(COL_KEY_STR);
        vec![actor, ctr, str]
    }
}
/// Encoder for the successor columns (count, actor index, counter).
struct SuccEncoder {
    num: RleEncoder<usize>,
    actor: RleEncoder<usize>,
    ctr: DeltaEncoder,
}

impl SuccEncoder {
    /// Creates an encoder with all three columns empty.
    fn new() -> SuccEncoder {
        let num = RleEncoder::new();
        let actor = RleEncoder::new();
        let ctr = DeltaEncoder::new();
        SuccEncoder { num, actor, ctr }
    }

    /// Appends the successor list of one operation: its length, followed by
    /// the counter and actor index of every `(counter, actor index)` pair.
    fn append(&mut self, succ: &[(u64, usize)]) {
        self.num.append_value(succ.len());
        for &(ctr, actor) in succ {
            self.ctr.append_value(ctr);
            self.actor.append_value(actor);
        }
    }

    /// Finishes the columns and returns them tagged with their column ids.
    fn finish(self) -> Vec<ColData> {
        let num = self.num.finish(COL_SUCC_NUM);
        let actor = self.actor.finish(COL_SUCC_ACTOR);
        let ctr = self.ctr.finish(COL_SUCC_CTR);
        vec![num, actor, ctr]
    }
}
/// Encoder for the predecessor columns (count, actor index, counter).
struct PredEncoder {
    num: RleEncoder<usize>,
    actor: RleEncoder<usize>,
    ctr: DeltaEncoder,
}

impl PredEncoder {
    /// Creates an encoder with all three columns empty.
    fn new() -> PredEncoder {
        let num = RleEncoder::new();
        let actor = RleEncoder::new();
        let ctr = DeltaEncoder::new();
        PredEncoder { num, actor, ctr }
    }

    /// Appends the predecessor list of one operation: its length, followed by
    /// the counter and actor index of every op id. Actors not yet in `actors`
    /// are registered there.
    fn append(&mut self, pred: &[amp::OpId], actors: &mut Vec<amp::ActorId>) {
        self.num.append_value(pred.len());
        for op_id in pred {
            self.ctr.append_value(op_id.0);
            let actor_index = map_actor(&op_id.1, actors);
            self.actor.append_value(actor_index);
        }
    }

    /// Finishes the columns and returns them tagged with their column ids.
    fn finish(self) -> Vec<ColData> {
        let num = self.num.finish(COL_PRED_NUM);
        let actor = self.actor.finish(COL_PRED_ACTOR);
        let ctr = self.ctr.finish(COL_PRED_CTR);
        vec![num, actor, ctr]
    }
}
/// Encoder for the object id columns (actor index, counter).
struct ObjEncoder {
    actor: RleEncoder<usize>,
    ctr: RleEncoder<u64>,
}

impl ObjEncoder {
    /// Creates an encoder with both columns empty.
    fn new() -> ObjEncoder {
        let actor = RleEncoder::new();
        let ctr = RleEncoder::new();
        ObjEncoder { actor, ctr }
    }

    /// Appends one object id. The root object is written as a null in both
    /// columns; any other object writes its op id's actor index (registering
    /// the actor in `actors` if needed) and counter.
    fn append(&mut self, obj: &amp::ObjectId, actors: &mut Vec<amp::ActorId>) {
        if let amp::ObjectId::Id(amp::OpId(ctr, actor)) = obj {
            let actor_index = map_actor(actor, actors);
            self.actor.append_value(actor_index);
            self.ctr.append_value(*ctr);
        } else {
            self.actor.append_null();
            self.ctr.append_null();
        }
    }

    /// Finishes the columns and returns them tagged with their column ids.
    fn finish(self) -> Vec<ColData> {
        let actor = self.actor.finish(COL_OBJ_ACTOR);
        let ctr = self.ctr.finish(COL_OBJ_CTR);
        vec![actor, ctr]
    }
}
/// Encoder for the change-metadata columns of a document.
pub(crate) struct ChangeEncoder {
/// Actor index of each change.
actor: RleEncoder<usize>,
/// Sequence number of each change.
seq: DeltaEncoder,
/// Max-op value of each change (`start_op + number of operations - 1`).
max_op: DeltaEncoder,
/// Time of each change.
time: DeltaEncoder,
/// Optional message of each change.
message: RleEncoder<Option<String>>,
/// Number of dependencies of each change.
deps_num: RleEncoder<usize>,
/// Index (position in the encoded sequence) of each dependency.
deps_index: DeltaEncoder,
/// Length/type tag of each change's extra bytes.
extra_len: RleEncoder<usize>,
/// Concatenated extra bytes of all changes.
extra_raw: Vec<u8>,
}
impl ChangeEncoder {
    /// Encodes the metadata of `changes` into columns.
    ///
    /// Returns `(data, info)`: the concatenated column bytes and the column
    /// header (column count followed by each column's length record).
    ///
    /// # Panics
    ///
    /// Panics if a change's actor is not in `actors`, or if a change depends
    /// on a hash that has not been seen earlier in `changes`.
    #[instrument(level = "debug", skip(changes, actors))]
    pub fn encode_changes<'a, 'b, I>(changes: I, actors: &'a [amp::ActorId]) -> (Vec<u8>, Vec<u8>)
    where
        I: IntoIterator<Item = &'b amp::Change>,
    {
        let mut encoder = Self::new();
        encoder.encode(changes, actors);
        encoder.finish()
    }

    /// Creates an encoder with every column empty.
    fn new() -> ChangeEncoder {
        ChangeEncoder {
            actor: RleEncoder::new(),
            seq: DeltaEncoder::new(),
            max_op: DeltaEncoder::new(),
            time: DeltaEncoder::new(),
            message: RleEncoder::new(),
            deps_num: RleEncoder::new(),
            deps_index: DeltaEncoder::new(),
            extra_len: RleEncoder::new(),
            extra_raw: Vec::new(),
        }
    }

    /// Appends every change in `changes` to the columns, in iteration order.
    fn encode<'a, 'b, 'c, I>(&'a mut self, changes: I, actors: &'b [amp::ActorId])
    where
        I: IntoIterator<Item = &'c amp::Change>,
    {
        // Position of each already-encoded change, keyed by its hash; used to
        // turn dependency hashes into indices.
        let mut seen: HashMap<amp::ChangeHash, usize> = HashMap::new();
        for (position, change) in changes.into_iter().enumerate() {
            if let Some(hash) = change.hash {
                seen.insert(hash, position);
            }

            let actor_index = actors
                .iter()
                .position(|a| a == &change.actor_id)
                .unwrap();
            self.actor.append_value(actor_index);
            self.seq.append_value(change.seq);

            let max_op = change.start_op + change.operations.len() as u64 - 1;
            self.max_op.append_value(max_op);
            self.time.append_value(change.time as u64);
            self.message.append_value(change.message.clone());

            self.deps_num.append_value(change.deps.len());
            for dep in &change.deps {
                match seen.get(dep) {
                    Some(dep_index) => self.deps_index.append_value(*dep_index as u64),
                    None => {
                        // FIXME This relies on the changes being in causal order, which they
                        // may not be, we could probably do something cleverer like accumulate
                        // the values to write and the dependency tree in an intermediate
                        // value, then write it to the encoder in a second pass over the
                        // intermediates
                        panic!("Missing dependency for hash: {:?}", dep);
                    }
                }
            }

            let extra_tag = change.extra_bytes.len() << 4 | VALUE_TYPE_BYTES;
            self.extra_len.append_value(extra_tag);
            self.extra_raw.extend(&change.extra_bytes);
        }
    }

    /// Finishes all columns and serialises them.
    ///
    /// Columns are sorted by column id. The header starts with the number of
    /// non-empty columns (counted before `deflate` is called on each column),
    /// followed by each column's length record; the data is the concatenation
    /// of the column bytes in the same order.
    fn finish(self) -> (Vec<u8>, Vec<u8>) {
        let mut columns = vec![
            self.actor.finish(DOC_ACTOR),
            self.seq.finish(DOC_SEQ),
            self.max_op.finish(DOC_MAX_OP),
            self.time.finish(DOC_TIME),
            self.message.finish(DOC_MESSAGE),
            self.deps_num.finish(DOC_DEPS_NUM),
            self.deps_index.finish(DOC_DEPS_INDEX),
            self.extra_len.finish(DOC_EXTRA_LEN),
            ColData::new(DOC_EXTRA_RAW, self.extra_raw),
        ];
        columns.sort_by(|lhs, rhs| lhs.col.cmp(&rhs.col));

        let mut info = Vec::new();
        let non_empty = columns.iter().filter(|c| !c.data.is_empty()).count();
        non_empty.encode(&mut info).ok();
        for column in columns.iter_mut() {
            column.deflate();
            column.encode_col_len(&mut info).ok();
        }

        let mut data = Vec::new();
        for column in columns.iter() {
            data.write_all(column.data.as_slice()).ok();
        }
        (data, info)
    }
}
/// Encoder for the operation columns of a document.
pub(crate) struct DocOpEncoder {
/// Actor index of each op id.
actor: RleEncoder<usize>,
/// Counter of each op id.
ctr: DeltaEncoder,
/// Object id columns.
obj: ObjEncoder,
/// Key columns.
key: KeyEncoder,
/// Insert flag column.
insert: BooleanEncoder,
/// Action column.
action: RleEncoder<Action>,
/// Value columns.
val: ValEncoder,
/// Successor columns.
succ: SuccEncoder,
}
// FIXME - actors should not be mut here
impl DocOpEncoder {
    /// Encodes `ops` into columns.
    ///
    /// Returns `(data, info)`: the concatenated column bytes and the column
    /// header (column count followed by each column's length record). Actors
    /// referenced by the ops but missing from `actors` are appended to it.
    #[instrument(level = "debug", skip(ops, actors))]
    pub(crate) fn encode_doc_ops<'a, 'b, I>(
        ops: I,
        actors: &'a mut Vec<amp::ActorId>,
    ) -> (Vec<u8>, Vec<u8>)
    where
        I: IntoIterator<Item = &'b DocOp>,
    {
        let mut encoder = Self::new();
        encoder.encode(ops, actors);
        encoder.finish()
    }

    /// Creates an encoder with every column empty.
    fn new() -> DocOpEncoder {
        DocOpEncoder {
            actor: RleEncoder::new(),
            ctr: DeltaEncoder::new(),
            obj: ObjEncoder::new(),
            key: KeyEncoder::new(),
            insert: BooleanEncoder::new(),
            action: RleEncoder::new(),
            val: ValEncoder::new(),
            succ: SuccEncoder::new(),
        }
    }

    /// Appends every op in `ops` to the columns, in iteration order.
    fn encode<'a, 'b, 'c, I>(&'a mut self, ops: I, actors: &'b mut Vec<amp::ActorId>)
    where
        I: IntoIterator<Item = &'c DocOp>,
    {
        for op in ops {
            self.actor.append_value(op.actor);
            self.ctr.append_value(op.ctr);
            self.obj.append(&op.obj, actors);
            self.key.append(&op.key, actors);
            self.insert.append(op.insert);
            self.succ.append(&op.succ);

            // Value columns: `Set` writes its value, `Inc` writes its amount
            // as an integer, everything else writes a null.
            match &op.action {
                InternalOpType::Set(value) => self.val.append_value(value, actors),
                InternalOpType::Inc(amount) => self
                    .val
                    .append_value(&amp::ScalarValue::Int(*amount), actors),
                // FIXME throw error
                InternalOpType::Del => self.val.append_null(),
                InternalOpType::Make(_) => self.val.append_null(),
            }

            let action = match &op.action {
                InternalOpType::Set(_) => Action::Set,
                InternalOpType::Inc(_) => Action::Inc,
                InternalOpType::Del => Action::Del,
                InternalOpType::Make(amp::ObjType::Map) => Action::MakeMap,
                InternalOpType::Make(amp::ObjType::Table) => Action::MakeTable,
                InternalOpType::Make(amp::ObjType::List) => Action::MakeList,
                InternalOpType::Make(amp::ObjType::Text) => Action::MakeText,
            };
            self.action.append_value(action);
        }
    }

    /// Finishes all columns and serialises them.
    ///
    /// Columns are sorted by column id. The header starts with the number of
    /// non-empty columns (counted before `deflate` is called on each column),
    /// followed by each column's length record; the data is the concatenation
    /// of the column bytes in the same order.
    fn finish(self) -> (Vec<u8>, Vec<u8>) {
        let mut columns = vec![
            self.actor.finish(COL_ID_ACTOR),
            self.ctr.finish(COL_ID_CTR),
            self.insert.finish(COL_INSERT),
            self.action.finish(COL_ACTION),
        ];
        columns.extend(self.obj.finish());
        columns.extend(self.key.finish());
        columns.extend(self.val.finish());
        columns.extend(self.succ.finish());
        columns.sort_by(|lhs, rhs| lhs.col.cmp(&rhs.col));

        let mut info = Vec::new();
        let non_empty = columns.iter().filter(|c| !c.data.is_empty()).count();
        non_empty.encode(&mut info).ok();
        for column in columns.iter_mut() {
            column.deflate();
            column.encode_col_len(&mut info).ok();
        }

        let mut data = Vec::new();
        for column in columns.iter() {
            data.write_all(column.data.as_slice()).ok();
        }
        (data, info)
    }
}
//pub(crate) encode_cols(a) -> (Vec<u8>, HashMap<u32, Range<usize>>) { }
/// One operation in the form consumed by `ColumnEncoder`: like an
/// `ExpandedOp`, but owning its predecessor list so it can be sorted in place.
struct ColumnOp<'a> {
/// The operation type (and its value, where applicable).
action: InternalOpType,
/// Object the operation applies to.
obj: Cow<'a, amp::ObjectId>,
/// Key within the object.
key: Cow<'a, amp::Key>,
/// Predecessor op ids; sorted by `ColumnEncoder::append` before encoding.
pred: Vec<amp::OpId>,
/// Insert flag of the operation.
insert: bool,
}
/// Encoder for the operation columns of a change.
pub(crate) struct ColumnEncoder {
/// Object id columns.
obj: ObjEncoder,
/// Key columns.
key: KeyEncoder,
/// Insert flag column.
insert: BooleanEncoder,
/// Action column.
action: RleEncoder<Action>,
/// Value columns.
val: ValEncoder,
/// Predecessor columns.
pred: PredEncoder,
}
impl ColumnEncoder {
    /// Encodes `ops` into a single byte buffer.
    ///
    /// Returns the buffer together with a map from column id to the byte
    /// range that column occupies inside the buffer (empty columns are not
    /// listed). Actors referenced by the ops but missing from `actors` are
    /// appended to it.
    pub fn encode_ops<'a, 'b, I>(
        ops: I,
        actors: &'a mut Vec<amp::ActorId>,
    ) -> (Vec<u8>, HashMap<u32, Range<usize>>)
    where
        I: IntoIterator<Item = ExpandedOp<'b>>,
    {
        let column_ops = ops.into_iter().map(|expanded| ColumnOp {
            action: expanded.action,
            obj: expanded.obj,
            key: expanded.key,
            pred: expanded.pred.into_owned(),
            insert: expanded.insert,
        });
        let mut encoder = Self::new();
        encoder.encode(column_ops, actors);
        encoder.finish()
    }

    /// Creates an encoder with every column empty.
    fn new() -> ColumnEncoder {
        ColumnEncoder {
            obj: ObjEncoder::new(),
            key: KeyEncoder::new(),
            insert: BooleanEncoder::new(),
            action: RleEncoder::new(),
            val: ValEncoder::new(),
            pred: PredEncoder::new(),
        }
    }

    /// Appends every op in `ops` to the columns, in iteration order.
    fn encode<'a, 'b, 'c, I>(&'a mut self, ops: I, actors: &'b mut Vec<amp::ActorId>)
    where
        I: IntoIterator<Item = ColumnOp<'c>>,
    {
        ops.into_iter()
            .for_each(|mut op| self.append(&mut op, actors));
    }

    /// Appends a single op. The op's predecessor list is sorted in place
    /// before it is written.
    fn append<'a>(&mut self, op: &mut ColumnOp<'a>, actors: &mut Vec<amp::ActorId>) {
        self.obj.append(&op.obj, actors);
        self.key.append(&op.key, actors);
        self.insert.append(op.insert);

        op.pred.sort();
        self.pred.append(&op.pred, actors);

        // Value columns: `Set` writes its value, `Inc` writes its amount as
        // an integer, everything else writes a null.
        match &op.action {
            InternalOpType::Set(value) => self.val.append_value(value, actors),
            InternalOpType::Inc(amount) => self
                .val
                .append_value(&amp::ScalarValue::Int(*amount), actors),
            InternalOpType::Del => self.val.append_null(),
            InternalOpType::Make(_) => self.val.append_null(),
        }

        let action = match &op.action {
            InternalOpType::Set(_) => Action::Set,
            InternalOpType::Inc(_) => Action::Inc,
            InternalOpType::Del => Action::Del,
            InternalOpType::Make(amp::ObjType::Map) => Action::MakeMap,
            InternalOpType::Make(amp::ObjType::Table) => Action::MakeTable,
            InternalOpType::Make(amp::ObjType::List) => Action::MakeList,
            InternalOpType::Make(amp::ObjType::Text) => Action::MakeText,
        };
        self.action.append_value(action);
    }

    /// Finishes all columns and serialises them into one buffer.
    ///
    /// The buffer holds the number of non-empty columns, then each column's
    /// length record, then the column bytes, all in column-id order. The
    /// returned ranges index into that same buffer.
    fn finish(self) -> (Vec<u8>, HashMap<u32, Range<usize>>) {
        let mut columns = vec![
            self.insert.finish(COL_INSERT),
            self.action.finish(COL_ACTION),
        ];
        columns.extend(self.obj.finish());
        columns.extend(self.key.finish());
        columns.extend(self.val.finish());
        columns.extend(self.pred.finish());
        columns.sort_by(|lhs, rhs| lhs.col.cmp(&rhs.col));

        let mut data = Vec::new();
        let non_empty = columns.iter().filter(|c| !c.data.is_empty()).count();
        non_empty.encode(&mut data).ok();
        for column in columns.iter_mut() {
            column.encode_col_len(&mut data).ok();
        }

        let mut rangemap = HashMap::new();
        for column in columns.iter() {
            let start = data.len();
            data.write_all(column.data.as_slice()).ok();
            let end = data.len();
            if !column.data.is_empty() {
                rangemap.insert(column.col, start..end);
            }
        }
        (data, rangemap)
    }
}
/// Builds a decoder of type `T` over the bytes of column `col_id`.
///
/// Lookup order in `ops`:
/// 1. `col_id` itself: the column bytes are borrowed from `bytes`;
/// 2. `col_id | COLUMN_TYPE_DEFLATE`: the column bytes are inflated into an
///    owned buffer;
/// 3. otherwise the decoder is built over an empty slice.
///
/// # Panics
///
/// Panics if a recorded range lies outside `bytes`, or if a deflated column
/// cannot be inflated.
fn col_iter<'a, T>(bytes: &'a [u8], ops: &'a HashMap<u32, Range<usize>>, col_id: u32) -> T
where
    T: From<Cow<'a, [u8]>>,
{
    let column: Cow<'a, [u8]> = match ops.get(&col_id) {
        Some(range) => Cow::Borrowed(&bytes[range.clone()]),
        None => match ops.get(&(col_id | COLUMN_TYPE_DEFLATE)) {
            Some(range) => {
                let mut inflated = Vec::new();
                let mut decoder = DeflateDecoder::new(&bytes[range.clone()]);
                //TODO this could throw if the compression is corrupt, we should propagate the
                //error rather than unwrapping
                decoder.read_to_end(&mut inflated).unwrap();
                Cow::Owned(inflated)
            }
            None => Cow::from(&[] as &[u8]),
        },
    };
    T::from(column)
}
// Value type tags. They occupy the low 4 bits of an entry in a value
// length/type column; the remaining bits hold the payload length in bytes
// (entries are built as `len << 4 | VALUE_TYPE_*`).
const VALUE_TYPE_NULL: usize = 0;
const VALUE_TYPE_FALSE: usize = 1;
const VALUE_TYPE_TRUE: usize = 2;
const VALUE_TYPE_LEB128_UINT: usize = 3;
const VALUE_TYPE_LEB128_INT: usize = 4;
const VALUE_TYPE_IEEE754: usize = 5;
const VALUE_TYPE_UTF8: usize = 6;
const VALUE_TYPE_BYTES: usize = 7;
const VALUE_TYPE_COUNTER: usize = 8;
const VALUE_TYPE_TIMESTAMP: usize = 9;
const VALUE_TYPE_CURSOR: usize = 10;
// Tags in the inclusive range MIN_UNKNOWN..=MAX_UNKNOWN have no decoding
// defined in this file.
const VALUE_TYPE_MIN_UNKNOWN: usize = 11;
const VALUE_TYPE_MAX_UNKNOWN: usize = 15;
// Column type tags. A column id is built as `group << 4 | COLUMN_TYPE_*`
// (see the `COL_*` and `DOC_*` constants).
pub(crate) const COLUMN_TYPE_GROUP_CARD: u32 = 0;
pub(crate) const COLUMN_TYPE_ACTOR_ID: u32 = 1;
pub(crate) const COLUMN_TYPE_INT_RLE: u32 = 2;
pub(crate) const COLUMN_TYPE_INT_DELTA: u32 = 3;
pub(crate) const COLUMN_TYPE_BOOLEAN: u32 = 4;
pub(crate) const COLUMN_TYPE_STRING_RLE: u32 = 5;
pub(crate) const COLUMN_TYPE_VALUE_LEN: u32 = 6;
pub(crate) const COLUMN_TYPE_VALUE_RAW: u32 = 7;
// Flag bit OR-ed into a column id; `col_iter` inflates columns stored under
// an id that has this bit set.
pub(crate) const COLUMN_TYPE_DEFLATE: u32 = 8;
/// Operation kinds as stored in the action column.
///
/// The variant order defines the numeric value that is written when an
/// `Action` is encoded (`*self as u32`), so it must not be changed. `ACTIONS`
/// lists the variants in the same order for decoding.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u32)]
pub(crate) enum Action {
MakeMap,
Set,
MakeList,
Del,
MakeText,
Inc,
MakeTable,
}
// Lookup table used when decoding: the entry at index `n` is the `Action`
// returned for the decoded number `n`. The order matches the declaration
// order of the `Action` enum.
const ACTIONS: [Action; 7] = [
Action::MakeMap,
Action::Set,
Action::MakeList,
Action::Del,
Action::MakeText,
Action::Inc,
Action::MakeTable,
];
// An `Action` is read as a number and looked up in the `ACTIONS` table.
impl Decodable for Action {
    /// Returns `None` if no number can be decoded or if the number does not
    /// correspond to an entry of `ACTIONS`.
    fn decode<R>(bytes: &mut R) -> Option<Self>
    where
        R: Read,
    {
        usize::decode::<R>(bytes).and_then(|index| ACTIONS.get(index).copied())
    }
}
// Column ids of the operation columns, built as `group << 4 | column type`.
// Group 0: object id.
const COL_OBJ_ACTOR: u32 = COLUMN_TYPE_ACTOR_ID;
const COL_OBJ_CTR: u32 = COLUMN_TYPE_INT_RLE;
// Group 1: key.
const COL_KEY_ACTOR: u32 = 1 << 4 | COLUMN_TYPE_ACTOR_ID;
const COL_KEY_CTR: u32 = 1 << 4 | COLUMN_TYPE_INT_DELTA;
const COL_KEY_STR: u32 = 1 << 4 | COLUMN_TYPE_STRING_RLE;
// Group 2: op id (used by the document op encoder/iterator).
const COL_ID_ACTOR: u32 = 2 << 4 | COLUMN_TYPE_ACTOR_ID;
const COL_ID_CTR: u32 = 2 << 4 | COLUMN_TYPE_INT_DELTA;
// Group 3: insert flag.
const COL_INSERT: u32 = 3 << 4 | COLUMN_TYPE_BOOLEAN;
// Group 4: action.
const COL_ACTION: u32 = 4 << 4 | COLUMN_TYPE_INT_RLE;
// Group 5: value length/type tag and raw value bytes.
const COL_VAL_LEN: u32 = 5 << 4 | COLUMN_TYPE_VALUE_LEN;
const COL_VAL_RAW: u32 = 5 << 4 | COLUMN_TYPE_VALUE_RAW;
// Group 7: predecessors.
const COL_PRED_NUM: u32 = 7 << 4 | COLUMN_TYPE_GROUP_CARD;
const COL_PRED_ACTOR: u32 = 7 << 4 | COLUMN_TYPE_ACTOR_ID;
const COL_PRED_CTR: u32 = 7 << 4 | COLUMN_TYPE_INT_DELTA;
// Group 8: successors.
const COL_SUCC_NUM: u32 = 8 << 4 | COLUMN_TYPE_GROUP_CARD;
const COL_SUCC_ACTOR: u32 = 8 << 4 | COLUMN_TYPE_ACTOR_ID;
const COL_SUCC_CTR: u32 = 8 << 4 | COLUMN_TYPE_INT_DELTA;
// Group 6: op id referenced by a cursor value.
const COL_REF_CTR: u32 = 6 << 4 | COLUMN_TYPE_INT_RLE;
const COL_REF_ACTOR: u32 = 6 << 4 | COLUMN_TYPE_ACTOR_ID;
// Column ids of the document's change-metadata columns, built as
// `group << 4 | column type`.
const DOC_ACTOR: u32 = /* 0 << 4 */ COLUMN_TYPE_ACTOR_ID;
const DOC_SEQ: u32 = /* 0 << 4 */ COLUMN_TYPE_INT_DELTA;
const DOC_MAX_OP: u32 = 1 << 4 | COLUMN_TYPE_INT_DELTA;
const DOC_TIME: u32 = 2 << 4 | COLUMN_TYPE_INT_DELTA;
const DOC_MESSAGE: u32 = 3 << 4 | COLUMN_TYPE_STRING_RLE;
// Dependencies: count per change, then the index of each dependency.
const DOC_DEPS_NUM: u32 = 4 << 4 | COLUMN_TYPE_GROUP_CARD;
const DOC_DEPS_INDEX: u32 = 4 << 4 | COLUMN_TYPE_INT_DELTA;
// Extra bytes: length/type tag per change, then the raw bytes.
const DOC_EXTRA_LEN: u32 = 5 << 4 | COLUMN_TYPE_VALUE_LEN;
const DOC_EXTRA_RAW: u32 = 5 << 4 | COLUMN_TYPE_VALUE_RAW;
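/*
Reference only (not compiled): what appears to be the corresponding column table
in JavaScript syntax. Note that it shifts the group number by 3 bits, whereas
the `DOC_*` constants above shift by 4 — TODO confirm which layout is current.

const DOCUMENT_COLUMNS = {
actor: 0 << 3 | COLUMN_TYPE.ACTOR_ID,
seq: 0 << 3 | COLUMN_TYPE.INT_DELTA,
maxOp: 1 << 3 | COLUMN_TYPE.INT_DELTA,
time: 2 << 3 | COLUMN_TYPE.INT_DELTA,
message: 3 << 3 | COLUMN_TYPE.STRING_RLE,
depsNum: 4 << 3 | COLUMN_TYPE.GROUP_CARD,
depsIndex: 4 << 3 | COLUMN_TYPE.INT_DELTA,
extraLen: 5 << 3 | COLUMN_TYPE.VALUE_LEN,
extraRaw: 5 << 3 | COLUMN_TYPE.VALUE_RAW
}
*/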
#[cfg(test)]
mod tests {
    use amp::{ActorId, Key, ScalarValue};
    use pretty_assertions::assert_eq;
    use super::*;

    #[test]
    fn test_rle_encoder_for_strings_from_key() {
        // this seems like a strange case but checks that we write nulls into the encoder as
        // usize and read them out the same. if we don't then the 64 nulls gets interpreted as
        // -64 and causes the rle decoder to never read the next values.
        let mut ops: Vec<Option<String>> = vec![None; 64];
        ops.push(Some("a".to_owned()));

        let mut encoder = RleEncoder::new();
        for op in &ops {
            match op {
                Some(v) => encoder.append_value(v.clone()),
                None => encoder.append_null(),
            }
        }
        let encoded = encoder.finish(0).data;
        assert_eq!(encoded, vec![0, 64, 127, 1, 97]);

        let decoder: RleDecoder<String> = RleDecoder::from(Cow::from(&encoded[..]));
        let decoded = decoder.take(ops.len()).collect::<Vec<_>>();
        assert_eq!(decoded, ops);
    }

    #[test]
    fn pred_sorted() {
        let actor = ActorId::random();
        let actor2 = ActorId::random();
        let mut actors = vec![actor.clone(), actor2.clone()];
        actors.sort();

        // Encodes a single `Set(Null)` op on key "r" of the root object with
        // the given predecessor list and returns the resulting bytes.
        let encode_with_pred = |pred: Vec<amp::OpId>, actors: &mut Vec<ActorId>| {
            let col_op = ColumnOp {
                action: InternalOpType::Set(ScalarValue::Null),
                obj: Cow::Owned(amp::ObjectId::Root),
                key: Cow::Owned(Key::Map("r".to_owned())),
                pred,
                insert: false,
            };
            let mut column_encoder = ColumnEncoder::new();
            column_encoder.encode(vec![col_op], actors);
            let (bytes, _) = column_encoder.finish();
            bytes
        };

        // The same predecessors in either order must encode identically.
        let bytes = encode_with_pred(
            vec![actor.op_id_at(1), actor2.op_id_at(1)],
            &mut actors,
        );
        let bytes2 = encode_with_pred(
            vec![actor2.op_id_at(1), actor.op_id_at(1)],
            &mut actors,
        );
        assert_eq!(bytes, bytes2);
    }
}