From 09df2867e1239bd720ae844374ff795942659dd7 Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Tue, 13 Dec 2022 22:02:40 -0700 Subject: [PATCH 1/2] Don't panic on invalid gzip stream Before this change automerge-rs would panic if the gzip data in a raw column was invalid; after this change the error is propagated to the caller correctly. Fixes #476 --- .../src/storage/columns/raw_column.rs | 18 +++-- rust/automerge/src/storage/document.rs | 40 ++++++----- .../src/storage/document/compression.rs | 71 +++++++++++-------- rust/automerge/tests/test.rs | 14 ++++ 4 files changed, 89 insertions(+), 54 deletions(-) diff --git a/rust/automerge/src/storage/columns/raw_column.rs b/rust/automerge/src/storage/columns/raw_column.rs index 053c3c75..808b53cf 100644 --- a/rust/automerge/src/storage/columns/raw_column.rs +++ b/rust/automerge/src/storage/columns/raw_column.rs @@ -73,15 +73,19 @@ impl RawColumn { } } - fn decompress(&self, input: &[u8], out: &mut Vec) -> (ColumnSpec, usize) { + fn decompress( + &self, + input: &[u8], + out: &mut Vec, + ) -> Result<(ColumnSpec, usize), ParseError> { let len = if self.spec.deflate() { let mut inflater = flate2::bufread::DeflateDecoder::new(&input[self.data.clone()]); - inflater.read_to_end(out).unwrap() + inflater.read_to_end(out).map_err(ParseError::Deflate)? } else { out.extend(&input[self.data.clone()]); self.data.len() }; - (self.spec.inflated(), len) + Ok((self.spec.inflated(), len)) } } @@ -140,7 +144,7 @@ impl RawColumns { &self, input: &[u8], out: &mut Vec, - ) -> RawColumns { + ) -> Result, ParseError> { let mut result = Vec::with_capacity(self.0.len()); let mut start = 0; for col in &self.0 { @@ -148,7 +152,7 @@ impl RawColumns { out.extend(&input[decomp.data.clone()]); (decomp.spec, decomp.data.len()) } else { - col.decompress(input, out) + col.decompress(input, out)? }; result.push(RawColumn { spec, @@ -157,7 +161,7 @@ impl RawColumns { }); start += len; } - RawColumns(result) + Ok(RawColumns(result)) } } @@ -193,6 +197,8 @@ pub(crate) enum ParseError { NotInNormalOrder, #[error(transparent)] Leb128(#[from] parse::leb128::Error), + #[error(transparent)] + Deflate(#[from] std::io::Error), } impl RawColumns { diff --git a/rust/automerge/src/storage/document.rs b/rust/automerge/src/storage/document.rs index 500fbe85..a3993b97 100644 --- a/rust/automerge/src/storage/document.rs +++ b/rust/automerge/src/storage/document.rs @@ -173,7 +173,8 @@ impl<'a> Document<'a> { raw_columns: ops_meta, }, extra_args: (), - }); + }) + .map_err(|e| parse::ParseError::Error(ParseError::RawColumns(e)))?; let ops_layout = Columns::parse(op_bytes.len(), ops.iter()).map_err(|e| { parse::ParseError::Error(ParseError::BadColumnLayout { @@ -271,23 +272,26 @@ impl<'a> Document<'a> { let change_bytes = shift_range(change_start..change_end, header.len()); let compressed_bytes = if let CompressConfig::Threshold(threshold) = compress { - let compressed = Cow::Owned(compression::compress(compression::Args { - prefix: prefix_len + header.len(), - suffix: suffix_start + header.len(), - ops: compression::Cols { - raw_columns: ops_meta.raw_columns(), - data: op_bytes.clone(), - }, - changes: compression::Cols { - raw_columns: change_meta.raw_columns(), - data: change_bytes.clone(), - }, - original: Cow::Borrowed(&bytes), - extra_args: compression::CompressArgs { - threshold, - original_header_len: header_len, - }, - })); + let compressed = Cow::Owned( + compression::compress(compression::Args { + prefix: prefix_len + header.len(), + suffix: suffix_start + header.len(), + ops: compression::Cols { + raw_columns: ops_meta.raw_columns(), + data: op_bytes.clone(), + }, + changes: compression::Cols { + raw_columns: change_meta.raw_columns(), + data: change_bytes.clone(), + }, + original: Cow::Borrowed(&bytes), + extra_args: compression::CompressArgs { + threshold, + original_header_len: header_len, + }, + }) + .unwrap(), // unwrap should be ok, error is only returned from decompress() + ); Some(compressed) } else { None diff --git a/rust/automerge/src/storage/document/compression.rs b/rust/automerge/src/storage/document/compression.rs index f7daa127..dfaff0df 100644 --- a/rust/automerge/src/storage/document/compression.rs +++ b/rust/automerge/src/storage/document/compression.rs @@ -1,6 +1,9 @@ use std::{borrow::Cow, ops::Range}; -use crate::storage::{columns::compression, shift_range, ChunkType, Header, RawColumns}; +use crate::storage::{ + columns::{compression, raw_column}, + shift_range, ChunkType, Header, RawColumns, +}; pub(super) struct Args<'a, T: compression::ColumnCompression, DirArgs> { /// The original data of the entire document chunk (compressed or uncompressed) @@ -23,40 +26,46 @@ pub(super) struct CompressArgs { } /// Compress a document chunk returning the compressed bytes -pub(super) fn compress<'a>(args: Args<'a, compression::Uncompressed, CompressArgs>) -> Vec { +pub(super) fn compress<'a>( + args: Args<'a, compression::Uncompressed, CompressArgs>, +) -> Result, raw_column::ParseError> { let header_len = args.extra_args.original_header_len; let threshold = args.extra_args.threshold; - Compression::<'a, Compressing, _>::new( + Ok(Compression::<'a, Compressing, _>::new( args, Compressing { threshold, header_len, }, ) - .changes() - .ops() + .changes()? + .ops()? .write_data() - .finish() + .finish()) } -pub(super) fn decompress<'a>(args: Args<'a, compression::Unknown, ()>) -> Decompressed<'a> { +pub(super) fn decompress<'a>( + args: Args<'a, compression::Unknown, ()>, +) -> Result, raw_column::ParseError> { match ( args.changes.raw_columns.uncompressed(), args.ops.raw_columns.uncompressed(), ) { - (Some(changes), Some(ops)) => Decompressed { + (Some(changes), Some(ops)) => Ok(Decompressed { changes, ops, compressed: None, uncompressed: args.original, change_bytes: args.changes.data, op_bytes: args.ops.data, - }, - _ => Compression::<'a, Decompressing, _>::new(args, Decompressing) - .changes() - .ops() - .write_data() - .finish(), + }), + _ => Ok( + Compression::<'a, Decompressing, _>::new(args, Decompressing) + .changes()? + .ops()? + .write_data() + .finish(), + ), } } @@ -108,7 +117,7 @@ trait Direction: std::fmt::Debug { input: &[u8], out: &mut Vec, meta_out: &mut Vec, - ) -> Cols; + ) -> Result, raw_column::ParseError>; } #[derive(Debug)] struct Compressing { @@ -127,16 +136,16 @@ impl Direction for Compressing { input: &[u8], out: &mut Vec, meta_out: &mut Vec, - ) -> Cols { + ) -> Result, raw_column::ParseError> { let start = out.len(); let raw_columns = cols .raw_columns .compress(&input[cols.data.clone()], out, self.threshold); raw_columns.write(meta_out); - Cols { + Ok(Cols { data: start..out.len(), raw_columns, - } + }) } } @@ -154,14 +163,16 @@ impl Direction for Decompressing { input: &[u8], out: &mut Vec, meta_out: &mut Vec, - ) -> Cols { + ) -> Result, raw_column::ParseError> { let start = out.len(); - let raw_columns = cols.raw_columns.uncompress(&input[cols.data.clone()], out); + let raw_columns = cols + .raw_columns + .uncompress(&input[cols.data.clone()], out)?; raw_columns.write(meta_out); - Cols { + Ok(Cols { data: start..out.len(), raw_columns, - } + }) } } @@ -233,7 +244,7 @@ impl<'a, D: Direction> Compression<'a, D, Starting> { } impl<'a, D: Direction> Compression<'a, D, Starting> { - fn changes(self) -> Compression<'a, D, Changes> { + fn changes(self) -> Result>, raw_column::ParseError> { let Starting { mut data_out, mut meta_out, @@ -243,8 +254,8 @@ impl<'a, D: Direction> Compression<'a, D, Starting> { &self.args.original, &mut data_out, &mut meta_out, - ); - Compression { + )?; + Ok(Compression { args: self.args, direction: self.direction, state: Changes { @@ -252,12 +263,12 @@ impl<'a, D: Direction> Compression<'a, D, Starting> { meta_out, data_out, }, - } + }) } } impl<'a, D: Direction> Compression<'a, D, Changes> { - fn ops(self) -> Compression<'a, D, ChangesAndOps> { + fn ops(self) -> Result>, raw_column::ParseError> { let Changes { change_cols, mut meta_out, @@ -268,8 +279,8 @@ impl<'a, D: Direction> Compression<'a, D, Changes> { &self.args.original, &mut data_out, &mut meta_out, - ); - Compression { + )?; + Ok(Compression { args: self.args, direction: self.direction, state: ChangesAndOps { @@ -278,7 +289,7 @@ impl<'a, D: Direction> Compression<'a, D, Changes> { meta_out, data_out, }, - } + }) } } diff --git a/rust/automerge/tests/test.rs b/rust/automerge/tests/test.rs index 876acb74..c1b653d3 100644 --- a/rust/automerge/tests/test.rs +++ b/rust/automerge/tests/test.rs @@ -1397,3 +1397,17 @@ fn ops_on_wrong_objets() -> Result<(), AutomergeError> { assert_eq!(e6, Err(AutomergeError::InvalidOp(ObjType::Text))); Ok(()) } + +#[test] +fn invalid_deflate_stream() { + let bytes: [u8; 123] = [ + 133, 111, 74, 131, 48, 48, 48, 48, 0, 113, 1, 16, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 1, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 6, 1, 2, 3, 2, 32, 2, 48, + 2, 49, 2, 49, 2, 8, 32, 4, 33, 2, 48, 2, 49, 1, 49, 2, 57, 2, 87, 3, 128, 1, 2, 127, 0, + 127, 1, 127, 1, 127, 0, 127, 0, 127, 7, 127, 2, 102, 122, 127, 0, 127, 1, 1, 127, 1, 127, + 54, 239, 191, 189, 127, 0, 0, + ]; + + assert!(Automerge::load(&bytes).is_err()); +} From 84b8cffbfa50976ce925b01c53b6de2563bb1900 Mon Sep 17 00:00:00 2001 From: Alex Good Date: Wed, 14 Dec 2022 11:13:42 +0000 Subject: [PATCH 2/2] Add an associated error type to Direction --- rust/automerge/src/storage/document.rs | 37 +++++++-------- .../src/storage/document/compression.rs | 45 +++++++++++-------- 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/rust/automerge/src/storage/document.rs b/rust/automerge/src/storage/document.rs index a3993b97..ecef0bfd 100644 --- a/rust/automerge/src/storage/document.rs +++ b/rust/automerge/src/storage/document.rs @@ -272,26 +272,23 @@ impl<'a> Document<'a> { let change_bytes = shift_range(change_start..change_end, header.len()); let compressed_bytes = if let CompressConfig::Threshold(threshold) = compress { - let compressed = Cow::Owned( - compression::compress(compression::Args { - prefix: prefix_len + header.len(), - suffix: suffix_start + header.len(), - ops: compression::Cols { - raw_columns: ops_meta.raw_columns(), - data: op_bytes.clone(), - }, - changes: compression::Cols { - raw_columns: change_meta.raw_columns(), - data: change_bytes.clone(), - }, - original: Cow::Borrowed(&bytes), - extra_args: compression::CompressArgs { - threshold, - original_header_len: header_len, - }, - }) - .unwrap(), // unwrap should be ok, error is only returned from decompress() - ); + let compressed = Cow::Owned(compression::compress(compression::Args { + prefix: prefix_len + header.len(), + suffix: suffix_start + header.len(), + ops: compression::Cols { + raw_columns: ops_meta.raw_columns(), + data: op_bytes.clone(), + }, + changes: compression::Cols { + raw_columns: change_meta.raw_columns(), + data: change_bytes.clone(), + }, + original: Cow::Borrowed(&bytes), + extra_args: compression::CompressArgs { + threshold, + original_header_len: header_len, + }, + })); Some(compressed) } else { None diff --git a/rust/automerge/src/storage/document/compression.rs b/rust/automerge/src/storage/document/compression.rs index dfaff0df..2f0e96ce 100644 --- a/rust/automerge/src/storage/document/compression.rs +++ b/rust/automerge/src/storage/document/compression.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, ops::Range}; +use std::{borrow::Cow, convert::Infallible, ops::Range}; use crate::storage::{ columns::{compression, raw_column}, @@ -26,22 +26,26 @@ pub(super) struct CompressArgs { } /// Compress a document chunk returning the compressed bytes -pub(super) fn compress<'a>( - args: Args<'a, compression::Uncompressed, CompressArgs>, -) -> Result, raw_column::ParseError> { +pub(super) fn compress(args: Args<'_, compression::Uncompressed, CompressArgs>) -> Vec { let header_len = args.extra_args.original_header_len; let threshold = args.extra_args.threshold; - Ok(Compression::<'a, Compressing, _>::new( - args, - Compressing { - threshold, - header_len, - }, - ) - .changes()? - .ops()? - .write_data() - .finish()) + // Wrap in a closure so we can use `?` in the construction but still force the compiler + // to check that the error type is `Infallible` + let result: Result<_, Infallible> = (|| { + Ok(Compression::::new( + args, + Compressing { + threshold, + header_len, + }, + ) + .changes()? + .ops()? + .write_data() + .finish()) + })(); + // We just checked the error is `Infallible` so unwrap is fine + result.unwrap() } pub(super) fn decompress<'a>( @@ -103,6 +107,7 @@ pub(super) struct Cols { trait Direction: std::fmt::Debug { type Out: compression::ColumnCompression; type In: compression::ColumnCompression; + type Error; type Args; /// This method represents the (de)compression process for a direction. The arguments are: @@ -117,7 +122,7 @@ trait Direction: std::fmt::Debug { input: &[u8], out: &mut Vec, meta_out: &mut Vec, - ) -> Result, raw_column::ParseError>; + ) -> Result, Self::Error>; } #[derive(Debug)] struct Compressing { @@ -126,6 +131,7 @@ struct Compressing { } impl Direction for Compressing { + type Error = Infallible; type Out = compression::Unknown; type In = compression::Uncompressed; type Args = CompressArgs; @@ -136,7 +142,7 @@ impl Direction for Compressing { input: &[u8], out: &mut Vec, meta_out: &mut Vec, - ) -> Result, raw_column::ParseError> { + ) -> Result, Self::Error> { let start = out.len(); let raw_columns = cols .raw_columns @@ -153,6 +159,7 @@ impl Direction for Compressing { struct Decompressing; impl Direction for Decompressing { + type Error = raw_column::ParseError; type Out = compression::Uncompressed; type In = compression::Unknown; type Args = (); @@ -244,7 +251,7 @@ impl<'a, D: Direction> Compression<'a, D, Starting> { } impl<'a, D: Direction> Compression<'a, D, Starting> { - fn changes(self) -> Result>, raw_column::ParseError> { + fn changes(self) -> Result>, D::Error> { let Starting { mut data_out, mut meta_out, @@ -268,7 +275,7 @@ impl<'a, D: Direction> Compression<'a, D, Starting> { } impl<'a, D: Direction> Compression<'a, D, Changes> { - fn ops(self) -> Result>, raw_column::ParseError> { + fn ops(self) -> Result>, D::Error> { let Changes { change_cols, mut meta_out,