The compressed document format includes at the end of the document chunk the indicies of the heads of the document. Older versions of the javascript implementation do not include these indicies so we allow them to be omitted when decoding. Whilst we're here add some tracing::trace logs to make it easier to understand where parsing is failing.
293 lines
9 KiB
Rust
293 lines
9 KiB
Rust
use std::{
|
|
borrow::Cow,
|
|
convert::{TryFrom, TryInto},
|
|
io::Read,
|
|
ops::Range,
|
|
};
|
|
|
|
use sha2::{Digest, Sha256};
|
|
|
|
use super::{change::Unverified, parse, Change, Compressed, Document, MAGIC_BYTES};
|
|
use crate::{columnar::encoding::leb128::ulebsize, ChangeHash};
|
|
|
|
pub(crate) enum Chunk<'a> {
|
|
Document(Document<'a>),
|
|
Change(Change<'a, Unverified>),
|
|
CompressedChange(Change<'static, Unverified>, Compressed<'a>),
|
|
}
|
|
|
|
pub(crate) mod error {
|
|
use super::parse;
|
|
use crate::storage::{change, document};
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub(crate) enum Chunk {
|
|
#[error("there was data in a chunk leftover after parsing")]
|
|
LeftoverData,
|
|
#[error(transparent)]
|
|
Leb128(#[from] parse::leb128::Error),
|
|
#[error("failed to parse header: {0}")]
|
|
Header(#[from] Header),
|
|
#[error("bad change chunk: {0}")]
|
|
Change(#[from] change::ParseError),
|
|
#[error("bad document chunk: {0}")]
|
|
Document(#[from] document::ParseError),
|
|
#[error("unable to decompresse compressed chunk")]
|
|
Deflate,
|
|
}
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub(crate) enum Header {
|
|
#[error(transparent)]
|
|
Leb128(#[from] parse::leb128::Error),
|
|
#[error("unknown chunk type: {0}")]
|
|
UnknownChunkType(u8),
|
|
#[error("Invalid magic bytes")]
|
|
InvalidMagicBytes,
|
|
}
|
|
}
|
|
|
|
impl<'a> Chunk<'a> {
|
|
pub(crate) fn parse(
|
|
input: parse::Input<'a>,
|
|
) -> parse::ParseResult<'a, Chunk<'a>, error::Chunk> {
|
|
let (i, header) = Header::parse::<error::Chunk>(input)?;
|
|
let parse::Split {
|
|
first: chunk_input,
|
|
remaining,
|
|
} = i.split(header.data_bytes().len());
|
|
tracing::trace!(?header, "parsed chunk header");
|
|
let chunk = match header.chunk_type {
|
|
ChunkType::Change => {
|
|
let (remaining, change) =
|
|
Change::parse_following_header(chunk_input, header).map_err(|e| e.lift())?;
|
|
if !remaining.is_empty() {
|
|
return Err(parse::ParseError::Error(error::Chunk::LeftoverData));
|
|
}
|
|
Chunk::Change(change)
|
|
}
|
|
ChunkType::Document => {
|
|
let (remaining, doc) =
|
|
Document::parse(chunk_input, header).map_err(|e| e.lift())?;
|
|
if !remaining.is_empty() {
|
|
return Err(parse::ParseError::Error(error::Chunk::LeftoverData));
|
|
}
|
|
Chunk::Document(doc)
|
|
}
|
|
ChunkType::Compressed => {
|
|
let compressed = &input.unconsumed_bytes()[header.data_bytes()];
|
|
let mut decoder = flate2::bufread::DeflateDecoder::new(compressed);
|
|
let mut decompressed = Vec::new();
|
|
decoder
|
|
.read_to_end(&mut decompressed)
|
|
.map_err(|_| parse::ParseError::Error(error::Chunk::Deflate))?;
|
|
let inner_header = header.with_data(ChunkType::Change, &decompressed);
|
|
let mut inner_chunk = Vec::with_capacity(inner_header.len() + decompressed.len());
|
|
inner_header.write(&mut inner_chunk);
|
|
inner_chunk.extend(&decompressed);
|
|
let (remaining, change) =
|
|
Change::parse(parse::Input::new(&inner_chunk)).map_err(|e| e.lift())?;
|
|
if !remaining.is_empty() {
|
|
return Err(parse::ParseError::Error(error::Chunk::LeftoverData));
|
|
}
|
|
Chunk::CompressedChange(
|
|
change.into_owned(),
|
|
Compressed::new(header.checksum, Cow::Borrowed(chunk_input.bytes())),
|
|
)
|
|
}
|
|
};
|
|
Ok((remaining, chunk))
|
|
}
|
|
|
|
pub(crate) fn checksum_valid(&self) -> bool {
|
|
match self {
|
|
Self::Document(d) => d.checksum_valid(),
|
|
Self::Change(c) => c.checksum_valid(),
|
|
Self::CompressedChange(change, compressed) => {
|
|
compressed.checksum() == change.checksum() && change.checksum_valid()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug, PartialEq)]
|
|
pub(crate) enum ChunkType {
|
|
Document,
|
|
Change,
|
|
Compressed,
|
|
}
|
|
|
|
impl TryFrom<u8> for ChunkType {
|
|
type Error = u8;
|
|
|
|
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
|
match value {
|
|
0 => Ok(Self::Document),
|
|
1 => Ok(Self::Change),
|
|
2 => Ok(Self::Compressed),
|
|
other => Err(other),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<ChunkType> for u8 {
|
|
fn from(ct: ChunkType) -> Self {
|
|
match ct {
|
|
ChunkType::Document => 0,
|
|
ChunkType::Change => 1,
|
|
ChunkType::Compressed => 2,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug, PartialEq)]
|
|
pub(crate) struct CheckSum([u8; 4]);
|
|
|
|
impl CheckSum {
|
|
pub(crate) fn bytes(&self) -> [u8; 4] {
|
|
self.0
|
|
}
|
|
}
|
|
|
|
impl From<[u8; 4]> for CheckSum {
|
|
fn from(raw: [u8; 4]) -> Self {
|
|
CheckSum(raw)
|
|
}
|
|
}
|
|
|
|
impl AsRef<[u8]> for CheckSum {
|
|
fn as_ref(&self) -> &[u8] {
|
|
&self.0
|
|
}
|
|
}
|
|
|
|
impl From<ChangeHash> for CheckSum {
|
|
fn from(h: ChangeHash) -> Self {
|
|
let bytes = h.as_bytes();
|
|
[bytes[0], bytes[1], bytes[2], bytes[3]].into()
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq)]
|
|
pub(crate) struct Header {
|
|
checksum: CheckSum,
|
|
chunk_type: ChunkType,
|
|
data_len: usize,
|
|
header_size: usize,
|
|
hash: ChangeHash,
|
|
}
|
|
|
|
impl Header {
|
|
pub(crate) fn new(chunk_type: ChunkType, data: &[u8]) -> Self {
|
|
let hash = hash(chunk_type, data);
|
|
Self {
|
|
hash,
|
|
checksum: hash.checksum().into(),
|
|
data_len: data.len(),
|
|
header_size: MAGIC_BYTES.len()
|
|
+ 4 // checksum
|
|
+ 1 // chunk type
|
|
+ (ulebsize(data.len() as u64) as usize),
|
|
chunk_type,
|
|
}
|
|
}
|
|
|
|
/// Returns a header with the same checksum but with a different chunk type and data length.
|
|
/// This is primarily useful when processing compressed chunks, where the checksum is actually
|
|
/// derived from the uncompressed data.
|
|
pub(crate) fn with_data(&self, chunk_type: ChunkType, data: &[u8]) -> Header {
|
|
let hash = hash(chunk_type, data);
|
|
Self {
|
|
hash,
|
|
checksum: self.checksum,
|
|
data_len: data.len(),
|
|
header_size: MAGIC_BYTES.len()
|
|
+ 4 // checksum
|
|
+ 1 // chunk type
|
|
+ (ulebsize(data.len() as u64) as usize),
|
|
chunk_type,
|
|
}
|
|
}
|
|
|
|
pub(crate) fn len(&self) -> usize {
|
|
self.header_size
|
|
}
|
|
|
|
pub(crate) fn write(&self, out: &mut Vec<u8>) {
|
|
out.extend(MAGIC_BYTES);
|
|
out.extend(self.checksum.bytes());
|
|
out.push(u8::from(self.chunk_type));
|
|
leb128::write::unsigned(out, self.data_len as u64).unwrap();
|
|
}
|
|
|
|
pub(crate) fn parse<E>(input: parse::Input<'_>) -> parse::ParseResult<'_, Header, E>
|
|
where
|
|
E: From<error::Header>,
|
|
{
|
|
let (
|
|
i,
|
|
parse::RangeOf {
|
|
range: header,
|
|
value: (checksum_bytes, chunk_type, chunk_len),
|
|
},
|
|
) = parse::range_of(
|
|
|i| {
|
|
let (i, magic) = parse::take4(i)?;
|
|
if magic != MAGIC_BYTES {
|
|
return Err(parse::ParseError::Error(E::from(
|
|
error::Header::InvalidMagicBytes,
|
|
)));
|
|
}
|
|
let (i, checksum_bytes) = parse::take4(i)?;
|
|
let (i, raw_chunk_type) = parse::take1(i)?;
|
|
let chunk_type: ChunkType = raw_chunk_type.try_into().map_err(|_| {
|
|
parse::ParseError::Error(E::from(error::Header::UnknownChunkType(
|
|
raw_chunk_type,
|
|
)))
|
|
})?;
|
|
let (i, chunk_len) = parse::leb128_u64(i).map_err(|e| e.lift())?;
|
|
Ok((i, (checksum_bytes, chunk_type, chunk_len)))
|
|
},
|
|
input,
|
|
)?;
|
|
|
|
let (_, data) = parse::take_n(chunk_len as usize, i)?;
|
|
let hash = hash(chunk_type, data);
|
|
Ok((
|
|
i,
|
|
Header {
|
|
checksum: checksum_bytes.into(),
|
|
chunk_type,
|
|
data_len: data.len() as usize,
|
|
header_size: header.len(),
|
|
hash,
|
|
},
|
|
))
|
|
}
|
|
|
|
/// The range of the input which corresponds to the data specified by this header
|
|
pub(crate) fn data_bytes(&self) -> Range<usize> {
|
|
self.header_size..(self.header_size + self.data_len)
|
|
}
|
|
|
|
pub(crate) fn hash(&self) -> ChangeHash {
|
|
self.hash
|
|
}
|
|
|
|
pub(crate) fn checksum_valid(&self) -> bool {
|
|
CheckSum(self.hash.checksum()) == self.checksum
|
|
}
|
|
|
|
pub(crate) fn checksum(&self) -> CheckSum {
|
|
self.checksum
|
|
}
|
|
}
|
|
|
|
fn hash(typ: ChunkType, data: &[u8]) -> ChangeHash {
|
|
let mut out = vec![u8::from(typ)];
|
|
leb128::write::unsigned(&mut out, data.len() as u64).unwrap();
|
|
out.extend(data.as_ref());
|
|
let hash_result = Sha256::digest(out);
|
|
let array: [u8; 32] = hash_result.into();
|
|
ChangeHash(array)
|
|
}
|