automerge/rust/automerge/src/storage/change/change_actors.rs
Alex Good dd3c6d1303
Move rust workspace into ./rust
After some discussion with PVH I realise that the repo structure in the
last reorg was very rust-centric. In an attempt to put each language on
a level footing move the rust code and project files into ./rust
2022-10-16 19:55:51 +01:00

304 lines
10 KiB
Rust

use std::collections::{BTreeMap, BTreeSet};
use crate::convert;
use super::AsChangeOp;
/// This struct represents the ordering of actor indices in a change chunk. Operations in a change
/// chunk are encoded with the actor ID represented as an offset into an array of actors which are
/// encoded at the start of the chunk. This array is in a specific order: the author of the change
/// is always the first actor, then all other actors referenced in a change are encoded in
/// lexicographic order.
///
/// The intended usage is to construct a `ChangeActors` from an iterator over `AsChangeOp` where
/// the `ActorId` of the `AsChangeOp` implementation is the original actor ID. The resulting
/// `ChangeActors` implements `Iterator` where the `item` implements
/// `AsChangeOp<OpId=convert::OpId<usize>>`, which can be passed to `ChangeOpColumns::encode`.
///
/// Once encoding is complete you can use `ChangeActors::done` to retrieve the original actor and the
/// other actors in the change.
///
/// # Note on type parameters
///
/// The type paramters are annoying, they basically exist because we can't have generic associated
/// types, so we have to feed the concrete types of the associated types of the `AsChangeOp`
/// implementation through here. Here's what they all refer to:
///
/// * A - The type of the actor ID used in the operation IDs of the incoming changes
/// * I - The type of the iterator over the `AsChangeOp` implementation of the incoming changes
/// * O - The concrete type of the operation ID which implementas `convert::OpId`
/// * C - The concrete type (which implements `AsChangeOp`) of the incoming changes
/// * 'a - The lifetime bound for the AsChangeOp trait and it's associated types
///
/// Maybe when GATs land we can make this simpler.
pub(crate) struct ChangeActors<'a, ActorId, I, O, C> {
actor: ActorId,
other_actors: Vec<ActorId>,
index: BTreeMap<ActorId, usize>,
wrapped: I,
num_ops: usize,
_phantom: std::marker::PhantomData<(&'a O, C)>,
}
#[derive(thiserror::Error, Debug)]
#[error("actor index {0} referenced by an operation was not found in the changes")]
pub(crate) struct MissingActor(usize);
#[derive(Debug, thiserror::Error)]
#[error("pred OpIds out of order")]
pub(crate) struct PredOutOfOrder;
impl<'a, A, I, O, C> ChangeActors<'a, A, I, O, C>
where
A: PartialEq + Ord + Clone + std::hash::Hash + 'static,
O: convert::OpId<&'a A> + 'a,
C: AsChangeOp<'a, OpId = O> + 'a,
I: Iterator<Item = C> + Clone + 'a,
{
/// Create a new change actor mapping
///
/// # Arguments
/// * actor - the actor ID of the actor who authored this change
/// * ops - an iterator containing the operations which will be encoded into the change
///
/// # Errors
/// * If one of the ops herein contains a `pred` with ops which are not in lamport timestamp
/// order
pub(crate) fn new(actor: A, ops: I) -> Result<ChangeActors<'a, A, I, O, C>, PredOutOfOrder> {
// Change actors indices are encoded with the 0th element being the actor who authored the
// change and all other actors referenced in the chain following the author in
// lexicographic order. Here we collect all the actors referenced by operations in `ops`
let (num_ops, mut other_actors) =
ops.clone()
.try_fold((0, BTreeSet::new()), |(count, mut acc), op| {
if let convert::Key::Elem(convert::ElemId::Op(o)) = op.key() {
if o.actor() != &actor {
acc.insert(o.actor());
}
}
if !are_sorted(op.pred()) {
return Err(PredOutOfOrder);
}
for pred in op.pred() {
if pred.actor() != &actor {
acc.insert(pred.actor());
}
}
if let convert::ObjId::Op(o) = op.obj() {
if o.actor() != &actor {
acc.insert(o.actor());
}
}
Ok((count + 1, acc))
})?;
// This shouldn't be necessary but just in case
other_actors.remove(&actor);
let mut other_actors = other_actors.into_iter().cloned().collect::<Vec<_>>();
other_actors.sort();
let index = std::iter::once(actor.clone())
.chain(other_actors.clone().into_iter())
.enumerate()
.map(|(idx, actor)| (actor, idx))
.collect();
Ok(ChangeActors {
actor,
other_actors,
index,
wrapped: ops,
num_ops,
_phantom: std::marker::PhantomData,
})
}
/// Translate an OpID from the OpSet index to the change index
fn translate_opid(&self, opid: &O) -> ChangeOpId {
ChangeOpId {
actor: *self.index.get(opid.actor()).unwrap(),
counter: opid.counter(),
}
}
/// Returns a clonable iterator over the converted operations. The item of the iterator is an
/// implementation of `AsChangeOp` which uses the index of the actor of each operation into the
/// actors as encoded in a change. This is suitable for passing to `ChangeOpColumns::encode`
pub(crate) fn iter<'b>(&'b self) -> WithChangeActorsOpIter<'b, 'a, A, I, O, C> {
WithChangeActorsOpIter {
change_actors: self,
inner: self.wrapped.clone(),
}
}
pub(crate) fn done(self) -> (A, Vec<A>) {
(self.actor, self.other_actors)
}
}
/// The actual implementation of the converted iterator
pub(crate) struct WithChangeActorsOpIter<'actors, 'aschangeop, A, I, O, C> {
change_actors: &'actors ChangeActors<'aschangeop, A, I, O, C>,
inner: I,
}
impl<'actors, 'aschangeop, A: 'aschangeop, I, O, C> Clone
for WithChangeActorsOpIter<'actors, 'aschangeop, A, I, O, C>
where
I: Clone,
{
fn clone(&self) -> Self {
Self {
change_actors: self.change_actors,
inner: self.inner.clone(),
}
}
}
impl<'actors, 'aschangeop, A: 'aschangeop, I, O, C> Iterator
for WithChangeActorsOpIter<'actors, 'aschangeop, A, I, O, C>
where
C: AsChangeOp<'aschangeop, OpId = O>,
O: convert::OpId<&'aschangeop A>,
I: Iterator<Item = C> + Clone,
{
type Item = WithChangeActors<'actors, 'aschangeop, A, I, O, C>;
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().map(|o| WithChangeActors {
op: o,
actors: self.change_actors,
})
}
}
impl<'actors, 'aschangeop, A: 'aschangeop, I, O, C> ExactSizeIterator
for WithChangeActorsOpIter<'actors, 'aschangeop, A, I, O, C>
where
C: AsChangeOp<'aschangeop, OpId = O>,
O: convert::OpId<&'aschangeop A>,
I: Iterator<Item = C> + Clone,
{
fn len(&self) -> usize {
self.change_actors.num_ops
}
}
pub(crate) struct ChangeOpId {
actor: usize,
counter: u64,
}
impl convert::OpId<usize> for ChangeOpId {
fn actor(&self) -> usize {
self.actor
}
fn counter(&self) -> u64 {
self.counter
}
}
/// A struct which implements `AsChangeOp` by translating the actor IDs in the incoming operations
/// into the index into the actors in the `ChangeActors`.
pub(crate) struct WithChangeActors<'actors, 'aschangeop, A, I, O, C> {
op: C,
actors: &'actors ChangeActors<'aschangeop, A, I, O, C>,
}
impl<'actors, 'aschangeop, A, I, O, P, C> AsChangeOp<'aschangeop>
for WithChangeActors<'actors, 'aschangeop, A, I, O, C>
where
A: PartialEq + Ord + Clone + std::hash::Hash + 'static,
O: convert::OpId<&'aschangeop A>,
P: Iterator<Item = O> + ExactSizeIterator + 'aschangeop,
C: AsChangeOp<'aschangeop, PredIter = P, OpId = O> + 'aschangeop,
I: Iterator<Item = C> + Clone + 'aschangeop,
{
type ActorId = usize;
type OpId = ChangeOpId;
type PredIter = WithChangeActorsPredIter<'actors, 'aschangeop, A, I, O, C, P>;
fn action(&self) -> u64 {
self.op.action()
}
fn insert(&self) -> bool {
self.op.insert()
}
fn pred(&self) -> Self::PredIter {
WithChangeActorsPredIter {
wrapped: self.op.pred(),
actors: self.actors,
_phantom: std::marker::PhantomData,
}
}
fn key(&self) -> convert::Key<'aschangeop, Self::OpId> {
self.op.key().map(|o| self.actors.translate_opid(&o))
}
fn obj(&self) -> convert::ObjId<Self::OpId> {
self.op.obj().map(|o| self.actors.translate_opid(&o))
}
fn val(&self) -> std::borrow::Cow<'aschangeop, crate::ScalarValue> {
self.op.val()
}
}
pub(crate) struct WithChangeActorsPredIter<'actors, 'aschangeop, A, I, O, C, P> {
wrapped: P,
actors: &'actors ChangeActors<'aschangeop, A, I, O, C>,
_phantom: std::marker::PhantomData<O>,
}
impl<'actors, 'aschangeop, A, I, O, C, P> ExactSizeIterator
for WithChangeActorsPredIter<'actors, 'aschangeop, A, I, O, C, P>
where
A: PartialEq + Ord + Clone + std::hash::Hash + 'static,
O: convert::OpId<&'aschangeop A>,
P: Iterator<Item = O> + ExactSizeIterator + 'aschangeop,
C: AsChangeOp<'aschangeop, OpId = O> + 'aschangeop,
I: Iterator<Item = C> + Clone + 'aschangeop,
{
fn len(&self) -> usize {
self.wrapped.len()
}
}
impl<'actors, 'aschangeop, A, I, O, C, P> Iterator
for WithChangeActorsPredIter<'actors, 'aschangeop, A, I, O, C, P>
where
A: PartialEq + Ord + Clone + std::hash::Hash + 'static,
O: convert::OpId<&'aschangeop A>,
P: Iterator<Item = O> + 'aschangeop,
C: AsChangeOp<'aschangeop, OpId = O> + 'aschangeop,
I: Iterator<Item = C> + Clone + 'aschangeop,
{
type Item = ChangeOpId;
fn next(&mut self) -> Option<Self::Item> {
self.wrapped.next().map(|o| self.actors.translate_opid(&o))
}
}
fn are_sorted<A, O, I>(mut opids: I) -> bool
where
A: PartialEq + Ord + Clone,
O: convert::OpId<A>,
I: Iterator<Item = O>,
{
if let Some(first) = opids.next() {
let mut prev = first;
for opid in opids {
if opid.counter() < prev.counter() {
return false;
}
if opid.counter() == prev.counter() && opid.actor() < prev.actor() {
return false;
}
prev = opid;
}
}
true
}