automerge/rust/automerge/src/storage/load/change_collector.rs
Alex Good dd3c6d1303
Move rust workspace into ./rust
After some discussion with PVH I realise that the repo structure in the
last reorg was very rust-centric. In an attempt to put each language on
a level footing move the rust code and project files into ./rust
2022-10-16 19:55:51 +01:00

209 lines
7.2 KiB
Rust

use std::{
borrow::Cow,
collections::{BTreeSet, HashMap},
num::NonZeroU64,
};
use tracing::instrument;
use crate::{
op_tree::OpSetMetadata,
storage::{
change::{PredOutOfOrder, Verified},
convert::op_as_actor_id,
Change as StoredChange, ChangeMetadata,
},
types::{ChangeHash, ObjId, Op},
};
/// Errors which can occur while assigning ops to changes and rebuilding the
/// stored changes from them.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
/// Returned by `ChangeCollector::collect` when no changes were registered for
/// the actor index of the op being collected.
#[error("a change referenced an actor index we couldn't find")]
MissingActor,
/// Returned by `ChangeCollector::new` when a change has a smaller `max_op`
/// than the preceding change by the same actor, and by
/// `ChangeCollector::finish` when the sequence numbers of an actor's changes
/// do not start at 1 and increase by exactly one.
#[error("changes out of order")]
ChangesOutOfOrder,
/// Returned by `ChangeCollector::collect` when no change by the op's actor
/// has a `max_op` at least as large as the op's counter, and by
/// `PartialChange::finish` when a dependency index has no known hash.
#[error("missing change")]
MissingChange,
/// Wraps an error yielded by the change metadata iterator passed to
/// `ChangeCollector::new`.
#[error("unable to read change metadata: {0}")]
ReadChange(Box<dyn std::error::Error + Send + Sync + 'static>),
/// Returned by `PartialChange::finish` when a valid (non-zero) start op
/// cannot be derived from the change's `max_op` and the number of ops
/// collected for it.
#[error("missing ops")]
MissingOps,
}
/// Accumulates ops into the changes which created them.
///
/// Built from change metadata with `new`, fed ops one at a time with
/// `collect`, and turned into complete changes with `finish`.
pub(crate) struct ChangeCollector<'a> {
// Keyed by actor index; each `Vec` holds that actor's changes in the order
// they were supplied to `new` (which also checks `max_op` never decreases).
changes_by_actor: HashMap<usize, Vec<PartialChange<'a>>>,
}
/// The result of `ChangeCollector::finish`.
pub(crate) struct CollectedChanges<'a> {
/// The rebuilt changes, ordered by the position their metadata had in the
/// iterator passed to `ChangeCollector::new`.
pub(crate) history: Vec<StoredChange<'a, Verified>>,
/// Hashes of the changes in `history` which are not listed as a dependency
/// of any later change in `history`.
pub(crate) heads: BTreeSet<ChangeHash>,
}
impl<'a> ChangeCollector<'a> {
    /// Create a collector from the metadata of a sequence of changes.
    ///
    /// Every item of `changes` is recorded under its actor index together with
    /// its position in `changes`, so that `finish` can later restore the
    /// original ordering.
    ///
    /// # Errors
    ///
    /// * `Error::ReadChange` if `changes` yields an error
    /// * `Error::ChangesOutOfOrder` if a change has a smaller `max_op` than the
    ///   preceding change by the same actor
    pub(crate) fn new<E: std::error::Error + Send + Sync + 'static, I>(
        changes: I,
    ) -> Result<ChangeCollector<'a>, Error>
    where
        I: IntoIterator<Item = Result<ChangeMetadata<'a>, E>>,
    {
        let mut changes_by_actor: HashMap<usize, Vec<PartialChange<'_>>> = HashMap::new();
        for (index, change) in changes.into_iter().enumerate() {
            tracing::trace!(?change, "importing change metadata");
            let change = change.map_err(|e| Error::ReadChange(Box::new(e)))?;
            let actor_changes = changes_by_actor.entry(change.actor).or_default();
            if let Some(prev) = actor_changes.last() {
                // Note that we allow max_op to be equal to the previous max_op in case the
                // previous change had no ops (which is permitted)
                if prev.max_op > change.max_op {
                    return Err(Error::ChangesOutOfOrder);
                }
            }
            actor_changes.push(PartialChange {
                index,
                deps: change.deps,
                actor: change.actor,
                seq: change.seq,
                timestamp: change.timestamp,
                max_op: change.max_op,
                message: change.message,
                extra_bytes: change.extra,
                ops: Vec::new(),
            });
        }
        let num_changes: usize = changes_by_actor.values().map(|v| v.len()).sum();
        tracing::trace!(num_changes, "change collection context created");
        Ok(ChangeCollector { changes_by_actor })
    }

    /// Assign `op` (passed together with its `obj`) to the change which
    /// contains it.
    ///
    /// The owning change is the first change by the op's actor whose `max_op`
    /// is greater than or equal to the op's counter. Because `new` guarantees
    /// that `max_op` never decreases within an actor, it can be found with a
    /// binary search.
    ///
    /// # Errors
    ///
    /// * `Error::MissingActor` if no changes were registered for the op's actor
    /// * `Error::MissingChange` if every change by that actor has a `max_op`
    ///   smaller than the op's counter
    #[instrument(skip(self))]
    pub(crate) fn collect(&mut self, obj: ObjId, op: Op) -> Result<(), Error> {
        let actor_changes = self
            .changes_by_actor
            .get_mut(&op.id.actor())
            .ok_or_else(|| {
                tracing::error!(missing_actor = op.id.actor(), "missing actor for op");
                Error::MissingActor
            })?;
        let change_index = actor_changes.partition_point(|c| c.max_op < op.id.counter());
        let change = actor_changes.get_mut(change_index).ok_or_else(|| {
            tracing::error!(missing_change_index = change_index, "missing change for op");
            Error::MissingChange
        })?;
        change.ops.push((obj, op));
        Ok(())
    }

    /// Turn the collected metadata and ops into complete changes.
    ///
    /// The changes are returned in the order their metadata was supplied to
    /// `new`, along with the set of hashes which no later change depends on.
    ///
    /// # Errors
    ///
    /// * `Error::ChangesOutOfOrder` if the sequence numbers of any actor's
    ///   changes do not start at 1 and increase by exactly one
    /// * Any error returned by `PartialChange::finish`
    #[instrument(skip(self, metadata))]
    pub(crate) fn finish(
        self,
        metadata: &OpSetMetadata,
    ) -> Result<CollectedChanges<'static>, Error> {
        let mut changes_in_order =
            Vec::with_capacity(self.changes_by_actor.values().map(|c| c.len()).sum());
        for (_, changes) in self.changes_by_actor {
            let mut prev_seq: Option<u64> = None;
            for change in changes {
                // The sequence numbers come from the loaded data and may be
                // malformed, so compute the expected value with checked
                // arithmetic rather than `change.seq - 1`, which underflows
                // when `change.seq == 0`.
                let expected_seq = match prev_seq {
                    Some(prev) => prev.checked_add(1),
                    None => Some(1),
                };
                if expected_seq != Some(change.seq) {
                    return Err(Error::ChangesOutOfOrder);
                }
                prev_seq = Some(change.seq);
                changes_in_order.push(change);
            }
        }
        // Restore the order in which the changes were originally supplied, so
        // that every dependency is finished before the changes which use it.
        changes_in_order.sort_by_key(|c| c.index);

        let mut hashes_by_index = HashMap::with_capacity(changes_in_order.len());
        let mut history = Vec::with_capacity(changes_in_order.len());
        let mut heads = BTreeSet::new();
        for (index, change) in changes_in_order.into_iter().enumerate() {
            let finished = change.finish(&hashes_by_index, metadata)?;
            let hash = finished.hash();
            hashes_by_index.insert(index, hash);
            // Anything this change depends on is no longer a head.
            for dep in finished.dependencies() {
                heads.remove(dep);
            }
            heads.insert(hash);
            history.push(finished.into_owned());
        }

        Ok(CollectedChanges { history, heads })
    }
}
/// The metadata of a single change plus the ops collected for it so far.
#[derive(Debug)]
struct PartialChange<'a> {
// Position of this change in the iterator passed to `ChangeCollector::new`.
index: usize,
// Indices of the changes this change depends on; resolved to hashes in `finish`.
deps: Vec<u64>,
// Index used to look the actor up in `OpSetMetadata::actors`.
actor: usize,
// Sequence number of this change; checked per actor in `ChangeCollector::finish`.
seq: u64,
// Upper bound on the counters of the ops in this change; `finish` derives
// the start op from it and the number of collected ops.
max_op: u64,
// The remaining metadata is passed through unchanged to the change builder.
timestamp: i64,
message: Option<smol_str::SmolStr>,
extra_bytes: Cow<'a, [u8]>,
// Ops assigned to this change by `ChangeCollector::collect`, each paired
// with the `ObjId` it was collected with.
ops: Vec<(ObjId, Op)>,
}
impl<'a> PartialChange<'a> {
/// # Panics
///
/// * If any op references a property index which is not in `props`
/// * If any op references an actor index which is not in `actors`
#[instrument(skip(self, known_changes, metadata))]
fn finish(
mut self,
known_changes: &HashMap<usize, ChangeHash>,
metadata: &OpSetMetadata,
) -> Result<StoredChange<'a, Verified>, Error> {
let deps_len = self.deps.len();
let mut deps = self.deps.into_iter().try_fold::<_, _, Result<_, Error>>(
Vec::with_capacity(deps_len),
|mut acc, dep| {
acc.push(known_changes.get(&(dep as usize)).cloned().ok_or_else(|| {
tracing::error!(
dependent_index = self.index,
dep_index = dep,
"could not find dependency"
);
Error::MissingChange
})?);
Ok(acc)
},
)?;
deps.sort();
let num_ops = self.ops.len() as u64;
self.ops.sort_by_key(|o| o.1.id);
let converted_ops = self
.ops
.iter()
.map(|(obj, op)| op_as_actor_id(obj, op, metadata));
let actor = metadata.actors.get(self.actor).clone();
let change = match StoredChange::builder()
.with_dependencies(deps)
.with_actor(actor)
.with_seq(self.seq)
.with_start_op(NonZeroU64::new(self.max_op - num_ops + 1).ok_or(Error::MissingOps)?)
.with_timestamp(self.timestamp)
.with_message(self.message.map(|s| s.to_string()))
.with_extra_bytes(self.extra_bytes.into_owned())
.build(converted_ops)
{
Ok(s) => s,
Err(PredOutOfOrder) => {
// SAFETY: types::Op::preds is `types::OpIds` which ensures ops are always sorted
panic!("preds out of order");
}
};
#[cfg(not(debug_assertions))]
tracing::trace!(?change, hash=?change.hash(), "collected change");
#[cfg(debug_assertions)]
{
tracing::trace!(?change, ops=?self.ops, hash=?change.hash(), "collected change");
}
Ok(change)
}
}