After some discussion with PVH I realise that the repo structure in the last reorg was very rust-centric. In an attempt to put each language on a level footing move the rust code and project files into ./rust
		
			
				
	
	
		
			402 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			402 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
use std::{borrow::Cow, num::NonZeroU64};
 | 
						|
 | 
						|
use crate::{
 | 
						|
    columnar::Key as StoredKey,
 | 
						|
    storage::{
 | 
						|
        change::{Unverified, Verified},
 | 
						|
        parse, Change as StoredChange, ChangeOp, Chunk, Compressed, ReadChangeOpError,
 | 
						|
    },
 | 
						|
    types::{ActorId, ChangeHash, ElemId},
 | 
						|
};
 | 
						|
 | 
						|
#[derive(Clone, Debug, PartialEq)]
 | 
						|
pub struct Change {
 | 
						|
    stored: StoredChange<'static, Verified>,
 | 
						|
    compression: CompressionState,
 | 
						|
    len: usize,
 | 
						|
}
 | 
						|
 | 
						|
impl Change {
 | 
						|
    pub(crate) fn new(stored: StoredChange<'static, Verified>) -> Self {
 | 
						|
        let len = stored.iter_ops().count();
 | 
						|
        Self {
 | 
						|
            stored,
 | 
						|
            len,
 | 
						|
            compression: CompressionState::NotCompressed,
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    pub(crate) fn new_from_unverified(
 | 
						|
        stored: StoredChange<'static, Unverified>,
 | 
						|
        compressed: Option<Compressed<'static>>,
 | 
						|
    ) -> Result<Self, ReadChangeOpError> {
 | 
						|
        let mut len = 0;
 | 
						|
        let stored = stored.verify_ops(|_| len += 1)?;
 | 
						|
        let compression = if let Some(c) = compressed {
 | 
						|
            CompressionState::Compressed(c)
 | 
						|
        } else {
 | 
						|
            CompressionState::NotCompressed
 | 
						|
        };
 | 
						|
        Ok(Self {
 | 
						|
            stored,
 | 
						|
            len,
 | 
						|
            compression,
 | 
						|
        })
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn actor_id(&self) -> &ActorId {
 | 
						|
        self.stored.actor()
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn other_actor_ids(&self) -> &[ActorId] {
 | 
						|
        self.stored.other_actors()
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn len(&self) -> usize {
 | 
						|
        self.len
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn is_empty(&self) -> bool {
 | 
						|
        self.len == 0
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn max_op(&self) -> u64 {
 | 
						|
        self.stored.start_op().get() + (self.len as u64) - 1
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn start_op(&self) -> NonZeroU64 {
 | 
						|
        self.stored.start_op()
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn message(&self) -> Option<&String> {
 | 
						|
        self.stored.message().as_ref()
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn deps(&self) -> &[ChangeHash] {
 | 
						|
        self.stored.dependencies()
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn hash(&self) -> ChangeHash {
 | 
						|
        self.stored.hash()
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn seq(&self) -> u64 {
 | 
						|
        self.stored.seq()
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn timestamp(&self) -> i64 {
 | 
						|
        self.stored.timestamp()
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn bytes(&mut self) -> Cow<'_, [u8]> {
 | 
						|
        if let CompressionState::NotCompressed = self.compression {
 | 
						|
            if let Some(compressed) = self.stored.compress() {
 | 
						|
                self.compression = CompressionState::Compressed(compressed);
 | 
						|
            } else {
 | 
						|
                self.compression = CompressionState::TooSmallToCompress;
 | 
						|
            }
 | 
						|
        };
 | 
						|
        match &self.compression {
 | 
						|
            // SAFETY: We just checked this case above
 | 
						|
            CompressionState::NotCompressed => unreachable!(),
 | 
						|
            CompressionState::TooSmallToCompress => Cow::Borrowed(self.stored.bytes()),
 | 
						|
            CompressionState::Compressed(c) => c.bytes(),
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn raw_bytes(&self) -> &[u8] {
 | 
						|
        self.stored.bytes()
 | 
						|
    }
 | 
						|
 | 
						|
    pub(crate) fn iter_ops(&self) -> impl Iterator<Item = ChangeOp> + '_ {
 | 
						|
        self.stored.iter_ops()
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn extra_bytes(&self) -> &[u8] {
 | 
						|
        self.stored.extra_bytes()
 | 
						|
    }
 | 
						|
 | 
						|
    // TODO replace all uses of this with TryFrom<&[u8]>
 | 
						|
    pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, LoadError> {
 | 
						|
        Self::try_from(&bytes[..])
 | 
						|
    }
 | 
						|
 | 
						|
    pub fn decode(&self) -> crate::ExpandedChange {
 | 
						|
        crate::ExpandedChange::from(self)
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
#[derive(Clone, Debug, PartialEq)]
 | 
						|
enum CompressionState {
 | 
						|
    /// We haven't tried to compress this change
 | 
						|
    NotCompressed,
 | 
						|
    /// We have compressed this change
 | 
						|
    Compressed(Compressed<'static>),
 | 
						|
    /// We tried to compress this change but it wasn't big enough to be worth it
 | 
						|
    TooSmallToCompress,
 | 
						|
}
 | 
						|
 | 
						|
impl AsRef<StoredChange<'static, Verified>> for Change {
 | 
						|
    fn as_ref(&self) -> &StoredChange<'static, Verified> {
 | 
						|
        &self.stored
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
impl From<Change> for StoredChange<'static, Verified> {
 | 
						|
    fn from(c: Change) -> Self {
 | 
						|
        c.stored
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
#[derive(thiserror::Error, Debug)]
 | 
						|
pub enum LoadError {
 | 
						|
    #[error("unable to parse change: {0}")]
 | 
						|
    Parse(Box<dyn std::error::Error + Send + Sync + 'static>),
 | 
						|
    #[error("leftover data after parsing")]
 | 
						|
    LeftoverData,
 | 
						|
    #[error("wrong chunk type")]
 | 
						|
    WrongChunkType,
 | 
						|
}
 | 
						|
 | 
						|
impl<'a> TryFrom<&'a [u8]> for Change {
 | 
						|
    type Error = LoadError;
 | 
						|
 | 
						|
    fn try_from(value: &'a [u8]) -> Result<Self, Self::Error> {
 | 
						|
        let input = parse::Input::new(value);
 | 
						|
        let (remaining, chunk) = Chunk::parse(input).map_err(|e| LoadError::Parse(Box::new(e)))?;
 | 
						|
        if !remaining.is_empty() {
 | 
						|
            return Err(LoadError::LeftoverData);
 | 
						|
        }
 | 
						|
        match chunk {
 | 
						|
            Chunk::Change(c) => Self::new_from_unverified(c.into_owned(), None)
 | 
						|
                .map_err(|e| LoadError::Parse(Box::new(e))),
 | 
						|
            Chunk::CompressedChange(c, compressed) => {
 | 
						|
                Self::new_from_unverified(c.into_owned(), Some(compressed.into_owned()))
 | 
						|
                    .map_err(|e| LoadError::Parse(Box::new(e)))
 | 
						|
            }
 | 
						|
            _ => Err(LoadError::WrongChunkType),
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
impl<'a> TryFrom<StoredChange<'a, Unverified>> for Change {
 | 
						|
    type Error = ReadChangeOpError;
 | 
						|
 | 
						|
    fn try_from(c: StoredChange<'a, Unverified>) -> Result<Self, Self::Error> {
 | 
						|
        Self::new_from_unverified(c.into_owned(), None)
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
impl From<crate::ExpandedChange> for Change {
 | 
						|
    fn from(e: crate::ExpandedChange) -> Self {
 | 
						|
        let stored = StoredChange::builder()
 | 
						|
            .with_actor(e.actor_id)
 | 
						|
            .with_extra_bytes(e.extra_bytes)
 | 
						|
            .with_seq(e.seq)
 | 
						|
            .with_dependencies(e.deps)
 | 
						|
            .with_timestamp(e.time)
 | 
						|
            .with_start_op(e.start_op)
 | 
						|
            .with_message(e.message)
 | 
						|
            .build(e.operations.iter());
 | 
						|
        match stored {
 | 
						|
            Ok(c) => Change::new(c),
 | 
						|
            Err(crate::storage::change::PredOutOfOrder) => {
 | 
						|
                // Should never happen because we use `SortedVec` in legacy::Op::pred
 | 
						|
                panic!("preds out of order");
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
mod convert_expanded {
 | 
						|
    use std::borrow::Cow;
 | 
						|
 | 
						|
    use crate::{convert, legacy, storage::AsChangeOp, types::ActorId, ScalarValue};
 | 
						|
 | 
						|
    impl<'a> AsChangeOp<'a> for &'a legacy::Op {
 | 
						|
        type ActorId = &'a ActorId;
 | 
						|
        type OpId = &'a legacy::OpId;
 | 
						|
        type PredIter = std::slice::Iter<'a, legacy::OpId>;
 | 
						|
 | 
						|
        fn action(&self) -> u64 {
 | 
						|
            self.action.action_index()
 | 
						|
        }
 | 
						|
 | 
						|
        fn insert(&self) -> bool {
 | 
						|
            self.insert
 | 
						|
        }
 | 
						|
 | 
						|
        fn pred(&self) -> Self::PredIter {
 | 
						|
            self.pred.iter()
 | 
						|
        }
 | 
						|
 | 
						|
        fn key(&self) -> convert::Key<'a, Self::OpId> {
 | 
						|
            match &self.key {
 | 
						|
                legacy::Key::Map(s) => convert::Key::Prop(Cow::Borrowed(s)),
 | 
						|
                legacy::Key::Seq(legacy::ElementId::Head) => {
 | 
						|
                    convert::Key::Elem(convert::ElemId::Head)
 | 
						|
                }
 | 
						|
                legacy::Key::Seq(legacy::ElementId::Id(o)) => {
 | 
						|
                    convert::Key::Elem(convert::ElemId::Op(o))
 | 
						|
                }
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        fn obj(&self) -> convert::ObjId<Self::OpId> {
 | 
						|
            match &self.obj {
 | 
						|
                legacy::ObjectId::Root => convert::ObjId::Root,
 | 
						|
                legacy::ObjectId::Id(o) => convert::ObjId::Op(o),
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        fn val(&self) -> Cow<'a, crate::ScalarValue> {
 | 
						|
            match self.primitive_value() {
 | 
						|
                Some(v) => Cow::Owned(v),
 | 
						|
                None => Cow::Owned(ScalarValue::Null),
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    impl<'a> convert::OpId<&'a ActorId> for &'a legacy::OpId {
 | 
						|
        fn counter(&self) -> u64 {
 | 
						|
            legacy::OpId::counter(self)
 | 
						|
        }
 | 
						|
 | 
						|
        fn actor(&self) -> &'a ActorId {
 | 
						|
            &self.1
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
impl From<&Change> for crate::ExpandedChange {
 | 
						|
    fn from(c: &Change) -> Self {
 | 
						|
        let actors = std::iter::once(c.actor_id())
 | 
						|
            .chain(c.other_actor_ids().iter())
 | 
						|
            .cloned()
 | 
						|
            .enumerate()
 | 
						|
            .collect::<std::collections::HashMap<_, _>>();
 | 
						|
        let operations = c
 | 
						|
            .iter_ops()
 | 
						|
            .map(|o| crate::legacy::Op {
 | 
						|
                action: crate::types::OpType::from_index_and_value(o.action, o.val).unwrap(),
 | 
						|
                insert: o.insert,
 | 
						|
                key: match o.key {
 | 
						|
                    StoredKey::Elem(e) if e.is_head() => {
 | 
						|
                        crate::legacy::Key::Seq(crate::legacy::ElementId::Head)
 | 
						|
                    }
 | 
						|
                    StoredKey::Elem(ElemId(o)) => {
 | 
						|
                        crate::legacy::Key::Seq(crate::legacy::ElementId::Id(
 | 
						|
                            crate::legacy::OpId::new(o.counter(), actors.get(&o.actor()).unwrap()),
 | 
						|
                        ))
 | 
						|
                    }
 | 
						|
                    StoredKey::Prop(p) => crate::legacy::Key::Map(p),
 | 
						|
                },
 | 
						|
                obj: if o.obj.is_root() {
 | 
						|
                    crate::legacy::ObjectId::Root
 | 
						|
                } else {
 | 
						|
                    crate::legacy::ObjectId::Id(crate::legacy::OpId::new(
 | 
						|
                        o.obj.opid().counter(),
 | 
						|
                        actors.get(&o.obj.opid().actor()).unwrap(),
 | 
						|
                    ))
 | 
						|
                },
 | 
						|
                pred: o
 | 
						|
                    .pred
 | 
						|
                    .into_iter()
 | 
						|
                    .map(|p| crate::legacy::OpId::new(p.counter(), actors.get(&p.actor()).unwrap()))
 | 
						|
                    .collect(),
 | 
						|
            })
 | 
						|
            .collect::<Vec<_>>();
 | 
						|
        crate::ExpandedChange {
 | 
						|
            operations,
 | 
						|
            actor_id: actors.get(&0).unwrap().clone(),
 | 
						|
            hash: Some(c.hash()),
 | 
						|
            time: c.timestamp(),
 | 
						|
            deps: c.deps().to_vec(),
 | 
						|
            seq: c.seq(),
 | 
						|
            start_op: c.start_op(),
 | 
						|
            extra_bytes: c.extra_bytes().to_vec(),
 | 
						|
            message: c.message().cloned(),
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
#[cfg(test)]
 | 
						|
pub(crate) mod gen {
 | 
						|
    use super::Change;
 | 
						|
    use crate::{
 | 
						|
        op_tree::OpSetMetadata,
 | 
						|
        storage::{change::ChangeBuilder, convert::op_as_actor_id},
 | 
						|
        types::{
 | 
						|
            gen::{gen_hash, gen_op},
 | 
						|
            ObjId, Op, OpId,
 | 
						|
        },
 | 
						|
        ActorId,
 | 
						|
    };
 | 
						|
    use proptest::prelude::*;
 | 
						|
 | 
						|
    fn gen_actor() -> impl Strategy<Value = ActorId> {
 | 
						|
        proptest::array::uniform32(proptest::bits::u8::ANY).prop_map(ActorId::from)
 | 
						|
    }
 | 
						|
 | 
						|
    prop_compose! {
 | 
						|
        fn gen_actors()(this_actor in gen_actor(), other_actors in proptest::collection::vec(gen_actor(), 0..10)) -> (ActorId, Vec<ActorId>) {
 | 
						|
            (this_actor, other_actors)
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    fn gen_ops(
 | 
						|
        this_actor: ActorId,
 | 
						|
        other_actors: Vec<ActorId>,
 | 
						|
    ) -> impl Strategy<Value = (Vec<(ObjId, Op)>, OpSetMetadata)> {
 | 
						|
        let mut all_actors = vec![this_actor];
 | 
						|
        all_actors.extend(other_actors);
 | 
						|
        let mut m = OpSetMetadata::from_actors(all_actors);
 | 
						|
        m.props.cache("someprop".to_string());
 | 
						|
        let root_id = ObjId::root();
 | 
						|
        (0_u64..10)
 | 
						|
            .prop_map(|num_ops| {
 | 
						|
                (0..num_ops)
 | 
						|
                    .map(|counter| OpId::new(0, counter))
 | 
						|
                    .collect::<Vec<_>>()
 | 
						|
            })
 | 
						|
            .prop_flat_map(move |opids| {
 | 
						|
                let mut strat = Just(Vec::new()).boxed();
 | 
						|
                for opid in opids {
 | 
						|
                    strat = (gen_op(opid, vec![0]), strat)
 | 
						|
                        .prop_map(move |(op, ops)| {
 | 
						|
                            let mut result = Vec::with_capacity(ops.len() + 1);
 | 
						|
                            result.extend(ops);
 | 
						|
                            result.push((root_id, op));
 | 
						|
                            result
 | 
						|
                        })
 | 
						|
                        .boxed();
 | 
						|
                }
 | 
						|
                strat
 | 
						|
            })
 | 
						|
            .prop_map(move |ops| (ops, m.clone()))
 | 
						|
    }
 | 
						|
 | 
						|
    prop_compose! {
 | 
						|
        pub(crate) fn gen_change()((this_actor, other_actors) in gen_actors())(
 | 
						|
                (ops, metadata) in gen_ops(this_actor.clone(), other_actors),
 | 
						|
                start_op in 1_u64..200000,
 | 
						|
                seq in 0_u64..200000,
 | 
						|
                timestamp in 0..i64::MAX,
 | 
						|
                deps in proptest::collection::vec(gen_hash(), 0..100),
 | 
						|
                message in proptest::option::of("[a-z]{200}"),
 | 
						|
                this_actor in Just(this_actor),
 | 
						|
            ) -> Change {
 | 
						|
            let ops = ops.iter().map(|(obj, op)| op_as_actor_id(obj, op, &metadata));
 | 
						|
            Change::new(ChangeBuilder::new()
 | 
						|
                .with_dependencies(deps)
 | 
						|
                .with_start_op(start_op.try_into().unwrap())
 | 
						|
                .with_message(message)
 | 
						|
                .with_actor(this_actor)
 | 
						|
                .with_seq(seq)
 | 
						|
                .with_timestamp(timestamp)
 | 
						|
                .build(ops.into_iter())
 | 
						|
                .unwrap())
 | 
						|
        }
 | 
						|
 | 
						|
    }
 | 
						|
}
 |