automerge/rust/automerge/src/change.rs

402 lines
12 KiB
Rust

use std::{borrow::Cow, num::NonZeroU64};
use crate::{
columnar::Key as StoredKey,
storage::{
change::{Unverified, Verified},
parse, Change as StoredChange, ChangeOp, Chunk, Compressed, ReadChangeOpError,
},
types::{ActorId, ChangeHash, ElemId},
};
/// A verified change plus a small amount of cached bookkeeping.
///
/// Wraps a `StoredChange<'static, Verified>` and remembers how many operations
/// it contains and whether a compressed encoding has already been produced
/// for it (see [`Change::bytes`]).
#[derive(Clone, Debug, PartialEq)]
pub struct Change {
/// The underlying verified stored change.
stored: StoredChange<'static, Verified>,
/// Cache of the compression attempt; starts as `NotCompressed` unless the
/// change was loaded from a compressed chunk.
compression: CompressionState,
/// Number of operations in `stored`, counted once at construction time.
len: usize,
}
impl Change {
pub(crate) fn new(stored: StoredChange<'static, Verified>) -> Self {
let len = stored.iter_ops().count();
Self {
stored,
len,
compression: CompressionState::NotCompressed,
}
}
pub(crate) fn new_from_unverified(
stored: StoredChange<'static, Unverified>,
compressed: Option<Compressed<'static>>,
) -> Result<Self, ReadChangeOpError> {
let mut len = 0;
let stored = stored.verify_ops(|_| len += 1)?;
let compression = if let Some(c) = compressed {
CompressionState::Compressed(c)
} else {
CompressionState::NotCompressed
};
Ok(Self {
stored,
len,
compression,
})
}
pub fn actor_id(&self) -> &ActorId {
self.stored.actor()
}
pub fn other_actor_ids(&self) -> &[ActorId] {
self.stored.other_actors()
}
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn max_op(&self) -> u64 {
self.stored.start_op().get() + (self.len as u64) - 1
}
pub fn start_op(&self) -> NonZeroU64 {
self.stored.start_op()
}
pub fn message(&self) -> Option<&String> {
self.stored.message().as_ref()
}
pub fn deps(&self) -> &[ChangeHash] {
self.stored.dependencies()
}
pub fn hash(&self) -> ChangeHash {
self.stored.hash()
}
pub fn seq(&self) -> u64 {
self.stored.seq()
}
pub fn timestamp(&self) -> i64 {
self.stored.timestamp()
}
pub fn bytes(&mut self) -> Cow<'_, [u8]> {
if let CompressionState::NotCompressed = self.compression {
if let Some(compressed) = self.stored.compress() {
self.compression = CompressionState::Compressed(compressed);
} else {
self.compression = CompressionState::TooSmallToCompress;
}
};
match &self.compression {
// SAFETY: We just checked this case above
CompressionState::NotCompressed => unreachable!(),
CompressionState::TooSmallToCompress => Cow::Borrowed(self.stored.bytes()),
CompressionState::Compressed(c) => c.bytes(),
}
}
pub fn raw_bytes(&self) -> &[u8] {
self.stored.bytes()
}
pub(crate) fn iter_ops(&self) -> impl Iterator<Item = ChangeOp> + '_ {
self.stored.iter_ops()
}
pub fn extra_bytes(&self) -> &[u8] {
self.stored.extra_bytes()
}
// TODO replace all uses of this with TryFrom<&[u8]>
pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, LoadError> {
Self::try_from(&bytes[..])
}
pub fn decode(&self) -> crate::ExpandedChange {
crate::ExpandedChange::from(self)
}
}
/// Cached outcome of trying to compress a [`Change`].
///
/// `Change::bytes` moves a change out of `NotCompressed` the first time it is
/// called, so compression is attempted at most once per change.
#[derive(Clone, Debug, PartialEq)]
enum CompressionState {
/// We haven't tried to compress this change yet.
NotCompressed,
/// We have a compressed encoding of this change, either produced by
/// `Change::bytes` or supplied when the change was loaded.
Compressed(Compressed<'static>),
/// We tried to compress this change but it wasn't big enough to be worth it.
TooSmallToCompress,
}
impl AsRef<StoredChange<'static, Verified>> for Change {
fn as_ref(&self) -> &StoredChange<'static, Verified> {
&self.stored
}
}
impl From<Change> for StoredChange<'static, Verified> {
fn from(c: Change) -> Self {
c.stored
}
}
/// Errors returned when loading a [`Change`] from raw bytes.
#[derive(thiserror::Error, Debug)]
pub enum LoadError {
/// Parsing the chunk, or verifying the ops of the parsed change, failed.
/// The underlying error is boxed.
#[error("unable to parse change: {0}")]
Parse(Box<dyn std::error::Error + Send + Sync + 'static>),
/// The input contained bytes after the end of the parsed chunk.
#[error("leftover data after parsing")]
LeftoverData,
/// The parsed chunk was neither a change nor a compressed change.
#[error("wrong chunk type")]
WrongChunkType,
}
impl<'a> TryFrom<&'a [u8]> for Change {
    type Error = LoadError;

    /// Parses exactly one chunk from `value` and turns it into a `Change`.
    ///
    /// # Errors
    ///
    /// * `LoadError::Parse` if the chunk cannot be parsed or its ops fail
    ///   verification.
    /// * `LoadError::LeftoverData` if bytes remain after the chunk.
    /// * `LoadError::WrongChunkType` if the chunk is neither a change nor a
    ///   compressed change.
    fn try_from(value: &'a [u8]) -> Result<Self, Self::Error> {
        let (rest, chunk) = Chunk::parse(parse::Input::new(value))
            .map_err(|err| LoadError::Parse(Box::new(err)))?;
        if !rest.is_empty() {
            return Err(LoadError::LeftoverData);
        }

        // Both accepted chunk kinds end up in the same constructor; they only
        // differ in whether a compressed encoding is already available.
        let (stored, compressed) = match chunk {
            Chunk::Change(change) => (change.into_owned(), None),
            Chunk::CompressedChange(change, compressed) => {
                (change.into_owned(), Some(compressed.into_owned()))
            }
            _ => return Err(LoadError::WrongChunkType),
        };
        Self::new_from_unverified(stored, compressed)
            .map_err(|err| LoadError::Parse(Box::new(err)))
    }
}
impl<'a> TryFrom<StoredChange<'a, Unverified>> for Change {
type Error = ReadChangeOpError;
fn try_from(c: StoredChange<'a, Unverified>) -> Result<Self, Self::Error> {
Self::new_from_unverified(c.into_owned(), None)
}
}
impl From<crate::ExpandedChange> for Change {
fn from(e: crate::ExpandedChange) -> Self {
let stored = StoredChange::builder()
.with_actor(e.actor_id)
.with_extra_bytes(e.extra_bytes)
.with_seq(e.seq)
.with_dependencies(e.deps)
.with_timestamp(e.time)
.with_start_op(e.start_op)
.with_message(e.message)
.build(e.operations.iter());
match stored {
Ok(c) => Change::new(c),
Err(crate::storage::change::PredOutOfOrder) => {
// Should never happen because we use `SortedVec` in legacy::Op::pred
panic!("preds out of order");
}
}
}
}
mod convert_expanded {
    use std::borrow::Cow;

    use crate::{convert, legacy, storage::AsChangeOp, types::ActorId, ScalarValue};

    /// Lets a borrowed `legacy::Op` be passed to code that expects an
    /// `AsChangeOp`, using references into the op for actor and op ids.
    impl<'a> AsChangeOp<'a> for &'a legacy::Op {
        type ActorId = &'a ActorId;
        type OpId = &'a legacy::OpId;
        type PredIter = std::slice::Iter<'a, legacy::OpId>;

        /// The numeric index of the op's action.
        fn action(&self) -> u64 {
            self.action.action_index()
        }

        /// The op's `insert` flag.
        fn insert(&self) -> bool {
            self.insert
        }

        /// Iterates over the op's predecessors.
        fn pred(&self) -> Self::PredIter {
            self.pred.iter()
        }

        /// Map keys become borrowed props; sequence keys become either the
        /// head element or a reference to the element's op id.
        fn key(&self) -> convert::Key<'a, Self::OpId> {
            match &self.key {
                legacy::Key::Map(prop) => convert::Key::Prop(Cow::Borrowed(prop)),
                legacy::Key::Seq(elem) => convert::Key::Elem(match elem {
                    legacy::ElementId::Head => convert::ElemId::Head,
                    legacy::ElementId::Id(id) => convert::ElemId::Op(id),
                }),
            }
        }

        /// The root object, or a reference to the op id of the target object.
        fn obj(&self) -> convert::ObjId<Self::OpId> {
            if let legacy::ObjectId::Id(id) = &self.obj {
                convert::ObjId::Op(id)
            } else {
                convert::ObjId::Root
            }
        }

        /// The op's primitive value, or `ScalarValue::Null` when it has none.
        fn val(&self) -> Cow<'a, crate::ScalarValue> {
            Cow::Owned(self.primitive_value().unwrap_or(ScalarValue::Null))
        }
    }

    /// Exposes the counter and actor of a borrowed `legacy::OpId`.
    impl<'a> convert::OpId<&'a ActorId> for &'a legacy::OpId {
        fn counter(&self) -> u64 {
            // Fully qualified so that the inherent method on `legacy::OpId` is
            // called rather than this trait method.
            legacy::OpId::counter(self)
        }

        fn actor(&self) -> &'a ActorId {
            let legacy::OpId(_, actor) = *self;
            actor
        }
    }
}
impl From<&Change> for crate::ExpandedChange {
fn from(c: &Change) -> Self {
let actors = std::iter::once(c.actor_id())
.chain(c.other_actor_ids().iter())
.cloned()
.enumerate()
.collect::<std::collections::HashMap<_, _>>();
let operations = c
.iter_ops()
.map(|o| crate::legacy::Op {
action: crate::types::OpType::from_index_and_value(o.action, o.val).unwrap(),
insert: o.insert,
key: match o.key {
StoredKey::Elem(e) if e.is_head() => {
crate::legacy::Key::Seq(crate::legacy::ElementId::Head)
}
StoredKey::Elem(ElemId(o)) => {
crate::legacy::Key::Seq(crate::legacy::ElementId::Id(
crate::legacy::OpId::new(o.counter(), actors.get(&o.actor()).unwrap()),
))
}
StoredKey::Prop(p) => crate::legacy::Key::Map(p),
},
obj: if o.obj.is_root() {
crate::legacy::ObjectId::Root
} else {
crate::legacy::ObjectId::Id(crate::legacy::OpId::new(
o.obj.opid().counter(),
actors.get(&o.obj.opid().actor()).unwrap(),
))
},
pred: o
.pred
.into_iter()
.map(|p| crate::legacy::OpId::new(p.counter(), actors.get(&p.actor()).unwrap()))
.collect(),
})
.collect::<Vec<_>>();
crate::ExpandedChange {
operations,
actor_id: actors.get(&0).unwrap().clone(),
hash: Some(c.hash()),
time: c.timestamp(),
deps: c.deps().to_vec(),
seq: c.seq(),
start_op: c.start_op(),
extra_bytes: c.extra_bytes().to_vec(),
message: c.message().cloned(),
}
}
}
/// Proptest strategies for generating `Change` values; compiled for tests only.
#[cfg(test)]
pub(crate) mod gen {
use super::Change;
use crate::{
op_tree::OpSetMetadata,
storage::{change::ChangeBuilder, convert::op_as_actor_id},
types::{
gen::{gen_hash, gen_op},
ObjId, Op, OpId,
},
ActorId,
};
use proptest::prelude::*;
/// Strategy producing an `ActorId` converted from a 32-element array of
/// arbitrary `u8` values.
fn gen_actor() -> impl Strategy<Value = ActorId> {
proptest::array::uniform32(proptest::bits::u8::ANY).prop_map(ActorId::from)
}
// Strategy producing the change's own actor plus a vector of other actors
// whose length is drawn from `0..10`.
prop_compose! {
fn gen_actors()(this_actor in gen_actor(), other_actors in proptest::collection::vec(gen_actor(), 0..10)) -> (ActorId, Vec<ActorId>) {
(this_actor, other_actors)
}
}
/// Strategy producing a list of `(ObjId, Op)` pairs together with the
/// `OpSetMetadata` built from the given actors.
///
/// Every generated op is paired with the root object id, and op ids are
/// created with counters `0..num_ops` and actor index 0.
fn gen_ops(
this_actor: ActorId,
other_actors: Vec<ActorId>,
) -> impl Strategy<Value = (Vec<(ObjId, Op)>, OpSetMetadata)> {
// `this_actor` goes first, so it is at position 0 of the actor list.
let mut all_actors = vec![this_actor];
all_actors.extend(other_actors);
let mut m = OpSetMetadata::from_actors(all_actors);
// NOTE(review): presumably this makes prop index 0 valid for the
// `vec![0]` passed to `gen_op` below — confirm against `gen_op`.
m.props.cache("someprop".to_string());
let root_id = ObjId::root();
(0_u64..10)
.prop_map(|num_ops| {
(0..num_ops)
.map(|counter| OpId::new(counter, 0))
.collect::<Vec<_>>()
})
.prop_flat_map(move |opids| {
// Fold the op ids into one boxed strategy that yields the ops
// generated so far with the newly generated op pushed at the end.
let mut strat = Just(Vec::new()).boxed();
for opid in opids {
strat = (gen_op(opid, vec![0]), strat)
.prop_map(move |(op, ops)| {
let mut result = Vec::with_capacity(ops.len() + 1);
result.extend(ops);
result.push((root_id, op));
result
})
.boxed();
}
strat
})
.prop_map(move |ops| (ops, m.clone()))
}
// Strategy producing a `Change`: actors are drawn first, then the ops and the
// remaining fields are drawn and fed through `ChangeBuilder`.
prop_compose! {
pub(crate) fn gen_change()((this_actor, other_actors) in gen_actors())(
(ops, metadata) in gen_ops(this_actor.clone(), other_actors),
start_op in 1_u64..200000,
seq in 0_u64..200000,
timestamp in 0..i64::MAX,
deps in proptest::collection::vec(gen_hash(), 0..100),
message in proptest::option::of("[a-z]{200}"),
this_actor in Just(this_actor),
) -> Change {
// Convert each (object, op) pair with `op_as_actor_id` and the metadata
// before handing the ops to the builder.
let ops = ops.iter().map(|(obj, op)| op_as_actor_id(obj, op, &metadata));
Change::new(ChangeBuilder::new()
.with_dependencies(deps)
// `start_op` is drawn from `1..200000`, so it is never zero here.
.with_start_op(start_op.try_into().unwrap())
.with_message(message)
.with_actor(this_actor)
.with_seq(seq)
.with_timestamp(timestamp)
.build(ops.into_iter())
.unwrap())
}
}
}