use std::{borrow::Cow, num::NonZeroU64}; use crate::{ columnar::Key as StoredKey, storage::{ change::{Unverified, Verified}, parse, Change as StoredChange, ChangeOp, Chunk, Compressed, ReadChangeOpError, }, types::{ActorId, ChangeHash, ElemId}, }; #[derive(Clone, Debug, PartialEq)] pub struct Change { stored: StoredChange<'static, Verified>, compression: CompressionState, len: usize, } impl Change { pub(crate) fn new(stored: StoredChange<'static, Verified>) -> Self { let len = stored.iter_ops().count(); Self { stored, len, compression: CompressionState::NotCompressed, } } pub(crate) fn new_from_unverified( stored: StoredChange<'static, Unverified>, compressed: Option>, ) -> Result { let mut len = 0; let stored = stored.verify_ops(|_| len += 1)?; let compression = if let Some(c) = compressed { CompressionState::Compressed(c) } else { CompressionState::NotCompressed }; Ok(Self { stored, len, compression, }) } pub fn actor_id(&self) -> &ActorId { self.stored.actor() } pub fn other_actor_ids(&self) -> &[ActorId] { self.stored.other_actors() } pub fn len(&self) -> usize { self.len } pub fn is_empty(&self) -> bool { self.len == 0 } pub fn max_op(&self) -> u64 { self.stored.start_op().get() + (self.len as u64) - 1 } pub fn start_op(&self) -> NonZeroU64 { self.stored.start_op() } pub fn message(&self) -> Option<&String> { self.stored.message().as_ref() } pub fn deps(&self) -> &[ChangeHash] { self.stored.dependencies() } pub fn hash(&self) -> ChangeHash { self.stored.hash() } pub fn seq(&self) -> u64 { self.stored.seq() } pub fn timestamp(&self) -> i64 { self.stored.timestamp() } pub fn bytes(&mut self) -> Cow<'_, [u8]> { if let CompressionState::NotCompressed = self.compression { if let Some(compressed) = self.stored.compress() { self.compression = CompressionState::Compressed(compressed); } else { self.compression = CompressionState::TooSmallToCompress; } }; match &self.compression { // SAFETY: We just checked this case above CompressionState::NotCompressed => unreachable!(), CompressionState::TooSmallToCompress => Cow::Borrowed(self.stored.bytes()), CompressionState::Compressed(c) => c.bytes(), } } pub fn raw_bytes(&self) -> &[u8] { self.stored.bytes() } pub(crate) fn iter_ops(&self) -> impl Iterator + '_ { self.stored.iter_ops() } pub fn extra_bytes(&self) -> &[u8] { self.stored.extra_bytes() } // TODO replace all uses of this with TryFrom<&[u8]> pub fn from_bytes(bytes: Vec) -> Result { Self::try_from(&bytes[..]) } pub fn decode(&self) -> crate::ExpandedChange { crate::ExpandedChange::from(self) } } #[derive(Clone, Debug, PartialEq)] enum CompressionState { /// We haven't tried to compress this change NotCompressed, /// We have compressed this change Compressed(Compressed<'static>), /// We tried to compress this change but it wasn't big enough to be worth it TooSmallToCompress, } impl AsRef> for Change { fn as_ref(&self) -> &StoredChange<'static, Verified> { &self.stored } } impl From for StoredChange<'static, Verified> { fn from(c: Change) -> Self { c.stored } } #[derive(thiserror::Error, Debug)] pub enum LoadError { #[error("unable to parse change: {0}")] Parse(Box), #[error("leftover data after parsing")] LeftoverData, #[error("wrong chunk type")] WrongChunkType, } impl<'a> TryFrom<&'a [u8]> for Change { type Error = LoadError; fn try_from(value: &'a [u8]) -> Result { let input = parse::Input::new(value); let (remaining, chunk) = Chunk::parse(input).map_err(|e| LoadError::Parse(Box::new(e)))?; if !remaining.is_empty() { return Err(LoadError::LeftoverData); } match chunk { Chunk::Change(c) => Self::new_from_unverified(c.into_owned(), None) .map_err(|e| LoadError::Parse(Box::new(e))), Chunk::CompressedChange(c, compressed) => { Self::new_from_unverified(c.into_owned(), Some(compressed.into_owned())) .map_err(|e| LoadError::Parse(Box::new(e))) } _ => Err(LoadError::WrongChunkType), } } } impl<'a> TryFrom> for Change { type Error = ReadChangeOpError; fn try_from(c: StoredChange<'a, Unverified>) -> Result { Self::new_from_unverified(c.into_owned(), None) } } impl From for Change { fn from(e: crate::ExpandedChange) -> Self { let stored = StoredChange::builder() .with_actor(e.actor_id) .with_extra_bytes(e.extra_bytes) .with_seq(e.seq) .with_dependencies(e.deps) .with_timestamp(e.time) .with_start_op(e.start_op) .with_message(e.message) .build(e.operations.iter()); match stored { Ok(c) => Change::new(c), Err(crate::storage::change::PredOutOfOrder) => { // Should never happen because we use `SortedVec` in legacy::Op::pred panic!("preds out of order"); } } } } mod convert_expanded { use std::borrow::Cow; use crate::{convert, legacy, storage::AsChangeOp, types::ActorId, ScalarValue}; impl<'a> AsChangeOp<'a> for &'a legacy::Op { type ActorId = &'a ActorId; type OpId = &'a legacy::OpId; type PredIter = std::slice::Iter<'a, legacy::OpId>; fn action(&self) -> u64 { self.action.action_index() } fn insert(&self) -> bool { self.insert } fn pred(&self) -> Self::PredIter { self.pred.iter() } fn key(&self) -> convert::Key<'a, Self::OpId> { match &self.key { legacy::Key::Map(s) => convert::Key::Prop(Cow::Borrowed(s)), legacy::Key::Seq(legacy::ElementId::Head) => { convert::Key::Elem(convert::ElemId::Head) } legacy::Key::Seq(legacy::ElementId::Id(o)) => { convert::Key::Elem(convert::ElemId::Op(o)) } } } fn obj(&self) -> convert::ObjId { match &self.obj { legacy::ObjectId::Root => convert::ObjId::Root, legacy::ObjectId::Id(o) => convert::ObjId::Op(o), } } fn val(&self) -> Cow<'a, crate::ScalarValue> { match self.primitive_value() { Some(v) => Cow::Owned(v), None => Cow::Owned(ScalarValue::Null), } } } impl<'a> convert::OpId<&'a ActorId> for &'a legacy::OpId { fn counter(&self) -> u64 { legacy::OpId::counter(self) } fn actor(&self) -> &'a ActorId { &self.1 } } } impl From<&Change> for crate::ExpandedChange { fn from(c: &Change) -> Self { let actors = std::iter::once(c.actor_id()) .chain(c.other_actor_ids().iter()) .cloned() .enumerate() .collect::>(); let operations = c .iter_ops() .map(|o| crate::legacy::Op { action: crate::types::OpType::from_index_and_value(o.action, o.val).unwrap(), insert: o.insert, key: match o.key { StoredKey::Elem(e) if e.is_head() => { crate::legacy::Key::Seq(crate::legacy::ElementId::Head) } StoredKey::Elem(ElemId(o)) => { crate::legacy::Key::Seq(crate::legacy::ElementId::Id( crate::legacy::OpId::new(o.counter(), actors.get(&o.actor()).unwrap()), )) } StoredKey::Prop(p) => crate::legacy::Key::Map(p), }, obj: if o.obj.is_root() { crate::legacy::ObjectId::Root } else { crate::legacy::ObjectId::Id(crate::legacy::OpId::new( o.obj.opid().counter(), actors.get(&o.obj.opid().actor()).unwrap(), )) }, pred: o .pred .into_iter() .map(|p| crate::legacy::OpId::new(p.counter(), actors.get(&p.actor()).unwrap())) .collect(), }) .collect::>(); crate::ExpandedChange { operations, actor_id: actors.get(&0).unwrap().clone(), hash: Some(c.hash()), time: c.timestamp(), deps: c.deps().to_vec(), seq: c.seq(), start_op: c.start_op(), extra_bytes: c.extra_bytes().to_vec(), message: c.message().cloned(), } } } #[cfg(test)] pub(crate) mod gen { use super::Change; use crate::{ op_tree::OpSetMetadata, storage::{change::ChangeBuilder, convert::op_as_actor_id}, types::{ gen::{gen_hash, gen_op}, ObjId, Op, OpId, }, ActorId, }; use proptest::prelude::*; fn gen_actor() -> impl Strategy { proptest::array::uniform32(proptest::bits::u8::ANY).prop_map(ActorId::from) } prop_compose! { fn gen_actors()(this_actor in gen_actor(), other_actors in proptest::collection::vec(gen_actor(), 0..10)) -> (ActorId, Vec) { (this_actor, other_actors) } } fn gen_ops( this_actor: ActorId, other_actors: Vec, ) -> impl Strategy, OpSetMetadata)> { let mut all_actors = vec![this_actor]; all_actors.extend(other_actors); let mut m = OpSetMetadata::from_actors(all_actors); m.props.cache("someprop".to_string()); let root_id = ObjId::root(); (0_u64..10) .prop_map(|num_ops| { (0..num_ops) .map(|counter| OpId::new(0, counter)) .collect::>() }) .prop_flat_map(move |opids| { let mut strat = Just(Vec::new()).boxed(); for opid in opids { strat = (gen_op(opid, vec![0]), strat) .prop_map(move |(op, ops)| { let mut result = Vec::with_capacity(ops.len() + 1); result.extend(ops); result.push((root_id, op)); result }) .boxed(); } strat }) .prop_map(move |ops| (ops, m.clone())) } prop_compose! { pub(crate) fn gen_change()((this_actor, other_actors) in gen_actors())( (ops, metadata) in gen_ops(this_actor.clone(), other_actors), start_op in 1_u64..200000, seq in 0_u64..200000, timestamp in 0..i64::MAX, deps in proptest::collection::vec(gen_hash(), 0..100), message in proptest::option::of("[a-z]{200}"), this_actor in Just(this_actor), ) -> Change { let ops = ops.iter().map(|(obj, op)| op_as_actor_id(obj, op, &metadata)); Change::new(ChangeBuilder::new() .with_dependencies(deps) .with_start_op(start_op.try_into().unwrap()) .with_message(message) .with_actor(this_actor) .with_seq(seq) .with_timestamp(timestamp) .build(ops.into_iter()) .unwrap()) } } }