automerge/rust/automerge/src/storage/chunk.rs
Alex Good c3c04128f5 Only observe the current state on load
Problem: When loading a document whilst passing an `OpObserver` we call
the OpObserver for every change in the loaded document. This slows down
the loading process for two reasons: 1) we have to make a call to the
observer for every op, and 2) we cannot just stream the ops into the
OpSet in topological order but must instead buffer them to pass to the
observer.

Solution: Construct the OpSet first, then traverse only the visible ops
in the OpSet, calling the observer for each. For documents with a deep
history this results in vastly fewer calls to the observer and also
allows us to construct the OpSet much more quickly. It is slightly
different semantically, because the observer never gets notified of
changes which are not visible, but that shouldn't matter to most
observers.
2023-02-03 10:01:12 +00:00
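
As a rough sketch of the shape of this change (hypothetical names
throughout; the real OpSet/OpObserver APIs in automerge differ):

    // Hypothetical sketch: two-phase load where the observer only sees
    // the currently visible ops.
    fn load(changes: Vec<Change>, observer: &mut impl OpObserver) -> OpSet {
        // Phase 1: stream every op into the OpSet in topological order,
        // with no observer calls at all.
        let op_set = OpSet::from_changes(changes);
        // Phase 2: traverse only the visible ops, notifying the observer.
        for op in op_set.visible_ops() {
            observer.visible_op(op);
        }
        op_set
    }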

use std::{
borrow::Cow,
convert::{TryFrom, TryInto},
io::Read,
ops::Range,
};

use sha2::{Digest, Sha256};

use super::{change::Unverified, parse, Change, Compressed, Document, MAGIC_BYTES};
use crate::{columnar::encoding::leb128::ulebsize, ChangeHash};

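/// A chunk of the storage format: either a whole document, a single change,
/// or a deflate-compressed change (kept alongside its decompressed form).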
pub(crate) enum Chunk<'a> {
Document(Document<'a>),
Change(Change<'a, Unverified>),
CompressedChange(Change<'static, Unverified>, Compressed<'a>),
}

pub(crate) mod error {
use super::parse;
    use crate::storage::{change, document};

    #[derive(thiserror::Error, Debug)]
pub(crate) enum Chunk {
#[error("there was data in a chunk leftover after parsing")]
LeftoverData,
#[error(transparent)]
Leb128(#[from] parse::leb128::Error),
#[error("failed to parse header: {0}")]
Header(#[from] Header),
#[error("bad change chunk: {0}")]
Change(#[from] change::ParseError),
#[error("bad document chunk: {0}")]
Document(#[from] document::ParseError),
#[error("unable to decompresse compressed chunk")]
Deflate,
    }

    #[derive(thiserror::Error, Debug)]
pub(crate) enum Header {
#[error(transparent)]
Leb128(#[from] parse::leb128::Error),
#[error("unknown chunk type: {0}")]
UnknownChunkType(u8),
#[error("Invalid magic bytes")]
InvalidMagicBytes,
}
}

impl<'a> Chunk<'a> {
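    /// Parse a single chunk from `input`, dispatching on the chunk type in
    /// the header. Compressed chunks are inflated and re-parsed as change
    /// chunks. Data left over inside a chunk's declared length is an error.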
pub(crate) fn parse(
input: parse::Input<'a>,
) -> parse::ParseResult<'a, Chunk<'a>, error::Chunk> {
let (i, header) = Header::parse::<error::Chunk>(input)?;
let parse::Split {
first: chunk_input,
remaining,
} = i.split(header.data_bytes().len());
tracing::trace!(?header, "parsed chunk header");
let chunk = match header.chunk_type {
ChunkType::Change => {
let (remaining, change) =
Change::parse_following_header(chunk_input, header).map_err(|e| e.lift())?;
if !remaining.is_empty() {
return Err(parse::ParseError::Error(error::Chunk::LeftoverData));
}
Chunk::Change(change)
}
ChunkType::Document => {
let (remaining, doc) =
Document::parse(chunk_input, header).map_err(|e| e.lift())?;
if !remaining.is_empty() {
return Err(parse::ParseError::Error(error::Chunk::LeftoverData));
}
Chunk::Document(doc)
}
ChunkType::Compressed => {
let compressed = &input.unconsumed_bytes()[header.data_bytes()];
let mut decoder = flate2::bufread::DeflateDecoder::new(compressed);
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.map_err(|_| parse::ParseError::Error(error::Chunk::Deflate))?;
let inner_header = header.with_data(ChunkType::Change, &decompressed);
let mut inner_chunk = Vec::with_capacity(inner_header.len() + decompressed.len());
inner_header.write(&mut inner_chunk);
inner_chunk.extend(&decompressed);
let (remaining, change) =
Change::parse(parse::Input::new(&inner_chunk)).map_err(|e| e.lift())?;
if !remaining.is_empty() {
return Err(parse::ParseError::Error(error::Chunk::LeftoverData));
}
Chunk::CompressedChange(
change.into_owned(),
Compressed::new(header.checksum, Cow::Borrowed(chunk_input.bytes())),
)
}
};
Ok((remaining, chunk))
}
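
    /// Check that the checksum recorded in the chunk header matches a
    /// checksum computed from the chunk's contents. For compressed chunks
    /// the checksum covers the uncompressed change data.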
pub(crate) fn checksum_valid(&self) -> bool {
match self {
Self::Document(d) => d.checksum_valid(),
Self::Change(c) => c.checksum_valid(),
Self::CompressedChange(change, compressed) => {
compressed.checksum() == change.checksum() && change.checksum_valid()
}
}
}
}
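
/// The tag byte identifying what kind of data a chunk contains.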
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) enum ChunkType {
    Document,
    Change,
    Compressed,
}

impl TryFrom<u8> for ChunkType {
    type Error = u8;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(Self::Document),
            1 => Ok(Self::Change),
            2 => Ok(Self::Compressed),
            other => Err(other),
        }
    }
}

impl From<ChunkType> for u8 {
    fn from(ct: ChunkType) -> Self {
        match ct {
            ChunkType::Document => 0,
            ChunkType::Change => 1,
            ChunkType::Compressed => 2,
        }
    }
}
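
/// The first four bytes of the SHA-256 hash of a chunk, as stored in the
/// chunk header.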
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) struct CheckSum([u8; 4]);

impl CheckSum {
    pub(crate) fn bytes(&self) -> [u8; 4] {
        self.0
    }
}

impl From<[u8; 4]> for CheckSum {
    fn from(raw: [u8; 4]) -> Self {
        CheckSum(raw)
    }
}

impl AsRef<[u8]> for CheckSum {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl From<ChangeHash> for CheckSum {
    fn from(h: ChangeHash) -> Self {
        let bytes = h.as_bytes();
        [bytes[0], bytes[1], bytes[2], bytes[3]].into()
    }
}
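
/// The parsed header of a chunk: the magic bytes, a four-byte checksum, a
/// chunk type byte, and the LEB128-encoded length of the data which follows.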
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Header {
checksum: CheckSum,
chunk_type: ChunkType,
data_len: usize,
header_size: usize,
hash: ChangeHash,
}

impl Header {
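    /// Construct the header for a chunk of type `chunk_type` containing
    /// `data`, computing its hash and checksum.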
pub(crate) fn new(chunk_type: ChunkType, data: &[u8]) -> Self {
let hash = hash(chunk_type, data);
Self {
hash,
checksum: hash.checksum().into(),
data_len: data.len(),
header_size: MAGIC_BYTES.len()
+ 4 // checksum
+ 1 // chunk type
+ (ulebsize(data.len() as u64) as usize),
chunk_type,
}
    }

    /// Returns a header with the same checksum but with a different chunk type and data length.
/// This is primarily useful when processing compressed chunks, where the checksum is actually
/// derived from the uncompressed data.
pub(crate) fn with_data(&self, chunk_type: ChunkType, data: &[u8]) -> Header {
let hash = hash(chunk_type, data);
Self {
hash,
checksum: self.checksum,
data_len: data.len(),
header_size: MAGIC_BYTES.len()
+ 4 // checksum
+ 1 // chunk type
+ (ulebsize(data.len() as u64) as usize),
chunk_type,
}
}
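
    /// The length of the encoded header in bytes.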
pub(crate) fn len(&self) -> usize {
self.header_size
}
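
    /// Write this header (magic bytes, checksum, chunk type, and data
    /// length) to `out`.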
pub(crate) fn write(&self, out: &mut Vec<u8>) {
out.extend(MAGIC_BYTES);
out.extend(self.checksum.bytes());
out.push(u8::from(self.chunk_type));
leb128::write::unsigned(out, self.data_len as u64).unwrap();
}
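
    /// Parse a header from `input`. The data the header describes must also
    /// be present in `input`, as it is hashed to compute the chunk's hash.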
pub(crate) fn parse<E>(input: parse::Input<'_>) -> parse::ParseResult<'_, Header, E>
where
E: From<error::Header>,
{
let (
i,
parse::RangeOf {
range: header,
value: (checksum_bytes, chunk_type, chunk_len),
},
) = parse::range_of(
|i| {
let (i, magic) = parse::take4(i)?;
if magic != MAGIC_BYTES {
return Err(parse::ParseError::Error(E::from(
error::Header::InvalidMagicBytes,
)));
}
let (i, checksum_bytes) = parse::take4(i)?;
let (i, raw_chunk_type) = parse::take1(i)?;
let chunk_type: ChunkType = raw_chunk_type.try_into().map_err(|_| {
parse::ParseError::Error(E::from(error::Header::UnknownChunkType(
raw_chunk_type,
)))
})?;
let (i, chunk_len) = parse::leb128_u64(i).map_err(|e| e.lift())?;
Ok((i, (checksum_bytes, chunk_type, chunk_len)))
},
input,
)?;
let (_, data) = parse::take_n(chunk_len as usize, i)?;
let hash = hash(chunk_type, data);
Ok((
i,
Header {
checksum: checksum_bytes.into(),
chunk_type,
data_len: data.len(),
header_size: header.len(),
hash,
},
))
    }

    /// The range of the input which corresponds to the data specified by
    /// this header.
    pub(crate) fn data_bytes(&self) -> Range<usize> {
self.header_size..(self.header_size + self.data_len)
    }

    pub(crate) fn hash(&self) -> ChangeHash {
self.hash
}
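
    /// Whether the checksum in this header matches the hash of the data it
    /// describes.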
pub(crate) fn checksum_valid(&self) -> bool {
CheckSum(self.hash.checksum()) == self.checksum
    }

    pub(crate) fn checksum(&self) -> CheckSum {
self.checksum
}
}
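
/// Compute the hash which chunk checksums are derived from: a SHA-256 digest
/// of the chunk type byte, the LEB128-encoded data length, and the data
/// itself (the magic bytes and checksum are not included).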
fn hash(typ: ChunkType, data: &[u8]) -> ChangeHash {
let mut out = vec![u8::from(typ)];
leb128::write::unsigned(&mut out, data.len() as u64).unwrap();
out.extend(data);
let hash_result = Sha256::digest(out);
let array: [u8; 32] = hash_result.into();
ChangeHash(array)
}
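
// A minimal round-trip sketch, not part of the original file: it assumes
// only the `Header` and `parse::Input` APIs defined above (and that
// `parse::ParseError` implements `Debug`, as `unwrap` requires), and checks
// that a header written with `write` parses back to an equal value with a
// valid checksum.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn header_roundtrip() {
        let data = b"some chunk data";
        let header = Header::new(ChunkType::Change, data);
        // Serialize the header followed by the data it describes.
        let mut out = Vec::new();
        header.write(&mut out);
        out.extend(data);
        // Parsing should recover an identical header whose checksum matches
        // the hash of the data.
        let (_, parsed) = Header::parse::<error::Header>(parse::Input::new(&out)).unwrap();
        assert_eq!(parsed, header);
        assert!(parsed.checksum_valid());
    }
}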