Compare commits

...

2 commits

Author SHA1 Message Date
Alex Good
84b8cffbfa
Add an associated error type to Direction 2022-12-14 11:13:42 +00:00
Conrad Irwin
09df2867e1 Don't panic on invalid gzip stream
Before this change automerge-rs would panic if the gzip data in
a raw column was invalid; after this change the error is propagated
to the caller correctly.

Fixes #476
2022-12-13 22:18:01 -07:00
4 changed files with 84 additions and 45 deletions

View file

@ -73,15 +73,19 @@ impl<T: compression::ColumnCompression> RawColumn<T> {
} }
} }
fn decompress(&self, input: &[u8], out: &mut Vec<u8>) -> (ColumnSpec, usize) { fn decompress(
&self,
input: &[u8],
out: &mut Vec<u8>,
) -> Result<(ColumnSpec, usize), ParseError> {
let len = if self.spec.deflate() { let len = if self.spec.deflate() {
let mut inflater = flate2::bufread::DeflateDecoder::new(&input[self.data.clone()]); let mut inflater = flate2::bufread::DeflateDecoder::new(&input[self.data.clone()]);
inflater.read_to_end(out).unwrap() inflater.read_to_end(out).map_err(ParseError::Deflate)?
} else { } else {
out.extend(&input[self.data.clone()]); out.extend(&input[self.data.clone()]);
self.data.len() self.data.len()
}; };
(self.spec.inflated(), len) Ok((self.spec.inflated(), len))
} }
} }
@ -140,7 +144,7 @@ impl<T: compression::ColumnCompression> RawColumns<T> {
&self, &self,
input: &[u8], input: &[u8],
out: &mut Vec<u8>, out: &mut Vec<u8>,
) -> RawColumns<compression::Uncompressed> { ) -> Result<RawColumns<compression::Uncompressed>, ParseError> {
let mut result = Vec::with_capacity(self.0.len()); let mut result = Vec::with_capacity(self.0.len());
let mut start = 0; let mut start = 0;
for col in &self.0 { for col in &self.0 {
@ -148,7 +152,7 @@ impl<T: compression::ColumnCompression> RawColumns<T> {
out.extend(&input[decomp.data.clone()]); out.extend(&input[decomp.data.clone()]);
(decomp.spec, decomp.data.len()) (decomp.spec, decomp.data.len())
} else { } else {
col.decompress(input, out) col.decompress(input, out)?
}; };
result.push(RawColumn { result.push(RawColumn {
spec, spec,
@ -157,7 +161,7 @@ impl<T: compression::ColumnCompression> RawColumns<T> {
}); });
start += len; start += len;
} }
RawColumns(result) Ok(RawColumns(result))
} }
} }
@ -193,6 +197,8 @@ pub(crate) enum ParseError {
NotInNormalOrder, NotInNormalOrder,
#[error(transparent)] #[error(transparent)]
Leb128(#[from] parse::leb128::Error), Leb128(#[from] parse::leb128::Error),
#[error(transparent)]
Deflate(#[from] std::io::Error),
} }
impl RawColumns<compression::Unknown> { impl RawColumns<compression::Unknown> {

View file

@ -173,7 +173,8 @@ impl<'a> Document<'a> {
raw_columns: ops_meta, raw_columns: ops_meta,
}, },
extra_args: (), extra_args: (),
}); })
.map_err(|e| parse::ParseError::Error(ParseError::RawColumns(e)))?;
let ops_layout = Columns::parse(op_bytes.len(), ops.iter()).map_err(|e| { let ops_layout = Columns::parse(op_bytes.len(), ops.iter()).map_err(|e| {
parse::ParseError::Error(ParseError::BadColumnLayout { parse::ParseError::Error(ParseError::BadColumnLayout {

View file

@ -1,6 +1,9 @@
use std::{borrow::Cow, ops::Range}; use std::{borrow::Cow, convert::Infallible, ops::Range};
use crate::storage::{columns::compression, shift_range, ChunkType, Header, RawColumns}; use crate::storage::{
columns::{compression, raw_column},
shift_range, ChunkType, Header, RawColumns,
};
pub(super) struct Args<'a, T: compression::ColumnCompression, DirArgs> { pub(super) struct Args<'a, T: compression::ColumnCompression, DirArgs> {
/// The original data of the entire document chunk (compressed or uncompressed) /// The original data of the entire document chunk (compressed or uncompressed)
@ -23,40 +26,50 @@ pub(super) struct CompressArgs {
} }
/// Compress a document chunk returning the compressed bytes /// Compress a document chunk returning the compressed bytes
pub(super) fn compress<'a>(args: Args<'a, compression::Uncompressed, CompressArgs>) -> Vec<u8> { pub(super) fn compress(args: Args<'_, compression::Uncompressed, CompressArgs>) -> Vec<u8> {
let header_len = args.extra_args.original_header_len; let header_len = args.extra_args.original_header_len;
let threshold = args.extra_args.threshold; let threshold = args.extra_args.threshold;
Compression::<'a, Compressing, _>::new( // Wrap in a closure so we can use `?` in the construction but still force the compiler
// to check that the error type is `Infallible`
let result: Result<_, Infallible> = (|| {
Ok(Compression::<Compressing, _>::new(
args, args,
Compressing { Compressing {
threshold, threshold,
header_len, header_len,
}, },
) )
.changes() .changes()?
.ops() .ops()?
.write_data() .write_data()
.finish() .finish())
})();
// We just checked the error is `Infallible` so unwrap is fine
result.unwrap()
} }
pub(super) fn decompress<'a>(args: Args<'a, compression::Unknown, ()>) -> Decompressed<'a> { pub(super) fn decompress<'a>(
args: Args<'a, compression::Unknown, ()>,
) -> Result<Decompressed<'a>, raw_column::ParseError> {
match ( match (
args.changes.raw_columns.uncompressed(), args.changes.raw_columns.uncompressed(),
args.ops.raw_columns.uncompressed(), args.ops.raw_columns.uncompressed(),
) { ) {
(Some(changes), Some(ops)) => Decompressed { (Some(changes), Some(ops)) => Ok(Decompressed {
changes, changes,
ops, ops,
compressed: None, compressed: None,
uncompressed: args.original, uncompressed: args.original,
change_bytes: args.changes.data, change_bytes: args.changes.data,
op_bytes: args.ops.data, op_bytes: args.ops.data,
}, }),
_ => Compression::<'a, Decompressing, _>::new(args, Decompressing) _ => Ok(
.changes() Compression::<'a, Decompressing, _>::new(args, Decompressing)
.ops() .changes()?
.ops()?
.write_data() .write_data()
.finish(), .finish(),
),
} }
} }
@ -94,6 +107,7 @@ pub(super) struct Cols<T: compression::ColumnCompression> {
trait Direction: std::fmt::Debug { trait Direction: std::fmt::Debug {
type Out: compression::ColumnCompression; type Out: compression::ColumnCompression;
type In: compression::ColumnCompression; type In: compression::ColumnCompression;
type Error;
type Args; type Args;
/// This method represents the (de)compression process for a direction. The arguments are: /// This method represents the (de)compression process for a direction. The arguments are:
@ -108,7 +122,7 @@ trait Direction: std::fmt::Debug {
input: &[u8], input: &[u8],
out: &mut Vec<u8>, out: &mut Vec<u8>,
meta_out: &mut Vec<u8>, meta_out: &mut Vec<u8>,
) -> Cols<Self::Out>; ) -> Result<Cols<Self::Out>, Self::Error>;
} }
#[derive(Debug)] #[derive(Debug)]
struct Compressing { struct Compressing {
@ -117,6 +131,7 @@ struct Compressing {
} }
impl Direction for Compressing { impl Direction for Compressing {
type Error = Infallible;
type Out = compression::Unknown; type Out = compression::Unknown;
type In = compression::Uncompressed; type In = compression::Uncompressed;
type Args = CompressArgs; type Args = CompressArgs;
@ -127,16 +142,16 @@ impl Direction for Compressing {
input: &[u8], input: &[u8],
out: &mut Vec<u8>, out: &mut Vec<u8>,
meta_out: &mut Vec<u8>, meta_out: &mut Vec<u8>,
) -> Cols<Self::Out> { ) -> Result<Cols<Self::Out>, Self::Error> {
let start = out.len(); let start = out.len();
let raw_columns = cols let raw_columns = cols
.raw_columns .raw_columns
.compress(&input[cols.data.clone()], out, self.threshold); .compress(&input[cols.data.clone()], out, self.threshold);
raw_columns.write(meta_out); raw_columns.write(meta_out);
Cols { Ok(Cols {
data: start..out.len(), data: start..out.len(),
raw_columns, raw_columns,
} })
} }
} }
@ -144,6 +159,7 @@ impl Direction for Compressing {
struct Decompressing; struct Decompressing;
impl Direction for Decompressing { impl Direction for Decompressing {
type Error = raw_column::ParseError;
type Out = compression::Uncompressed; type Out = compression::Uncompressed;
type In = compression::Unknown; type In = compression::Unknown;
type Args = (); type Args = ();
@ -154,14 +170,16 @@ impl Direction for Decompressing {
input: &[u8], input: &[u8],
out: &mut Vec<u8>, out: &mut Vec<u8>,
meta_out: &mut Vec<u8>, meta_out: &mut Vec<u8>,
) -> Cols<Self::Out> { ) -> Result<Cols<Self::Out>, raw_column::ParseError> {
let start = out.len(); let start = out.len();
let raw_columns = cols.raw_columns.uncompress(&input[cols.data.clone()], out); let raw_columns = cols
.raw_columns
.uncompress(&input[cols.data.clone()], out)?;
raw_columns.write(meta_out); raw_columns.write(meta_out);
Cols { Ok(Cols {
data: start..out.len(), data: start..out.len(),
raw_columns, raw_columns,
} })
} }
} }
@ -233,7 +251,7 @@ impl<'a, D: Direction> Compression<'a, D, Starting> {
} }
impl<'a, D: Direction> Compression<'a, D, Starting> { impl<'a, D: Direction> Compression<'a, D, Starting> {
fn changes(self) -> Compression<'a, D, Changes<D>> { fn changes(self) -> Result<Compression<'a, D, Changes<D>>, D::Error> {
let Starting { let Starting {
mut data_out, mut data_out,
mut meta_out, mut meta_out,
@ -243,8 +261,8 @@ impl<'a, D: Direction> Compression<'a, D, Starting> {
&self.args.original, &self.args.original,
&mut data_out, &mut data_out,
&mut meta_out, &mut meta_out,
); )?;
Compression { Ok(Compression {
args: self.args, args: self.args,
direction: self.direction, direction: self.direction,
state: Changes { state: Changes {
@ -252,12 +270,12 @@ impl<'a, D: Direction> Compression<'a, D, Starting> {
meta_out, meta_out,
data_out, data_out,
}, },
} })
} }
} }
impl<'a, D: Direction> Compression<'a, D, Changes<D>> { impl<'a, D: Direction> Compression<'a, D, Changes<D>> {
fn ops(self) -> Compression<'a, D, ChangesAndOps<D>> { fn ops(self) -> Result<Compression<'a, D, ChangesAndOps<D>>, D::Error> {
let Changes { let Changes {
change_cols, change_cols,
mut meta_out, mut meta_out,
@ -268,8 +286,8 @@ impl<'a, D: Direction> Compression<'a, D, Changes<D>> {
&self.args.original, &self.args.original,
&mut data_out, &mut data_out,
&mut meta_out, &mut meta_out,
); )?;
Compression { Ok(Compression {
args: self.args, args: self.args,
direction: self.direction, direction: self.direction,
state: ChangesAndOps { state: ChangesAndOps {
@ -278,7 +296,7 @@ impl<'a, D: Direction> Compression<'a, D, Changes<D>> {
meta_out, meta_out,
data_out, data_out,
}, },
} })
} }
} }

View file

@ -1397,3 +1397,17 @@ fn ops_on_wrong_objets() -> Result<(), AutomergeError> {
assert_eq!(e6, Err(AutomergeError::InvalidOp(ObjType::Text))); assert_eq!(e6, Err(AutomergeError::InvalidOp(ObjType::Text)));
Ok(()) Ok(())
} }
#[test]
fn invalid_deflate_stream() {
let bytes: [u8; 123] = [
133, 111, 74, 131, 48, 48, 48, 48, 0, 113, 1, 16, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
48, 48, 48, 48, 48, 48, 1, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 6, 1, 2, 3, 2, 32, 2, 48,
2, 49, 2, 49, 2, 8, 32, 4, 33, 2, 48, 2, 49, 1, 49, 2, 57, 2, 87, 3, 128, 1, 2, 127, 0,
127, 1, 127, 1, 127, 0, 127, 0, 127, 7, 127, 2, 102, 122, 127, 0, 127, 1, 1, 127, 1, 127,
54, 239, 191, 189, 127, 0, 0,
];
assert!(Automerge::load(&bytes).is_err());
}