automerge/automerge/src/storage/change/change_actors.rs
Alex Good 771733deac
Implement storage-v2
Implement parsing the binary format using the new parser library and the
new encoding types. This is superior to the previous parsing
implementation in that invalid data should never cause panics and it
exposes and interface to construct an OpSet from a saved document much
more efficiently.

Signed-off-by: Alex Good <alex@memoryandthought.me>
2022-08-22 21:16:47 +01:00

304 lines
10 KiB
Rust

use std::collections::{BTreeMap, BTreeSet};
use crate::convert;
use super::AsChangeOp;
/// This struct represents the ordering of actor indices in a change chunk. Operations in a change
/// chunk are encoded with the actor ID represented as an offset into an array of actors which are
/// encoded at the start of the chunk. This array is in a specific order: the author of the change
/// is always the first actor, then all other actors referenced in a change are encoded in
/// lexicographic order.
///
/// The intended usage is to construct a `ChangeActors` from an iterator over `AsChangeOp` where
/// the `ActorId` of the `AsChangeOp` implementation is the original actor ID. The resulting
/// `ChangeActors` implements `Iterator` where the `item` implements
/// `AsChangeOp<OpId=convert::OpId<usize>>`, which can be passed to `ChangeOpColumns::encode`.
///
/// Once encoding is complete you can use `ChangeActors::done` to retrieve the original actor and the
/// other actors in the change.
///
/// # Note on type parameters
///
/// The type paramters are annoying, they basically exist because we can't have generic associated
/// types, so we have to feed the concrete types of the associated types of the `AsChangeOp`
/// implementation through here. Here's what they all refer to:
///
/// * A - The type of the actor ID used in the operation IDs of the incoming changes
/// * I - The type of the iterator over the `AsChangeOp` implementation of the incoming changes
/// * O - The concrete type of the operation ID which implementas `convert::OpId`
/// * C - The concrete type (which implements `AsChangeOp`) of the incoming changes
/// * 'a - The lifetime bound for the AsChangeOp trait and it's associated types
///
/// Maybe when GATs land we can make this simpler.
pub(crate) struct ChangeActors<'a, ActorId, I, O, C> {
actor: ActorId,
other_actors: Vec<ActorId>,
index: BTreeMap<ActorId, usize>,
wrapped: I,
num_ops: usize,
_phantom: std::marker::PhantomData<(&'a O, C)>,
}
#[derive(thiserror::Error, Debug)]
#[error("actor index {0} referenced by an operation was not found in the changes")]
pub(crate) struct MissingActor(usize);
#[derive(Debug, thiserror::Error)]
#[error("pred OpIds out of order")]
pub(crate) struct PredOutOfOrder;
impl<'a, A, I, O, C> ChangeActors<'a, A, I, O, C>
where
A: PartialEq + Ord + Clone + std::hash::Hash + 'static,
O: convert::OpId<&'a A> + 'a,
C: AsChangeOp<'a, OpId = O> + 'a,
I: Iterator<Item = C> + Clone + 'a,
{
/// Create a new change actor mapping
///
/// # Arguments
/// * actor - the actor ID of the actor who authored this change
/// * ops - an iterator containing the operations which will be encoded into the change
///
/// # Errors
/// * If one of the ops herein contains a `pred` with ops which are not in lamport timestamp
/// order
pub(crate) fn new(actor: A, ops: I) -> Result<ChangeActors<'a, A, I, O, C>, PredOutOfOrder> {
// Change actors indices are encoded with the 0th element being the actor who authored the
// change and all other actors referenced in the chain following the author in
// lexicographic order. Here we collect all the actors referenced by operations in `ops`
let (num_ops, mut other_actors) =
ops.clone()
.try_fold((0, BTreeSet::new()), |(count, mut acc), op| {
if let convert::Key::Elem(convert::ElemId::Op(o)) = op.key() {
if o.actor() != &actor {
acc.insert(o.actor());
}
}
if !are_sorted(op.pred()) {
return Err(PredOutOfOrder);
}
for pred in op.pred() {
if pred.actor() != &actor {
acc.insert(pred.actor());
}
}
if let convert::ObjId::Op(o) = op.obj() {
if o.actor() != &actor {
acc.insert(o.actor());
}
}
Ok((count + 1, acc))
})?;
// This shouldn't be necessary but just in case
other_actors.remove(&actor);
let mut other_actors = other_actors.into_iter().cloned().collect::<Vec<_>>();
other_actors.sort();
let index = std::iter::once(actor.clone())
.chain(other_actors.clone().into_iter())
.enumerate()
.map(|(idx, actor)| (actor, idx))
.collect();
Ok(ChangeActors {
actor,
other_actors,
index,
wrapped: ops,
num_ops,
_phantom: std::marker::PhantomData,
})
}
/// Translate an OpID from the OpSet index to the change index
fn translate_opid(&self, opid: &O) -> ChangeOpId {
ChangeOpId {
actor: *self.index.get(opid.actor()).unwrap(),
counter: opid.counter(),
}
}
/// Returns a clonable iterator over the converted operations. The item of the iterator is an
/// implementation of `AsChangeOp` which uses the index of the actor of each operation into the
/// actors as encoded in a change. This is suitable for passing to `ChangeOpColumns::encode`
pub(crate) fn iter<'b>(&'b self) -> WithChangeActorsOpIter<'b, 'a, A, I, O, C> {
WithChangeActorsOpIter {
change_actors: self,
inner: self.wrapped.clone(),
}
}
pub(crate) fn done(self) -> (A, Vec<A>) {
(self.actor, self.other_actors)
}
}
/// The actual implementation of the converted iterator
pub(crate) struct WithChangeActorsOpIter<'actors, 'aschangeop, A, I, O, C> {
change_actors: &'actors ChangeActors<'aschangeop, A, I, O, C>,
inner: I,
}
impl<'actors, 'aschangeop, A: 'aschangeop, I, O, C> Clone
for WithChangeActorsOpIter<'actors, 'aschangeop, A, I, O, C>
where
I: Clone,
{
fn clone(&self) -> Self {
Self {
change_actors: self.change_actors,
inner: self.inner.clone(),
}
}
}
impl<'actors, 'aschangeop, A: 'aschangeop, I, O, C> Iterator
for WithChangeActorsOpIter<'actors, 'aschangeop, A, I, O, C>
where
C: AsChangeOp<'aschangeop, OpId = O>,
O: convert::OpId<&'aschangeop A>,
I: Iterator<Item = C> + Clone,
{
type Item = WithChangeActors<'actors, 'aschangeop, A, I, O, C>;
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().map(|o| WithChangeActors {
op: o,
actors: self.change_actors,
})
}
}
impl<'actors, 'aschangeop, A: 'aschangeop, I, O, C> ExactSizeIterator
for WithChangeActorsOpIter<'actors, 'aschangeop, A, I, O, C>
where
C: AsChangeOp<'aschangeop, OpId = O>,
O: convert::OpId<&'aschangeop A>,
I: Iterator<Item = C> + Clone,
{
fn len(&self) -> usize {
self.change_actors.num_ops
}
}
pub(crate) struct ChangeOpId {
actor: usize,
counter: u64,
}
impl convert::OpId<usize> for ChangeOpId {
fn actor(&self) -> usize {
self.actor
}
fn counter(&self) -> u64 {
self.counter
}
}
/// A struct which implements `AsChangeOp` by translating the actor IDs in the incoming operations
/// into the index into the actors in the `ChangeActors`.
pub(crate) struct WithChangeActors<'actors, 'aschangeop, A, I, O, C> {
op: C,
actors: &'actors ChangeActors<'aschangeop, A, I, O, C>,
}
impl<'actors, 'aschangeop, A, I, O, P, C> AsChangeOp<'aschangeop>
for WithChangeActors<'actors, 'aschangeop, A, I, O, C>
where
A: PartialEq + Ord + Clone + std::hash::Hash + 'static,
O: convert::OpId<&'aschangeop A>,
P: Iterator<Item = O> + ExactSizeIterator + 'aschangeop,
C: AsChangeOp<'aschangeop, PredIter = P, OpId = O> + 'aschangeop,
I: Iterator<Item = C> + Clone + 'aschangeop,
{
type ActorId = usize;
type OpId = ChangeOpId;
type PredIter = WithChangeActorsPredIter<'actors, 'aschangeop, A, I, O, C, P>;
fn action(&self) -> u64 {
self.op.action()
}
fn insert(&self) -> bool {
self.op.insert()
}
fn pred(&self) -> Self::PredIter {
WithChangeActorsPredIter {
wrapped: self.op.pred(),
actors: self.actors,
_phantom: std::marker::PhantomData,
}
}
fn key(&self) -> convert::Key<'aschangeop, Self::OpId> {
self.op.key().map(|o| self.actors.translate_opid(&o))
}
fn obj(&self) -> convert::ObjId<Self::OpId> {
self.op.obj().map(|o| self.actors.translate_opid(&o))
}
fn val(&self) -> std::borrow::Cow<'aschangeop, crate::ScalarValue> {
self.op.val()
}
}
pub(crate) struct WithChangeActorsPredIter<'actors, 'aschangeop, A, I, O, C, P> {
wrapped: P,
actors: &'actors ChangeActors<'aschangeop, A, I, O, C>,
_phantom: std::marker::PhantomData<O>,
}
impl<'actors, 'aschangeop, A, I, O, C, P> ExactSizeIterator
for WithChangeActorsPredIter<'actors, 'aschangeop, A, I, O, C, P>
where
A: PartialEq + Ord + Clone + std::hash::Hash + 'static,
O: convert::OpId<&'aschangeop A>,
P: Iterator<Item = O> + ExactSizeIterator + 'aschangeop,
C: AsChangeOp<'aschangeop, OpId = O> + 'aschangeop,
I: Iterator<Item = C> + Clone + 'aschangeop,
{
fn len(&self) -> usize {
self.wrapped.len()
}
}
impl<'actors, 'aschangeop, A, I, O, C, P> Iterator
for WithChangeActorsPredIter<'actors, 'aschangeop, A, I, O, C, P>
where
A: PartialEq + Ord + Clone + std::hash::Hash + 'static,
O: convert::OpId<&'aschangeop A>,
P: Iterator<Item = O> + 'aschangeop,
C: AsChangeOp<'aschangeop, OpId = O> + 'aschangeop,
I: Iterator<Item = C> + Clone + 'aschangeop,
{
type Item = ChangeOpId;
fn next(&mut self) -> Option<Self::Item> {
self.wrapped.next().map(|o| self.actors.translate_opid(&o))
}
}
fn are_sorted<A, O, I>(mut opids: I) -> bool
where
A: PartialEq + Ord + Clone,
O: convert::OpId<A>,
I: Iterator<Item = O>,
{
if let Some(first) = opids.next() {
let mut prev = first;
for opid in opids {
if opid.counter() < prev.counter() {
return false;
}
if opid.counter() == prev.counter() && opid.actor() < prev.actor() {
return false;
}
prev = opid;
}
}
true
}