After some discussion with PVH I realised that the repo structure in the last reorg was very Rust-centric. In an attempt to put each language on a level footing, move the Rust code and project files into `./rust`.
338 lines
10 KiB
Rust
338 lines
10 KiB
Rust
use std::{borrow::Cow, ops::Range};
|
|
|
|
use crate::storage::{columns::compression, shift_range, ChunkType, Header, RawColumns};
|
|
|
|
pub(super) struct Args<'a, T: compression::ColumnCompression, DirArgs> {
|
|
/// The original data of the entire document chunk (compressed or uncompressed)
|
|
pub(super) original: Cow<'a, [u8]>,
|
|
/// The number of bytes in the original before the beginning of the change column metadata
|
|
pub(super) prefix: usize,
|
|
/// The offset in the original after the end of the ops column data
|
|
pub(super) suffix: usize,
|
|
/// The column data for the changes
|
|
pub(super) changes: Cols<T>,
|
|
/// The column data for the ops
|
|
pub(super) ops: Cols<T>,
|
|
/// Additional arguments specific to the direction (compression or uncompression)
|
|
pub(super) extra_args: DirArgs,
|
|
}
|
|
|
|
/// Parameters that are only needed when compressing a document chunk.
pub(super) struct CompressArgs {
    /// Threshold handed to the column compression routine
    /// (presumably columns smaller than this stay uncompressed — TODO confirm).
    pub(super) threshold: usize,
    /// Length in bytes of the header at the start of the original chunk. These bytes are
    /// dropped from the processed output and replaced by a freshly computed header.
    pub(super) original_header_len: usize,
}
|
|
|
|
/// Compress a document chunk returning the compressed bytes
|
|
pub(super) fn compress<'a>(args: Args<'a, compression::Uncompressed, CompressArgs>) -> Vec<u8> {
|
|
let header_len = args.extra_args.original_header_len;
|
|
let threshold = args.extra_args.threshold;
|
|
Compression::<'a, Compressing, _>::new(
|
|
args,
|
|
Compressing {
|
|
threshold,
|
|
header_len,
|
|
},
|
|
)
|
|
.changes()
|
|
.ops()
|
|
.write_data()
|
|
.finish()
|
|
}
|
|
|
|
pub(super) fn decompress<'a>(args: Args<'a, compression::Unknown, ()>) -> Decompressed<'a> {
|
|
match (
|
|
args.changes.raw_columns.uncompressed(),
|
|
args.ops.raw_columns.uncompressed(),
|
|
) {
|
|
(Some(changes), Some(ops)) => Decompressed {
|
|
changes,
|
|
ops,
|
|
compressed: None,
|
|
uncompressed: args.original,
|
|
change_bytes: args.changes.data,
|
|
op_bytes: args.ops.data,
|
|
},
|
|
_ => Compression::<'a, Decompressing, _>::new(args, Decompressing)
|
|
.changes()
|
|
.ops()
|
|
.write_data()
|
|
.finish(),
|
|
}
|
|
}
|
|
|
|
pub(super) struct Decompressed<'a> {
|
|
/// The original compressed data, if there was any
|
|
pub(super) compressed: Option<Cow<'a, [u8]>>,
|
|
/// The final uncompressed data
|
|
pub(super) uncompressed: Cow<'a, [u8]>,
|
|
/// The ops column metadata
|
|
pub(super) ops: RawColumns<compression::Uncompressed>,
|
|
/// The change column metadata
|
|
pub(super) changes: RawColumns<compression::Uncompressed>,
|
|
/// The location of the change column data in the uncompressed data
|
|
pub(super) change_bytes: Range<usize>,
|
|
/// The location of the op column data in the uncompressed data
|
|
pub(super) op_bytes: Range<usize>,
|
|
}
|
|
|
|
struct Compression<'a, D: Direction, S: CompressionState> {
|
|
args: Args<'a, D::In, D::Args>,
|
|
state: S,
|
|
direction: D,
|
|
}
|
|
|
|
/// Some columns in the original data
|
|
pub(super) struct Cols<T: compression::ColumnCompression> {
|
|
/// The metadata for these columns
|
|
pub(super) raw_columns: RawColumns<T>,
|
|
/// The location in the original chunk of the data for these columns
|
|
pub(super) data: Range<usize>,
|
|
}
|
|
|
|
// Compression and decompression involve almost the same steps in either direction. This trait
|
|
// encapsulates that.
|
|
trait Direction: std::fmt::Debug {
|
|
type Out: compression::ColumnCompression;
|
|
type In: compression::ColumnCompression;
|
|
type Args;
|
|
|
|
/// This method represents the (de)compression process for a direction. The arguments are:
|
|
///
|
|
/// * cols - The columns we are processing
|
|
/// * input - the entire document chunk
|
|
/// * out - the vector to place the processed columns in
|
|
/// * meta_out - the vector to place processed column metadata in
|
|
fn process(
|
|
&self,
|
|
cols: &Cols<Self::In>,
|
|
input: &[u8],
|
|
out: &mut Vec<u8>,
|
|
meta_out: &mut Vec<u8>,
|
|
) -> Cols<Self::Out>;
|
|
}
|
|
#[derive(Debug)]
|
|
struct Compressing {
|
|
threshold: usize,
|
|
header_len: usize,
|
|
}
|
|
|
|
impl Direction for Compressing {
|
|
type Out = compression::Unknown;
|
|
type In = compression::Uncompressed;
|
|
type Args = CompressArgs;
|
|
|
|
fn process(
|
|
&self,
|
|
cols: &Cols<Self::In>,
|
|
input: &[u8],
|
|
out: &mut Vec<u8>,
|
|
meta_out: &mut Vec<u8>,
|
|
) -> Cols<Self::Out> {
|
|
let start = out.len();
|
|
let raw_columns = cols
|
|
.raw_columns
|
|
.compress(&input[cols.data.clone()], out, self.threshold);
|
|
raw_columns.write(meta_out);
|
|
Cols {
|
|
data: start..out.len(),
|
|
raw_columns,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
struct Decompressing;
|
|
|
|
impl Direction for Decompressing {
|
|
type Out = compression::Uncompressed;
|
|
type In = compression::Unknown;
|
|
type Args = ();
|
|
|
|
fn process(
|
|
&self,
|
|
cols: &Cols<Self::In>,
|
|
input: &[u8],
|
|
out: &mut Vec<u8>,
|
|
meta_out: &mut Vec<u8>,
|
|
) -> Cols<Self::Out> {
|
|
let start = out.len();
|
|
let raw_columns = cols.raw_columns.uncompress(&input[cols.data.clone()], out);
|
|
raw_columns.write(meta_out);
|
|
Cols {
|
|
data: start..out.len(),
|
|
raw_columns,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Somewhat absurdly I (alex) kept getting the order of writing ops and changes wrong as well as
|
|
// the order that column metadata vs data should be written in. This is a type state to get the
|
|
// compiler to enforce that things are done in the right order.
|
|
trait CompressionState {}
|
|
impl CompressionState for Starting {}
|
|
impl<D: Direction> CompressionState for Changes<D> {}
|
|
impl<D: Direction> CompressionState for ChangesAndOps<D> {}
|
|
impl<D: Direction> CompressionState for Finished<D> {}
|
|
|
|
/// Initial state: no columns have been processed yet.
struct Starting {
    /// Buffer that processed column data will be appended to.
    data_out: Vec<u8>,
    /// Buffer that processed column metadata will be appended to (seeded with the prefix).
    meta_out: Vec<u8>,
}
|
|
|
|
/// We've processed the changes columns
|
|
struct Changes<D: Direction> {
|
|
/// The `Cols` for the processed change columns
|
|
change_cols: Cols<D::Out>,
|
|
/// The vector to write column metadata to
|
|
meta_out: Vec<u8>,
|
|
/// The vector to write column data to
|
|
data_out: Vec<u8>,
|
|
}
|
|
|
|
/// We've processed the ops columns
|
|
struct ChangesAndOps<D: Direction> {
|
|
/// The `Cols` for the processed change columns
|
|
change_cols: Cols<D::Out>,
|
|
/// The `Cols` for the processed op columns
|
|
ops_cols: Cols<D::Out>,
|
|
/// The vector to write column metadata to
|
|
meta_out: Vec<u8>,
|
|
/// The vector to write column data to
|
|
data_out: Vec<u8>,
|
|
}
|
|
|
|
/// We've written the column metadata and the op metadata for changes and ops to the output buffer
|
|
/// and added the prefix and suffix from the args.
|
|
struct Finished<D: Direction> {
|
|
/// The `Cols` for the processed change columns
|
|
change_cols: Cols<D::Out>,
|
|
/// The `Cols` for the processed op columns
|
|
ops_cols: Cols<D::Out>,
|
|
/// The start of the change column metadata in the processed chunk
|
|
data_start: usize,
|
|
/// The processed chunk
|
|
out: Vec<u8>,
|
|
}
|
|
|
|
impl<'a, D: Direction> Compression<'a, D, Starting> {
|
|
fn new(args: Args<'a, D::In, D::Args>, direction: D) -> Compression<'a, D, Starting> {
|
|
let mut meta_out = Vec::with_capacity(args.original.len() * 2);
|
|
meta_out.extend(&args.original[..args.prefix]);
|
|
Compression {
|
|
args,
|
|
direction,
|
|
state: Starting {
|
|
meta_out,
|
|
data_out: Vec::new(),
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a, D: Direction> Compression<'a, D, Starting> {
|
|
fn changes(self) -> Compression<'a, D, Changes<D>> {
|
|
let Starting {
|
|
mut data_out,
|
|
mut meta_out,
|
|
} = self.state;
|
|
let change_cols = self.direction.process(
|
|
&self.args.changes,
|
|
&self.args.original,
|
|
&mut data_out,
|
|
&mut meta_out,
|
|
);
|
|
Compression {
|
|
args: self.args,
|
|
direction: self.direction,
|
|
state: Changes {
|
|
change_cols,
|
|
meta_out,
|
|
data_out,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a, D: Direction> Compression<'a, D, Changes<D>> {
|
|
fn ops(self) -> Compression<'a, D, ChangesAndOps<D>> {
|
|
let Changes {
|
|
change_cols,
|
|
mut meta_out,
|
|
mut data_out,
|
|
} = self.state;
|
|
let ops_cols = self.direction.process(
|
|
&self.args.ops,
|
|
&self.args.original,
|
|
&mut data_out,
|
|
&mut meta_out,
|
|
);
|
|
Compression {
|
|
args: self.args,
|
|
direction: self.direction,
|
|
state: ChangesAndOps {
|
|
change_cols,
|
|
ops_cols,
|
|
meta_out,
|
|
data_out,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a, D: Direction> Compression<'a, D, ChangesAndOps<D>> {
|
|
fn write_data(self) -> Compression<'a, D, Finished<D>> {
|
|
let ChangesAndOps {
|
|
data_out,
|
|
mut meta_out,
|
|
change_cols,
|
|
ops_cols,
|
|
} = self.state;
|
|
let data_start = meta_out.len();
|
|
meta_out.extend(&data_out);
|
|
meta_out.extend(&self.args.original[self.args.suffix..]);
|
|
Compression {
|
|
args: self.args,
|
|
direction: self.direction,
|
|
state: Finished {
|
|
ops_cols,
|
|
change_cols,
|
|
out: meta_out,
|
|
data_start,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a> Compression<'a, Decompressing, Finished<Decompressing>> {
|
|
fn finish(self) -> Decompressed<'a> {
|
|
let Finished {
|
|
change_cols,
|
|
ops_cols,
|
|
data_start,
|
|
out,
|
|
} = self.state;
|
|
Decompressed {
|
|
ops: ops_cols.raw_columns,
|
|
changes: change_cols.raw_columns,
|
|
uncompressed: Cow::Owned(out),
|
|
compressed: Some(self.args.original),
|
|
change_bytes: shift_range(change_cols.data, data_start),
|
|
op_bytes: shift_range(ops_cols.data, data_start),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a> Compression<'a, Compressing, Finished<Compressing>> {
|
|
fn finish(self) -> Vec<u8> {
|
|
let Finished { out, .. } = self.state;
|
|
let headerless = &out[self.direction.header_len..];
|
|
let header = Header::new(ChunkType::Document, headerless);
|
|
let mut result = Vec::with_capacity(header.len() + out.len());
|
|
header.write(&mut result);
|
|
result.extend(headerless);
|
|
result
|
|
}
|
|
}
|