use std::{borrow::Cow, io::Write, marker::PhantomData, num::NonZeroU64, ops::Range}; use crate::{convert, ActorId, ChangeHash, ScalarValue}; use super::{parse, shift_range, CheckSum, ChunkType, Columns, Header, RawColumns}; mod change_op_columns; use change_op_columns::ChangeOpsColumns; pub(crate) use change_op_columns::{ChangeOp, ReadChangeOpError}; mod change_actors; pub(crate) use change_actors::PredOutOfOrder; mod compressed; mod op_with_change_actors; pub(crate) use compressed::Compressed; pub(crate) const DEFLATE_MIN_SIZE: usize = 256; /// Changes present an iterator over the operations encoded in them. Before we have read these /// changes we don't know if they are valid, so we expose an iterator with items which are /// `Result`s. However, frequently we know that the changes are valid, this trait is used as a /// witness that we have verified the operations in a change so we can expose an iterator which /// does not return `Results` pub(crate) trait OpReadState {} #[derive(Debug, Clone, PartialEq)] pub(crate) struct Verified; #[derive(Debug, Clone, PartialEq)] pub(crate) struct Unverified; impl OpReadState for Verified {} impl OpReadState for Unverified {} /// A `Change` is the result of parsing a change chunk as specified in [1] /// /// The type parameter to this type represents whether or not operation have been "verified". /// Operations in a change chunk are stored in a compressed column oriented storage format. In /// general there is no guarantee that this storage is valid. Therefore we use the `OpReadState` /// type parameter to distinguish between contexts where we know that the ops are valid and those /// where we don't. The `Change::verify_ops` method can be used to obtain a verified `Change` which /// can provide an iterator over `ChangeOp`s directly, rather than over `Result`. /// /// [1]: https://alexjg.github.io/automerge-storage-docs/#change-chunks #[derive(Clone, Debug)] pub(crate) struct Change<'a, O: OpReadState> { /// The raw bytes of the entire chunk containing this change, including the header. bytes: Cow<'a, [u8]>, header: Header, dependencies: Vec, actor: ActorId, other_actors: Vec, seq: u64, start_op: NonZeroU64, timestamp: i64, message: Option, ops_meta: ChangeOpsColumns, /// The range in `Self::bytes` where the ops column data is ops_data: Range, extra_bytes: Range, _phantom: PhantomData, } impl<'a, O: OpReadState> PartialEq for Change<'a, O> { fn eq(&self, other: &Self) -> bool { self.bytes == other.bytes } } #[derive(thiserror::Error, Debug)] pub(crate) enum ParseError { #[error(transparent)] Leb128(#[from] parse::leb128::Error), #[error(transparent)] InvalidUtf8(#[from] parse::InvalidUtf8), #[error("failed to parse change columns: {0}")] RawColumns(#[from] crate::storage::columns::raw_column::ParseError), #[error("failed to parse header: {0}")] Header(#[from] super::chunk::error::Header), #[error("change contained compressed columns")] CompressedChangeCols, #[error("invalid change cols: {0}")] InvalidColumns(Box), } impl<'a> Change<'a, Unverified> { pub(crate) fn parse( input: parse::Input<'a>, ) -> parse::ParseResult<'a, Change<'a, Unverified>, ParseError> { // TODO(alex): check chunk type let (i, header) = Header::parse(input)?; let parse::Split { first: chunk_input, remaining, } = i.split(header.data_bytes().len()); let (_, change) = Self::parse_following_header(chunk_input, header)?; Ok((remaining, change)) } /// Parse a change chunk. `input` should be the entire chunk, including the header bytes. pub(crate) fn parse_following_header( input: parse::Input<'a>, header: Header, ) -> parse::ParseResult<'_, Change<'a, Unverified>, ParseError> { let (i, deps) = parse::length_prefixed(parse::change_hash)(input)?; let (i, actor) = parse::actor_id(i)?; let (i, seq) = parse::leb128_u64(i)?; let (i, start_op) = parse::nonzero_leb128_u64(i)?; let (i, timestamp) = parse::leb128_i64(i)?; let (i, message_len) = parse::leb128_u64(i)?; let (i, message) = parse::utf_8(message_len as usize, i)?; let (i, other_actors) = parse::length_prefixed(parse::actor_id)(i)?; let (i, ops_meta) = RawColumns::parse(i)?; let ( i, parse::RangeOf { range: ops_data, .. }, ) = parse::range_of(|i| parse::take_n(ops_meta.total_column_len(), i), i)?; let ( _i, parse::RangeOf { range: extra_bytes, .. }, ) = parse::range_of(parse::take_rest, i)?; let ops_meta = ops_meta .uncompressed() .ok_or(parse::ParseError::Error(ParseError::CompressedChangeCols))?; let col_layout = Columns::parse(ops_data.len(), ops_meta.iter()) .map_err(|e| parse::ParseError::Error(ParseError::InvalidColumns(Box::new(e))))?; let ops_meta = ChangeOpsColumns::try_from(col_layout) .map_err(|e| parse::ParseError::Error(ParseError::InvalidColumns(Box::new(e))))?; Ok(( parse::Input::empty(), Change { bytes: input.bytes().into(), header, dependencies: deps, actor, other_actors, seq, start_op, timestamp, message: if message.is_empty() { None } else { Some(message) }, ops_meta, ops_data, extra_bytes, _phantom: PhantomData, }, )) } /// Iterate over the ops in this chunk. The iterator will return an error if any of the ops are /// malformed. pub(crate) fn iter_ops( &'a self, ) -> impl Iterator> + Clone + 'a { self.ops_meta.iter(self.ops_data()) } /// Verify all the ops in this change executing `f` for each one /// /// `f` will be called for each op in this change, allowing callers to collect additional /// information about the ops (e.g. all the actor IDs in the change, or the number of ops) /// /// # Errors /// * If there is an error reading an operation pub(crate) fn verify_ops( self, mut f: F, ) -> Result, ReadChangeOpError> { for op in self.iter_ops() { f(op?); } Ok(Change { bytes: self.bytes, header: self.header, dependencies: self.dependencies, actor: self.actor, other_actors: self.other_actors, seq: self.seq, start_op: self.start_op, timestamp: self.timestamp, message: self.message, ops_meta: self.ops_meta, ops_data: self.ops_data, extra_bytes: self.extra_bytes, _phantom: PhantomData, }) } } impl<'a> Change<'a, Verified> { pub(crate) fn builder() -> ChangeBuilder { ChangeBuilder::new() } pub(crate) fn iter_ops(&'a self) -> impl Iterator + Clone + 'a { // SAFETY: This unwrap is okay because a `Change<'_, Verified>` can only be constructed // using either `verify_ops` or `Builder::build`, so we know the ops columns are valid. self.ops_meta.iter(self.ops_data()).map(|o| o.unwrap()) } } impl<'a, O: OpReadState> Change<'a, O> { pub(crate) fn checksum(&self) -> CheckSum { self.header.checksum() } pub(crate) fn actor(&self) -> &ActorId { &self.actor } pub(crate) fn other_actors(&self) -> &[ActorId] { &self.other_actors } pub(crate) fn start_op(&self) -> NonZeroU64 { self.start_op } pub(crate) fn message(&self) -> &Option { &self.message } pub(crate) fn dependencies(&self) -> &[ChangeHash] { &self.dependencies } pub(crate) fn seq(&self) -> u64 { self.seq } pub(crate) fn timestamp(&self) -> i64 { self.timestamp } pub(crate) fn extra_bytes(&self) -> &[u8] { &self.bytes[self.extra_bytes.clone()] } pub(crate) fn checksum_valid(&self) -> bool { self.header.checksum_valid() } pub(crate) fn body_bytes(&self) -> &[u8] { &self.bytes[self.header.len()..] } pub(crate) fn bytes(&self) -> &[u8] { &self.bytes } pub(crate) fn hash(&self) -> ChangeHash { self.header.hash() } pub(crate) fn ops_data(&self) -> &[u8] { &self.bytes[self.ops_data.clone()] } pub(crate) fn into_owned(self) -> Change<'static, O> { Change { dependencies: self.dependencies, bytes: Cow::Owned(self.bytes.into_owned()), header: self.header, actor: self.actor, other_actors: self.other_actors, seq: self.seq, start_op: self.start_op, timestamp: self.timestamp, message: self.message, ops_meta: self.ops_meta, ops_data: self.ops_data, extra_bytes: self.extra_bytes, _phantom: PhantomData, } } pub(crate) fn compress(&self) -> Option> { if self.bytes.len() > DEFLATE_MIN_SIZE { Some(Compressed::compress(self)) } else { None } } } fn length_prefixed_bytes>(b: B, out: &mut Vec) -> usize { let prefix_len = leb128::write::unsigned(out, b.as_ref().len() as u64).unwrap(); out.write_all(b.as_ref()).unwrap(); prefix_len + b.as_ref().len() } // Bunch of type safe builder boilerplate pub(crate) struct Unset; pub(crate) struct Set { value: T, } #[allow(non_camel_case_types)] pub(crate) struct ChangeBuilder { dependencies: Vec, actor: ACTOR, seq: SEQ, start_op: START_OP, timestamp: TIME, message: Option, extra_bytes: Option>, } impl ChangeBuilder { pub(crate) fn new() -> Self { Self { dependencies: vec![], actor: Unset, seq: Unset, start_op: Unset, timestamp: Unset, message: None, extra_bytes: None, } } } #[allow(non_camel_case_types)] impl ChangeBuilder { pub(crate) fn with_dependencies(self, mut dependencies: Vec) -> Self { dependencies.sort_unstable(); Self { dependencies, ..self } } pub(crate) fn with_message(self, message: Option) -> Self { Self { message, ..self } } pub(crate) fn with_extra_bytes(self, extra_bytes: Vec) -> Self { Self { extra_bytes: Some(extra_bytes), ..self } } } #[allow(non_camel_case_types)] impl ChangeBuilder { pub(crate) fn with_seq(self, seq: u64) -> ChangeBuilder, TIME> { ChangeBuilder { dependencies: self.dependencies, actor: self.actor, seq: Set { value: seq }, start_op: self.start_op, timestamp: self.timestamp, message: self.message, extra_bytes: self.extra_bytes, } } } #[allow(non_camel_case_types)] impl ChangeBuilder { pub(crate) fn with_actor( self, actor: ActorId, ) -> ChangeBuilder, SEQ, TIME> { ChangeBuilder { dependencies: self.dependencies, actor: Set { value: actor }, seq: self.seq, start_op: self.start_op, timestamp: self.timestamp, message: self.message, extra_bytes: self.extra_bytes, } } } impl ChangeBuilder { pub(crate) fn with_start_op( self, start_op: NonZeroU64, ) -> ChangeBuilder, ACTOR, SEQ, TIME> { ChangeBuilder { dependencies: self.dependencies, actor: self.actor, seq: self.seq, start_op: Set { value: start_op }, timestamp: self.timestamp, message: self.message, extra_bytes: self.extra_bytes, } } } #[allow(non_camel_case_types)] impl ChangeBuilder { pub(crate) fn with_timestamp(self, time: i64) -> ChangeBuilder> { ChangeBuilder { dependencies: self.dependencies, actor: self.actor, seq: self.seq, start_op: self.start_op, timestamp: Set { value: time }, message: self.message, extra_bytes: self.extra_bytes, } } } /// A row to be encoded as a change op /// /// The lifetime `'a` is the lifetime of the value and key data types. For types which cannot /// provide a reference (e.g. because they are decoding from some columnar storage on each /// iteration) this should be `'static`. pub(crate) trait AsChangeOp<'a> { /// The type of the Actor ID component of the op IDs for this impl. This is typically either /// `&'a ActorID` or `usize` type ActorId; /// The type of the op IDs this impl produces. type OpId: convert::OpId; /// The type of the predecessor iterator returned by `Self::pred`. This can often be omitted type PredIter: Iterator + ExactSizeIterator; fn obj(&self) -> convert::ObjId; fn key(&self) -> convert::Key<'a, Self::OpId>; fn insert(&self) -> bool; fn action(&self) -> u64; fn val(&self) -> Cow<'a, ScalarValue>; fn pred(&self) -> Self::PredIter; } impl ChangeBuilder, Set, Set, Set> { pub(crate) fn build<'a, A, I, O>( self, ops: I, ) -> Result, PredOutOfOrder> where A: AsChangeOp<'a, OpId = O> + 'a, O: convert::OpId<&'a ActorId> + 'a, I: Iterator + Clone + 'a, { let mut col_data = Vec::new(); let actors = change_actors::ChangeActors::new(self.actor.value, ops)?; let cols = ChangeOpsColumns::encode(actors.iter(), &mut col_data); let (actor, other_actors) = actors.done(); let mut data = Vec::with_capacity(col_data.len()); leb128::write::unsigned(&mut data, self.dependencies.len() as u64).unwrap(); for dep in &self.dependencies { data.write_all(dep.as_bytes()).unwrap(); } length_prefixed_bytes(&actor, &mut data); leb128::write::unsigned(&mut data, self.seq.value).unwrap(); leb128::write::unsigned(&mut data, self.start_op.value.into()).unwrap(); leb128::write::signed(&mut data, self.timestamp.value).unwrap(); length_prefixed_bytes( self.message.as_ref().map(|m| m.as_bytes()).unwrap_or(&[]), &mut data, ); leb128::write::unsigned(&mut data, other_actors.len() as u64).unwrap(); for actor in other_actors.iter() { length_prefixed_bytes(&actor, &mut data); } cols.raw_columns().write(&mut data); let ops_data_start = data.len(); let ops_data = ops_data_start..(ops_data_start + col_data.len()); data.extend(col_data); let extra_bytes = data.len()..(data.len() + self.extra_bytes.as_ref().map(|e| e.len()).unwrap_or(0)); if let Some(extra) = self.extra_bytes { data.extend(extra); } let header = Header::new(ChunkType::Change, &data); let mut bytes = Vec::with_capacity(header.len() + data.len()); header.write(&mut bytes); bytes.extend(data); let ops_data = shift_range(ops_data, header.len()); let extra_bytes = shift_range(extra_bytes, header.len()); Ok(Change { bytes: Cow::Owned(bytes), header, dependencies: self.dependencies, actor, other_actors, seq: self.seq.value, start_op: self.start_op.value, timestamp: self.timestamp.value, message: self.message, ops_meta: cols, ops_data, extra_bytes, _phantom: PhantomData, }) } }