use std::{io::Read, marker::PhantomData, ops::Range};

use crate::storage::parse;

use super::{compression, ColumnSpec};

/// This is a "raw" column in the sense that it is just the column specification[1] and range. This
/// is in contrast to [`super::Column`] which is aware of composite columns such as value columns[2] and
/// group columns[3].
///
/// `RawColumn` is generally an intermediary object which is parsed into a [`super::Column`].
///
/// The type parameter `T` is a witness to whether this column is compressed. If `T` is
/// [`compression::Uncompressed`] then we have proved that this column is not compressed;
/// otherwise it may be compressed.
///
/// [1]: https://alexjg.github.io/automerge-storage-docs/#column-specifications
/// [2]: https://alexjg.github.io/automerge-storage-docs/#raw-value-columns
/// [3]: https://alexjg.github.io/automerge-storage-docs/#group-columns
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct RawColumn<T: compression::ColumnCompression> {
spec: ColumnSpec,
    /// The location of the data in the column data block. Note that this range is relative to
    /// the start of the column data block - i.e. the `data` range of the first column in the
    /// block starts at 0 - not to the start of the chunk.
data: Range<usize>,
_phantom: PhantomData<T>,
}

impl RawColumn<compression::Uncompressed> {
pub(crate) fn new(spec: ColumnSpec, data: Range<usize>) -> Self {
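        // Clear the deflate bit: a column constructed through `new` is uncompressed by
        // construction, whatever the incoming spec says.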
Self {
spec: ColumnSpec::new(spec.id(), spec.col_type(), false),
data,
_phantom: PhantomData,
}
}
}

impl<T: compression::ColumnCompression> RawColumn<T> {
pub(crate) fn spec(&self) -> ColumnSpec {
self.spec
    }

pub(crate) fn data(&self) -> Range<usize> {
self.data.clone()
}
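
    /// Copy this column's bytes from `input` into `out`, deflating them unless the column is
    /// shorter than `threshold` bytes or already compressed, in which case they are copied
    /// verbatim. Returns the resulting spec and the number of bytes written to `out`.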
fn compress(&self, input: &[u8], out: &mut Vec<u8>, threshold: usize) -> (ColumnSpec, usize) {
let (spec, len) = if self.data.len() < threshold || self.spec.deflate() {
out.extend(&input[self.data.clone()]);
(self.spec, self.data.len())
} else {
let mut deflater = flate2::bufread::DeflateEncoder::new(
&input[self.data.clone()],
flate2::Compression::default(),
);
            // This unwrap should be okay as we're reading from and writing to in-memory buffers
            (self.spec.deflated(), deflater.read_to_end(out).unwrap())
};
(spec, len)
    }

pub(crate) fn uncompressed(&self) -> Option<RawColumn<compression::Uncompressed>> {
if self.spec.deflate() {
None
} else {
Some(RawColumn {
spec: self.spec,
data: self.data.clone(),
_phantom: PhantomData,
})
}
}
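
    /// Copy this column's bytes from `input` into `out`, inflating them if the spec says they
    /// are deflated. Returns the inflated spec and the number of bytes written to `out`.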
fn decompress(&self, input: &[u8], out: &mut Vec<u8>) -> (ColumnSpec, usize) {
let len = if self.spec.deflate() {
let mut inflater = flate2::bufread::DeflateDecoder::new(&input[self.data.clone()]);
inflater.read_to_end(out).unwrap()
} else {
out.extend(&input[self.data.clone()]);
self.data.len()
};
(self.spec.inflated(), len)
}
}

#[derive(Clone, Debug, PartialEq)]
pub(crate) struct RawColumns<T: compression::ColumnCompression>(Vec<RawColumn<T>>);

impl<T: compression::ColumnCompression> RawColumns<T> {
    /// Returns `Some` if no column in this set is marked as compressed, `None` otherwise.
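    ///
    /// This is the "proof" step for the compression witness. A sketch of a hypothetical caller
    /// (`parse_somehow`, `handle_plain`, and `handle_compressed` are illustrative, not part of
    /// this module):
    ///
    /// ```ignore
    /// let cols: RawColumns<compression::Unknown> = parse_somehow();
    /// match cols.uncompressed() {
    ///     // Statically known to contain no deflated columns
    ///     Some(plain) => handle_plain(plain),
    ///     // At least one column must be inflated (see `uncompress`) first
    ///     None => handle_compressed(cols),
    /// }
    /// ```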
pub(crate) fn uncompressed(&self) -> Option<RawColumns<compression::Uncompressed>> {
let mut result = Vec::with_capacity(self.0.len());
for col in &self.0 {
if let Some(uncomp) = col.uncompressed() {
result.push(uncomp);
} else {
return None;
}
}
Some(RawColumns(result))
    }

/// Write each column in `input` represented by `self` into `out`, possibly compressing.
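    ///
    /// Columns shorter than `threshold` bytes, and columns which are already deflated, are
    /// copied verbatim rather than compressed.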
///
/// # Returns
/// The `RawColumns` corresponding to the data written to `out`
///
/// # Panics
/// * If any of the ranges in `self` is outside the bounds of `input`
pub(crate) fn compress(
&self,
input: &[u8],
out: &mut Vec<u8>,
threshold: usize,
) -> RawColumns<compression::Unknown> {
let mut result = Vec::with_capacity(self.0.len());
let mut start = 0;
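        // Ranges in the returned columns are offsets into `out`, preserving the invariant that
        // the first column's data starts at 0 within the column data block.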
for col in &self.0 {
let (spec, len) = col.compress(input, out, threshold);
result.push(RawColumn {
spec,
data: start..(start + len),
_phantom: PhantomData::<compression::Unknown>,
});
start += len;
}
RawColumns(result)
    }

/// Read each column from `input` and write to `out`, decompressing any compressed columns
///
/// # Returns
/// The `RawColumns` corresponding to the data written to `out`
///
/// # Panics
/// * If any of the ranges in `self` is outside the bounds of `input`
pub(crate) fn uncompress(
&self,
input: &[u8],
out: &mut Vec<u8>,
) -> RawColumns<compression::Uncompressed> {
let mut result = Vec::with_capacity(self.0.len());
let mut start = 0;
for col in &self.0 {
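            // Columns that are already uncompressed are copied verbatim; deflated columns are
            // inflated into `out`.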
let (spec, len) = if let Some(decomp) = col.uncompressed() {
out.extend(&input[decomp.data.clone()]);
(decomp.spec, decomp.data.len())
} else {
col.decompress(input, out)
};
result.push(RawColumn {
spec,
data: start..(start + len),
_phantom: PhantomData::<compression::Uncompressed>,
});
start += len;
}
RawColumns(result)
}
}

impl<T: compression::ColumnCompression> FromIterator<RawColumn<T>> for RawColumns<T> {
fn from_iter<U: IntoIterator<Item = RawColumn<T>>>(iter: U) -> Self {
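        // Zero-length columns carry no data, so they are dropped here.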
Self(iter.into_iter().filter(|c| !c.data.is_empty()).collect())
}
}

impl FromIterator<(ColumnSpec, Range<usize>)> for RawColumns<compression::Unknown> {
fn from_iter<T: IntoIterator<Item = (ColumnSpec, Range<usize>)>>(iter: T) -> Self {
Self(
iter.into_iter()
.filter_map(|(spec, data)| {
if data.is_empty() {
None
} else {
Some(RawColumn {
spec,
data,
_phantom: PhantomData,
})
}
})
.collect(),
)
}
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum ParseError {
#[error("columns were not in normalized order")]
NotInNormalOrder,
#[error(transparent)]
Leb128(#[from] parse::leb128::Error),
}

impl RawColumns<compression::Unknown> {
pub(crate) fn parse<E>(input: parse::Input<'_>) -> parse::ParseResult<'_, Self, E>
where
E: From<ParseError>,
{
let i = input;
let (i, num_columns) = parse::leb128_u64(i).map_err(|e| e.lift())?;
let (i, specs_and_lens) = parse::apply_n(
num_columns as usize,
parse::tuple2(
parse::map(parse::leb128_u32, ColumnSpec::from),
parse::leb128_u64,
),
)(i)
.map_err(|e| e.lift())?;
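        // The metadata stores only (spec, length) pairs; the column data is laid out
        // contiguously in spec order, so a running offset recovers each column's range.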
let columns: Vec<RawColumn<compression::Unknown>> = specs_and_lens
.into_iter()
.scan(0_usize, |offset, (spec, len)| {
let end = *offset + len as usize;
let data = *offset..end;
*offset = end;
Some(RawColumn {
spec,
data,
_phantom: PhantomData,
})
})
.collect::<Vec<_>>();
if !are_normal_sorted(&columns) {
return Err(parse::ParseError::Error(
ParseError::NotInNormalOrder.into(),
));
}
Ok((i, RawColumns(columns)))
}
}

impl<T: compression::ColumnCompression> RawColumns<T> {
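    /// Write the column metadata - a LEB128 column count followed by a (spec, length) pair of
    /// LEB128 integers per column - to `out`, returning the number of bytes written.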
pub(crate) fn write(&self, out: &mut Vec<u8>) -> usize {
let mut written = leb128::write::unsigned(out, self.0.len() as u64).unwrap();
for col in &self.0 {
written += leb128::write::unsigned(out, u32::from(col.spec) as u64).unwrap();
written += leb128::write::unsigned(out, col.data.len() as u64).unwrap();
}
written
    }

pub(crate) fn total_column_len(&self) -> usize {
self.0.iter().map(|c| c.data.len()).sum()
    }

pub(crate) fn iter(&self) -> impl Iterator<Item = &RawColumn<T>> + '_ {
self.0.iter()
}
}
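
/// Check that the columns are sorted by their normalized specs, i.e. the order the storage
/// format requires (`parse` rejects column metadata which is not in this order).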
fn are_normal_sorted<T: compression::ColumnCompression>(cols: &[RawColumn<T>]) -> bool {
if cols.len() > 1 {
for (i, col) in cols[1..].iter().enumerate() {
if col.spec.normalize() < cols[i].spec.normalize() {
return false;
}
}
}
true
}