automerge/automerge/src/columnar_2/load/loading_document.rs
Alex Good 9332ed4ad9
wip
2022-03-20 14:48:30 +00:00

241 lines
7.8 KiB
Rust

use fxhash::FxBuildHasher;
use std::collections::{HashMap, BTreeSet};
use tracing::instrument;
use super::change_collector::ChangeCollector;
use crate::{
columnar_2::{
storage::Change as StoredChange,
rowblock::{
Key as DocOpKey,
doc_change_columns::ChangeMetadata,
doc_op_columns::DocOp,
PrimVal,
}
},
op_set::OpSet,
op_tree::{OpSetMetadata, OpTree},
types::{ActorId, ChangeHash, ElemId, Key, ObjId, ObjType, Op, OpId, OpType},
};
/// Errors which can occur while reconstructing a document in [`load`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    /// Ops were not in the order the loader requires. This covers three cases:
    /// - an op's key was smaller than the key of the previous op in the same object;
    /// - an object ID was smaller than the previous object ID;
    /// - an op referenced an object whose `make` op had not been seen yet.
    #[error("the document contained ops which were out of order")]
    OpsOutOfOrder,
    /// The `ops` iterator passed to `load` yielded an error; that error is boxed here.
    #[error("error reading operation: {0:?}")]
    ReadOp(Box<dyn std::error::Error>),
    /// An op had an unknown action index, or was a counter increment whose value was not an
    /// integer.
    #[error("an operation contained an invalid action")]
    InvalidAction,
    /// An op ID read from the document referenced an actor index that is not in the actor table
    /// the metadata was built from.
    #[error("an operation referenced a missing actor id")]
    MissingActor,
    /// An error reported by the change collector while gathering or finishing the changes.
    #[error("invalid changes: {0}")]
    InvalidChanges(#[from] super::change_collector::Error),
    /// The heads computed by the change collector differ from the `expected_heads` given to
    /// `load`.
    #[error("mismatching heads")]
    MismatchingHeads,
}
/// Accumulates the ops belonging to a single object while a document is being loaded.
///
/// Ops are appended in the order they are read. `finish` turns them into an [`OpTree`] and fills
/// in each op's `pred` list from the successor lists seen during appending.
struct LoadingObject {
    /// The object these ops belong to.
    id: ObjId,
    /// Ops appended so far, in read order.
    ops: Vec<Op>,
    /// The type of the object, handed to the `OpSet` alongside the op tree.
    obj_type: ObjType,
    /// For each op ID, the IDs of the ops (in this object) that list it as a successor.
    preds: HashMap<OpId, Vec<OpId>>,
}

impl LoadingObject {
    /// An empty loader for the root object, which is always a map.
    fn root() -> Self {
        Self::new(ObjId::root(), ObjType::Map)
    }

    /// An empty loader for the object `id` of type `obj_type`.
    fn new(id: ObjId, obj_type: ObjType) -> Self {
        Self {
            id,
            ops: Vec::new(),
            obj_type,
            preds: HashMap::new(),
        }
    }

    /// Append `op`, rejecting it if its key is smaller than the key of the last appended op.
    ///
    /// Every successor of `op` gets `op.id` recorded as one of its predecessors.
    fn append_op(&mut self, op: Op) -> Result<(), Error> {
        if let Some(previous_op) = self.ops.last().filter(|prev| op.key < prev.key) {
            tracing::error!(
                ?op,
                ?previous_op,
                "op key was smaller than key of previous op"
            );
            return Err(Error::OpsOutOfOrder);
        }
        let this_id = op.id;
        for &successor in &op.succ {
            self.preds.entry(successor).or_default().push(this_id);
        }
        self.ops.push(op);
        Ok(())
    }

    /// Consume the loader, returning the object ID, its type and the op tree built from the
    /// appended ops (with `pred` lists populated).
    fn finish(self) -> (ObjId, ObjType, OpTree) {
        let LoadingObject {
            id,
            ops,
            obj_type,
            mut preds,
        } = self;
        let mut tree = OpTree::new();
        for (position, mut op) in ops.into_iter().enumerate() {
            // Only overwrite `pred` when some op in this object named `op` as a successor.
            if let Some(found) = preds.remove(&op.id) {
                op.pred = found;
            }
            tree.insert(position, op);
        }
        (id, obj_type, tree)
    }
}
/// The result of [`load`]: the reconstructed op set together with the rebuilt change history.
pub(crate) struct Loaded<'a> {
    /// The document's objects and ops, assembled with `OpSet::from_parts`.
    pub(crate) op_set: OpSet,
    /// The changes produced by the change collector.
    pub(crate) history: Vec<StoredChange<'a>>,
    /// Presumably maps each change hash to its index in `history` — confirm against
    /// `ChangeCollector`.
    pub(crate) history_index: HashMap<ChangeHash, usize>,
    /// Presumably maps an actor index to the indices in `history` of that actor's changes —
    /// confirm against `ChangeCollector`.
    pub(crate) actor_to_history: HashMap<usize, Vec<usize>>,
}
#[instrument(skip(actors, expected_heads, changes, ops))]
pub(crate) fn load<'a, I, C, OE, CE>(
actors: Vec<ActorId>,
expected_heads: BTreeSet<ChangeHash>,
changes: C,
ops: I,
) -> Result<Loaded<'static>, Error>
where
OE: std::error::Error + 'static,
CE: std::error::Error + 'static,
I: Iterator<Item = Result<DocOp<'a>, OE>>,
C: Iterator<Item = Result<ChangeMetadata<'a>, CE>>,
{
let mut metadata = OpSetMetadata::from_actors(actors);
let mut completed_objects = HashMap::<_, _, FxBuildHasher>::default();
let mut current_object = LoadingObject::root();
let mut collector = ChangeCollector::new(changes)?;
let mut obj_types = HashMap::new();
obj_types.insert(ObjId::root(), ObjType::Map);
for op_res in ops {
let doc_op = op_res.map_err(|e| Error::ReadOp(Box::new(e)))?;
let obj = doc_op.object;
let op = import_op(&mut metadata, doc_op)?;
tracing::trace!(?op, "processing op");
collector.collect(current_object.id, op.clone())?;
// We have to record the object types of make operations so that when the object ID the
// incoming operations refer to switches we can lookup the object type for the new object.
// Ultimately we need this because the OpSet needs to know the object ID _and type_ for
// each OpTree it tracks.
if obj == current_object.id {
match op.action {
OpType::Make(obj_type) => {
obj_types.insert(op.id.into(), obj_type.clone());
}
_ => {}
};
current_object.append_op(op)?;
} else {
let new_obj_type = match obj_types.get(&obj) {
Some(t) => Ok(t.clone()),
None => {
tracing::error!(
?op,
"operation referenced an object which we haven't seen a create op for yet"
);
Err(Error::OpsOutOfOrder)
}
}?;
if obj < current_object.id {
tracing::error!(?op, previous_obj=?current_object.id, "op referenced an object ID which was less than the previous object ID");
return Err(Error::OpsOutOfOrder);
} else {
let (id, obj_type, op_tree) = current_object.finish();
current_object = LoadingObject::new(obj, new_obj_type);
current_object.append_op(op)?;
completed_objects.insert(id, (obj_type, op_tree));
}
}
}
let super::change_collector::CollectedChanges{
history,
history_index,
actor_to_history,
heads,
} = collector.finish(
&metadata.actors,
&metadata.props,
)?;
if expected_heads != heads {
tracing::error!(?expected_heads, ?heads, "mismatching heads");
return Err(Error::MismatchingHeads);
}
let (id, obj_type, op_tree) = current_object.finish();
completed_objects.insert(id, (obj_type, op_tree));
let op_set = OpSet::from_parts(completed_objects, metadata);
Ok(Loaded {
op_set,
history,
history_index,
actor_to_history,
})
}
/// Convert a [`DocOp`] read from the document into an [`Op`].
///
/// Map keys are interned with [`OpSetMetadata::import_prop`]. The op's own ID, the element ID of
/// a sequence key and every successor ID are validated with `check_opid`. `pred` is left empty
/// here; it is filled in later by `LoadingObject::finish` from the successor lists.
///
/// # Errors
///
/// * [`Error::MissingActor`] if any op ID references an actor not in `m`.
/// * [`Error::InvalidAction`] if the action/value pair cannot be decoded.
#[instrument(skip(m))]
fn import_op<'a>(m: &mut OpSetMetadata, op: DocOp<'a>) -> Result<Op, Error> {
    let key = match op.key {
        DocOpKey::Prop(s) => Key::Map(m.import_prop(s)),
        DocOpKey::Elem(ElemId(elem_opid)) => Key::Seq(ElemId(check_opid(m, elem_opid)?)),
    };
    // Successors are stored as-is, so validate their actors up front.
    for &succ_id in &op.succ {
        check_opid(m, succ_id)?;
    }
    Ok(Op {
        id: check_opid(m, op.id)?,
        action: parse_optype(op.action, op.value)?,
        key,
        succ: op.succ,
        pred: Vec::new(),
        insert: op.insert,
    })
}
/// We construct the OpSetMetadata directly from the vector of actors which are encoded in the
/// start of the document. Therefore we need to check for each opid in the docuemnt that the actor
/// ID which it references actually exists in the metadata.
#[tracing::instrument(skip(m))]
fn check_opid(m: &OpSetMetadata, opid: OpId) -> Result<OpId, Error> {
match m.actors.safe_get(opid.actor()) {
Some(_) => Ok(opid),
None => {
tracing::error!("missing actor");
Err(Error::MissingActor)
}
}
}
fn parse_optype<'a>(action_index: usize, value: PrimVal<'a>) -> Result<OpType, Error> {
match action_index {
0 => Ok(OpType::Make(ObjType::Map)),
1 => Ok(OpType::Set(value.into())),
2 => Ok(OpType::Make(ObjType::List)),
3 => Ok(OpType::Del),
4 => Ok(OpType::Make(ObjType::Text)),
5 => match value {
PrimVal::Int(i) => Ok(OpType::Inc(i)),
_ => {
tracing::error!(?value, "invalid value for counter op");
Err(Error::InvalidAction)
}
},
6 => Ok(OpType::Make(ObjType::Table)),
other => {
tracing::error!(action = other, "unknown action type");
Err(Error::InvalidAction)
}
}
}