automerge/rust/automerge/src/storage/change.rs
2022-11-05 22:48:43 +00:00

508 lines
17 KiB
Rust

use std::{borrow::Cow, io::Write, marker::PhantomData, num::NonZeroU64, ops::Range};
use crate::{convert, ActorId, ChangeHash, ScalarValue};
use super::{parse, shift_range, CheckSum, ChunkType, Columns, Header, RawColumns};
mod change_op_columns;
use change_op_columns::ChangeOpsColumns;
pub(crate) use change_op_columns::{ChangeOp, ReadChangeOpError};
mod change_actors;
pub(crate) use change_actors::PredOutOfOrder;
mod compressed;
mod op_with_change_actors;
pub(crate) use compressed::Compressed;
/// Size threshold used by `Change::compress`: a change is only compressed when the full chunk
/// (header included) is strictly longer than this many bytes.
pub(crate) const DEFLATE_MIN_SIZE: usize = 256;
/// Changes present an iterator over the operations encoded in them. Before we have read these
/// changes we don't know if they are valid, so we expose an iterator with items which are
/// `Result`s. However, frequently we know that the changes are valid; this trait is used as a
/// witness that we have verified the operations in a change so we can expose an iterator which
/// does not return `Result`s.
///
/// The trait has no methods: it only restricts which marker types may be used as the `O`
/// parameter of `Change`.
pub(crate) trait OpReadState {}
/// Marker type: the ops of a `Change<'_, Verified>` have been read successfully (via
/// `Change::verify_ops`) or were produced by `ChangeBuilder::build`.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Verified;
/// Marker type: the ops of a `Change<'_, Unverified>` have not been checked yet, so iterating
/// them yields `Result`s.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Unverified;
// Both marker types are accepted as the `O` parameter of `Change`.
impl OpReadState for Verified {}
impl OpReadState for Unverified {}
/// A `Change` is the result of parsing a change chunk as specified in [1]
///
/// The type parameter to this type represents whether or not operations have been "verified".
/// Operations in a change chunk are stored in a compressed column oriented storage format. In
/// general there is no guarantee that this storage is valid. Therefore we use the `OpReadState`
/// type parameter to distinguish between contexts where we know that the ops are valid and those
/// where we don't. The `Change::verify_ops` method can be used to obtain a verified `Change` which
/// can provide an iterator over `ChangeOp`s directly, rather than over `Result<ChangeOp,
/// ReadChangeOpError>`.
///
/// Equality between two `Change`s is defined purely by their raw `bytes`.
///
/// [1]: https://alexjg.github.io/automerge-storage-docs/#change-chunks
#[derive(Clone, Debug)]
pub(crate) struct Change<'a, O: OpReadState> {
/// The raw bytes of the entire chunk containing this change, including the header.
bytes: Cow<'a, [u8]>,
/// The chunk header. The checksum and hash accessors delegate to it, and its length marks where
/// the chunk body starts inside `bytes`.
header: Header,
/// The change hashes read from (or written to) the start of the chunk body.
dependencies: Vec<ChangeHash>,
/// The actor ID stored directly in the chunk body, ahead of `other_actors`.
actor: ActorId,
/// The additional actor IDs stored in the chunk as a length-prefixed list.
other_actors: Vec<ActorId>,
/// The sequence number stored in the chunk.
seq: u64,
/// The start op stored in the chunk; never zero.
start_op: NonZeroU64,
/// The timestamp stored in the chunk (unit not visible here — TODO confirm).
timestamp: i64,
/// The change message. Parsing stores `None` when the encoded message is empty.
message: Option<String>,
/// Column metadata describing how to decode the bytes in `ops_data`.
ops_meta: ChangeOpsColumns,
/// The range in `Self::bytes` where the ops column data is
ops_data: Range<usize>,
/// The range in `Self::bytes` of whatever follows the ops column data.
extra_bytes: Range<usize>,
/// Zero-sized marker carrying the verification state `O`.
_phantom: PhantomData<O>,
}
/// Two changes are considered equal exactly when their raw chunk bytes are equal; the parsed
/// fields are not compared separately.
impl<'a, O: OpReadState> PartialEq for Change<'a, O> {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs): (&[u8], &[u8]) = (&self.bytes, &other.bytes);
        lhs == rhs
    }
}
/// Errors which can occur while parsing a change chunk.
#[derive(thiserror::Error, Debug)]
pub(crate) enum ParseError {
/// A LEB128-encoded integer could not be read.
#[error(transparent)]
Leb128(#[from] parse::leb128::Error),
/// A string in the chunk was not valid UTF-8.
#[error(transparent)]
InvalidUtf8(#[from] parse::InvalidUtf8),
/// The raw column metadata could not be parsed.
#[error("failed to parse change columns: {0}")]
RawColumns(#[from] crate::storage::columns::raw_column::ParseError),
/// The chunk header could not be parsed.
#[error("failed to parse header: {0}")]
Header(#[from] super::chunk::error::Header),
/// The column metadata could not be converted to its uncompressed form, i.e. the change
/// declared compressed columns.
#[error("change contained compressed columns")]
CompressedChangeCols,
/// The column layout was rejected, either by `Columns::parse` or by the conversion into
/// `ChangeOpsColumns`. The underlying error is boxed.
#[error("invalid change cols: {0}")]
InvalidColumns(Box<dyn std::error::Error + Send + Sync + 'static>),
}
impl<'a> Change<'a, Unverified> {
pub(crate) fn parse(
input: parse::Input<'a>,
) -> parse::ParseResult<'a, Change<'a, Unverified>, ParseError> {
// TODO(alex): check chunk type
let (i, header) = Header::parse(input)?;
let parse::Split {
first: chunk_input,
remaining,
} = i.split(header.data_bytes().len());
let (_, change) = Self::parse_following_header(chunk_input, header)?;
Ok((remaining, change))
}
/// Parse a change chunk. `input` should be the entire chunk, including the header bytes.
pub(crate) fn parse_following_header(
input: parse::Input<'a>,
header: Header,
) -> parse::ParseResult<'_, Change<'a, Unverified>, ParseError> {
let (i, deps) = parse::length_prefixed(parse::change_hash)(input)?;
let (i, actor) = parse::actor_id(i)?;
let (i, seq) = parse::leb128_u64(i)?;
let (i, start_op) = parse::nonzero_leb128_u64(i)?;
let (i, timestamp) = parse::leb128_i64(i)?;
let (i, message_len) = parse::leb128_u64(i)?;
let (i, message) = parse::utf_8(message_len as usize, i)?;
let (i, other_actors) = parse::length_prefixed(parse::actor_id)(i)?;
let (i, ops_meta) = RawColumns::parse(i)?;
let (
i,
parse::RangeOf {
range: ops_data, ..
},
) = parse::range_of(|i| parse::take_n(ops_meta.total_column_len(), i), i)?;
let (
_i,
parse::RangeOf {
range: extra_bytes, ..
},
) = parse::range_of(parse::take_rest, i)?;
let ops_meta = ops_meta
.uncompressed()
.ok_or(parse::ParseError::Error(ParseError::CompressedChangeCols))?;
let col_layout = Columns::parse(ops_data.len(), ops_meta.iter())
.map_err(|e| parse::ParseError::Error(ParseError::InvalidColumns(Box::new(e))))?;
let ops_meta = ChangeOpsColumns::try_from(col_layout)
.map_err(|e| parse::ParseError::Error(ParseError::InvalidColumns(Box::new(e))))?;
Ok((
parse::Input::empty(),
Change {
bytes: input.bytes().into(),
header,
dependencies: deps,
actor,
other_actors,
seq,
start_op,
timestamp,
message: if message.is_empty() {
None
} else {
Some(message)
},
ops_meta,
ops_data,
extra_bytes,
_phantom: PhantomData,
},
))
}
/// Iterate over the ops in this chunk. The iterator will return an error if any of the ops are
/// malformed.
pub(crate) fn iter_ops(
&'a self,
) -> impl Iterator<Item = Result<ChangeOp, ReadChangeOpError>> + Clone + 'a {
self.ops_meta.iter(self.ops_data())
}
/// Verify all the ops in this change executing `f` for each one
///
/// `f` will be called for each op in this change, allowing callers to collect additional
/// information about the ops (e.g. all the actor IDs in the change, or the number of ops)
///
/// # Errors
/// * If there is an error reading an operation
pub(crate) fn verify_ops<F: FnMut(ChangeOp)>(
self,
mut f: F,
) -> Result<Change<'a, Verified>, ReadChangeOpError> {
for op in self.iter_ops() {
f(op?);
}
Ok(Change {
bytes: self.bytes,
header: self.header,
dependencies: self.dependencies,
actor: self.actor,
other_actors: self.other_actors,
seq: self.seq,
start_op: self.start_op,
timestamp: self.timestamp,
message: self.message,
ops_meta: self.ops_meta,
ops_data: self.ops_data,
extra_bytes: self.extra_bytes,
_phantom: PhantomData,
})
}
}
impl<'a> Change<'a, Verified> {
    /// Create an empty `ChangeBuilder` with none of the required fields set.
    pub(crate) fn builder() -> ChangeBuilder<Unset, Unset, Unset, Unset> {
        ChangeBuilder::<Unset, Unset, Unset, Unset>::new()
    }

    /// Iterate over the ops in this change.
    ///
    /// Unlike the unverified variant, the items are plain `ChangeOp`s rather than `Result`s.
    pub(crate) fn iter_ops(&'a self) -> impl Iterator<Item = ChangeOp> + Clone + 'a {
        let columns = &self.ops_meta;
        let column_data = self.ops_data();
        // SAFETY: This unwrap is okay because a `Change<'_, Verified>` can only be constructed
        // using either `verify_ops` or `ChangeBuilder::build`, so we know the ops columns are
        // valid.
        columns.iter(column_data).map(|op| op.unwrap())
    }
}
impl<'a, O: OpReadState> Change<'a, O> {
    /// The checksum recorded in the chunk header.
    pub(crate) fn checksum(&self) -> CheckSum {
        self.header.checksum()
    }

    /// The actor ID stored directly in the chunk body.
    pub(crate) fn actor(&self) -> &ActorId {
        &self.actor
    }

    /// The additional actor IDs stored in the chunk.
    pub(crate) fn other_actors(&self) -> &[ActorId] {
        self.other_actors.as_slice()
    }

    /// The start op stored in the chunk.
    pub(crate) fn start_op(&self) -> NonZeroU64 {
        self.start_op
    }

    /// The change message, if any.
    pub(crate) fn message(&self) -> &Option<String> {
        &self.message
    }

    /// The change hashes stored in the chunk's dependency list.
    pub(crate) fn dependencies(&self) -> &[ChangeHash] {
        self.dependencies.as_slice()
    }

    /// The sequence number stored in the chunk.
    pub(crate) fn seq(&self) -> u64 {
        self.seq
    }

    /// The timestamp stored in the chunk.
    pub(crate) fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// The bytes which follow the ops column data in the chunk.
    pub(crate) fn extra_bytes(&self) -> &[u8] {
        let range = self.extra_bytes.clone();
        &self.bytes[range]
    }

    /// Whether the header reports its checksum as valid.
    pub(crate) fn checksum_valid(&self) -> bool {
        self.header.checksum_valid()
    }

    /// The chunk bytes with the header stripped off the front.
    pub(crate) fn body_bytes(&self) -> &[u8] {
        let body_start = self.header.len();
        &self.bytes[body_start..]
    }

    /// The raw bytes of the whole chunk, header included.
    pub(crate) fn bytes(&self) -> &[u8] {
        &self.bytes[..]
    }

    /// The hash of this change, as reported by the header.
    pub(crate) fn hash(&self) -> ChangeHash {
        self.header.hash()
    }

    /// The slice of the chunk which holds the ops column data.
    pub(crate) fn ops_data(&self) -> &[u8] {
        let range = self.ops_data.clone();
        &self.bytes[range]
    }

    /// Convert into a change which owns its bytes and therefore has a `'static` lifetime.
    ///
    /// All parsed fields are moved across unchanged; only `bytes` is converted to owned data.
    pub(crate) fn into_owned(self) -> Change<'static, O> {
        let Change {
            bytes,
            header,
            dependencies,
            actor,
            other_actors,
            seq,
            start_op,
            timestamp,
            message,
            ops_meta,
            ops_data,
            extra_bytes,
            _phantom: _,
        } = self;
        Change {
            dependencies,
            bytes: Cow::Owned(bytes.into_owned()),
            header,
            actor,
            other_actors,
            seq,
            start_op,
            timestamp,
            message,
            ops_meta,
            ops_data,
            extra_bytes,
            _phantom: PhantomData,
        }
    }

    /// Compress this change, provided the chunk is longer than `DEFLATE_MIN_SIZE` bytes.
    ///
    /// Returns `None` for chunks at or below the threshold.
    pub(crate) fn compress(&self) -> Option<Compressed<'static>> {
        let worth_compressing = self.bytes.len() > DEFLATE_MIN_SIZE;
        worth_compressing.then(|| Compressed::compress(self))
    }
}
fn length_prefixed_bytes<B: AsRef<[u8]>>(b: B, out: &mut Vec<u8>) -> usize {
let prefix_len = leb128::write::unsigned(out, b.as_ref().len() as u64).unwrap();
out.write_all(b.as_ref()).unwrap();
prefix_len + b.as_ref().len()
}
// Bunch of type safe builder boilerplate
/// Type-level marker for a required `ChangeBuilder` field which has not been provided yet.
pub(crate) struct Unset;
/// Type-level marker for a required `ChangeBuilder` field which has been provided.
pub(crate) struct Set<T> {
/// The value supplied through the corresponding `with_*` method.
value: T,
}
/// Builder for a `Change<'static, Verified>`.
///
/// Each of the four type parameters starts as `Unset` and becomes `Set<_>` once the matching
/// `with_start_op` / `with_actor` / `with_seq` / `with_timestamp` method has been called. `build`
/// is only implemented for the state in which all four are `Set`, so the required fields are
/// checked at compile time.
#[allow(non_camel_case_types)]
pub(crate) struct ChangeBuilder<START_OP, ACTOR, SEQ, TIME> {
/// Dependency hashes; optional, empty by default.
dependencies: Vec<ChangeHash>,
/// `Unset`, or `Set<ActorId>` once `with_actor` has been called.
actor: ACTOR,
/// `Unset`, or `Set<u64>` once `with_seq` has been called.
seq: SEQ,
/// `Unset`, or `Set<NonZeroU64>` once `with_start_op` has been called.
start_op: START_OP,
/// `Unset`, or `Set<i64>` once `with_timestamp` has been called.
timestamp: TIME,
/// Optional change message.
message: Option<String>,
/// Optional bytes appended after the ops column data.
extra_bytes: Option<Vec<u8>>,
}
impl ChangeBuilder<Unset, Unset, Unset, Unset> {
pub(crate) fn new() -> Self {
Self {
dependencies: vec![],
actor: Unset,
seq: Unset,
start_op: Unset,
timestamp: Unset,
message: None,
extra_bytes: None,
}
}
}
/// Setters for the optional fields; these are available in every builder state.
#[allow(non_camel_case_types)]
impl<START_OP, ACTOR, SEQ, TIME> ChangeBuilder<START_OP, ACTOR, SEQ, TIME> {
    /// Replace the dependency list. The hashes are sorted before being stored.
    pub(crate) fn with_dependencies(mut self, mut dependencies: Vec<ChangeHash>) -> Self {
        dependencies.sort_unstable();
        self.dependencies = dependencies;
        self
    }

    /// Replace the change message.
    pub(crate) fn with_message(mut self, message: Option<String>) -> Self {
        self.message = message;
        self
    }

    /// Set the bytes to append after the ops column data.
    pub(crate) fn with_extra_bytes(mut self, extra_bytes: Vec<u8>) -> Self {
        self.extra_bytes = Some(extra_bytes);
        self
    }
}
#[allow(non_camel_case_types)]
impl<START_OP, ACTOR, TIME> ChangeBuilder<START_OP, ACTOR, Unset, TIME> {
    /// Provide the sequence number, moving the builder into the "seq set" state.
    ///
    /// Only available while the sequence number is still unset.
    pub(crate) fn with_seq(self, seq: u64) -> ChangeBuilder<START_OP, ACTOR, Set<u64>, TIME> {
        let ChangeBuilder {
            dependencies,
            actor,
            seq: _,
            start_op,
            timestamp,
            message,
            extra_bytes,
        } = self;
        ChangeBuilder {
            dependencies,
            actor,
            seq: Set { value: seq },
            start_op,
            timestamp,
            message,
            extra_bytes,
        }
    }
}
#[allow(non_camel_case_types)]
impl<START_OP, SEQ, TIME> ChangeBuilder<START_OP, Unset, SEQ, TIME> {
    /// Provide the actor ID, moving the builder into the "actor set" state.
    ///
    /// Only available while the actor is still unset.
    pub(crate) fn with_actor(
        self,
        actor: ActorId,
    ) -> ChangeBuilder<START_OP, Set<ActorId>, SEQ, TIME> {
        let ChangeBuilder {
            dependencies,
            actor: _,
            seq,
            start_op,
            timestamp,
            message,
            extra_bytes,
        } = self;
        ChangeBuilder {
            dependencies,
            actor: Set { value: actor },
            seq,
            start_op,
            timestamp,
            message,
            extra_bytes,
        }
    }
}
impl<ACTOR, SEQ, TIME> ChangeBuilder<Unset, ACTOR, SEQ, TIME> {
    /// Provide the start op, moving the builder into the "start op set" state.
    ///
    /// Only available while the start op is still unset.
    pub(crate) fn with_start_op(
        self,
        start_op: NonZeroU64,
    ) -> ChangeBuilder<Set<NonZeroU64>, ACTOR, SEQ, TIME> {
        let ChangeBuilder {
            dependencies,
            actor,
            seq,
            start_op: _,
            timestamp,
            message,
            extra_bytes,
        } = self;
        ChangeBuilder {
            dependencies,
            actor,
            seq,
            start_op: Set { value: start_op },
            timestamp,
            message,
            extra_bytes,
        }
    }
}
#[allow(non_camel_case_types)]
impl<START_OP, ACTOR, SEQ> ChangeBuilder<START_OP, ACTOR, SEQ, Unset> {
    /// Provide the timestamp, moving the builder into the "timestamp set" state.
    ///
    /// Only available while the timestamp is still unset.
    pub(crate) fn with_timestamp(self, time: i64) -> ChangeBuilder<START_OP, ACTOR, SEQ, Set<i64>> {
        let ChangeBuilder {
            dependencies,
            actor,
            seq,
            start_op,
            timestamp: _,
            message,
            extra_bytes,
        } = self;
        ChangeBuilder {
            dependencies,
            actor,
            seq,
            start_op,
            timestamp: Set { value: time },
            message,
            extra_bytes,
        }
    }
}
/// A row to be encoded as a change op
///
/// The lifetime `'a` is the lifetime of the value and key data types. For types which cannot
/// provide a reference (e.g. because they are decoding from some columnar storage on each
/// iteration) this should be `'static`.
///
/// `ChangeBuilder::build` consumes an iterator of values implementing this trait.
pub(crate) trait AsChangeOp<'a> {
/// The type of the Actor ID component of the op IDs for this impl. This is typically either
/// `&'a ActorID` or `usize`
type ActorId;
/// The type of the op IDs this impl produces.
type OpId: convert::OpId<Self::ActorId>;
/// The type of the predecessor iterator returned by `Self::pred`. This can often be omitted
type PredIter: Iterator<Item = Self::OpId> + ExactSizeIterator;
/// The object ID of this row (presumably the object the op applies to — confirm against
/// `convert::ObjId`).
fn obj(&self) -> convert::ObjId<Self::OpId>;
/// The key of this row; may borrow data for `'a`.
fn key(&self) -> convert::Key<'a, Self::OpId>;
/// The insert flag of this row.
fn insert(&self) -> bool;
/// The action of this row as a raw integer code (meaning of the codes is not visible here).
fn action(&self) -> u64;
/// The scalar value of this row, borrowed for `'a` where possible.
fn val(&self) -> Cow<'a, ScalarValue>;
/// An exact-size iterator over the predecessor op IDs of this row.
fn pred(&self) -> Self::PredIter;
}
impl ChangeBuilder<Set<NonZeroU64>, Set<ActorId>, Set<u64>, Set<i64>> {
pub(crate) fn build<'a, A, I, O>(
self,
ops: I,
) -> Result<Change<'static, Verified>, PredOutOfOrder>
where
A: AsChangeOp<'a, OpId = O> + 'a,
O: convert::OpId<&'a ActorId> + 'a,
I: Iterator<Item = A> + Clone + 'a,
{
let mut col_data = Vec::new();
let actors = change_actors::ChangeActors::new(self.actor.value, ops)?;
let cols = ChangeOpsColumns::encode(actors.iter(), &mut col_data);
let (actor, other_actors) = actors.done();
let mut data = Vec::with_capacity(col_data.len());
leb128::write::unsigned(&mut data, self.dependencies.len() as u64).unwrap();
for dep in &self.dependencies {
data.write_all(dep.as_bytes()).unwrap();
}
length_prefixed_bytes(&actor, &mut data);
leb128::write::unsigned(&mut data, self.seq.value).unwrap();
leb128::write::unsigned(&mut data, self.start_op.value.into()).unwrap();
leb128::write::signed(&mut data, self.timestamp.value).unwrap();
length_prefixed_bytes(
self.message.as_ref().map(|m| m.as_bytes()).unwrap_or(&[]),
&mut data,
);
leb128::write::unsigned(&mut data, other_actors.len() as u64).unwrap();
for actor in other_actors.iter() {
length_prefixed_bytes(actor, &mut data);
}
cols.raw_columns().write(&mut data);
let ops_data_start = data.len();
let ops_data = ops_data_start..(ops_data_start + col_data.len());
data.extend(col_data);
let extra_bytes =
data.len()..(data.len() + self.extra_bytes.as_ref().map(|e| e.len()).unwrap_or(0));
if let Some(extra) = self.extra_bytes {
data.extend(extra);
}
let header = Header::new(ChunkType::Change, &data);
let mut bytes = Vec::with_capacity(header.len() + data.len());
header.write(&mut bytes);
bytes.extend(data);
let ops_data = shift_range(ops_data, header.len());
let extra_bytes = shift_range(extra_bytes, header.len());
Ok(Change {
bytes: Cow::Owned(bytes),
header,
dependencies: self.dependencies,
actor,
other_actors,
seq: self.seq.value,
start_op: self.start_op.value,
timestamp: self.timestamp.value,
message: self.message,
ops_meta: cols,
ops_data,
extra_bytes,
_phantom: PhantomData,
})
}
}