After some discussion with PVH I realise that the repo structure from the last reorg was very Rust-centric. In an attempt to put each language on a level footing, move the Rust code and project files into ./rust
use std::{io::Read, marker::PhantomData, ops::Range};

use crate::storage::parse;

use super::{compression, ColumnSpec};

/// This is a "raw" column in the sense that it is just the column specification[1] and range. This
/// is in contrast to [`super::Column`] which is aware of composite columns such as value columns[2] and
/// group columns[3].
///
/// `RawColumn` is generally an intermediary object which is parsed into a [`super::Column`].
///
/// The type parameter `T` is a witness to whether this column is compressed. If `T:
/// compression::Uncompressed` then we have proved that this column is not compressed, otherwise it
/// may be compressed.
///
/// [1]: https://alexjg.github.io/automerge-storage-docs/#column-specifications
/// [2]: https://alexjg.github.io/automerge-storage-docs/#raw-value-columns
/// [3]: https://alexjg.github.io/automerge-storage-docs/#group-columns
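///
/// # Example
///
/// A hypothetical caller that only reads column bytes once it holds the uncompressed
/// witness (a sketch, assuming only the `uncompressed()` and `data()` methods defined
/// below):
///
/// ```ignore
/// fn column_bytes<'a, T: compression::ColumnCompression>(
///     col: &RawColumn<T>,
///     block: &'a [u8],
/// ) -> Option<&'a [u8]> {
///     // `uncompressed()` returns `None` when the spec has the deflate bit set, so the
///     // slice below is only taken for columns we know are not compressed.
///     let col = col.uncompressed()?;
///     Some(&block[col.data()])
/// }
/// ```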
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct RawColumn<T: compression::ColumnCompression> {
    spec: ColumnSpec,
    /// The location of the data in the column data block. Note that this range starts at the
    /// beginning of the column data block - i.e. the `data` attribute of the first column in the
    /// column data block will be 0 - not at the start of the chunk.
    data: Range<usize>,
    _phantom: PhantomData<T>,
}

impl RawColumn<compression::Uncompressed> {
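    /// Construct a column that is known to be uncompressed. The compression flag in
    /// `spec` is cleared (the `false` passed to `ColumnSpec::new`) so that the spec
    /// stays consistent with the `Uncompressed` witness.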
    pub(crate) fn new(spec: ColumnSpec, data: Range<usize>) -> Self {
        Self {
            spec: ColumnSpec::new(spec.id(), spec.col_type(), false),
            data,
            _phantom: PhantomData,
        }
    }
}

impl<T: compression::ColumnCompression> RawColumn<T> {
    pub(crate) fn spec(&self) -> ColumnSpec {
        self.spec
    }

    pub(crate) fn data(&self) -> Range<usize> {
        self.data.clone()
    }

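    /// Copy this column's bytes from `input` into `out`, deflating them if they are
    /// large enough to be worth it.
    ///
    /// Data shorter than `threshold` bytes, or data that is already deflated, is copied
    /// through unchanged. Returns the (possibly updated) spec and the number of bytes
    /// written to `out`.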
    fn compress(&self, input: &[u8], out: &mut Vec<u8>, threshold: usize) -> (ColumnSpec, usize) {
        let (spec, len) = if self.data.len() < threshold || self.spec.deflate() {
            out.extend(&input[self.data.clone()]);
            (self.spec, self.data.len())
        } else {
            let mut deflater = flate2::bufread::DeflateEncoder::new(
                &input[self.data.clone()],
                flate2::Compression::default(),
            );
            // This unwrap should be okay as we're reading from and writing to in-memory buffers
            (self.spec.deflated(), deflater.read_to_end(out).unwrap())
        };
        (spec, len)
    }

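    /// Returns `Some` if the spec does not have the deflate bit set, i.e. if this column
    /// is known not to be compressed.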
    pub(crate) fn uncompressed(&self) -> Option<RawColumn<compression::Uncompressed>> {
        if self.spec.deflate() {
            None
        } else {
            Some(RawColumn {
                spec: self.spec,
                data: self.data.clone(),
                _phantom: PhantomData,
            })
        }
    }

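    /// Copy this column's bytes from `input` into `out`, inflating them first if the
    /// spec says they are deflated. Returns the spec with the deflate bit cleared and
    /// the number of bytes written to `out`.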
    fn decompress(&self, input: &[u8], out: &mut Vec<u8>) -> (ColumnSpec, usize) {
        let len = if self.spec.deflate() {
            let mut inflater = flate2::bufread::DeflateDecoder::new(&input[self.data.clone()]);
            inflater.read_to_end(out).unwrap()
        } else {
            out.extend(&input[self.data.clone()]);
            self.data.len()
        };
        (self.spec.inflated(), len)
    }
}

#[derive(Clone, Debug, PartialEq)]
pub(crate) struct RawColumns<T: compression::ColumnCompression>(Vec<RawColumn<T>>);

impl<T: compression::ColumnCompression> RawColumns<T> {
    /// Returns `Some` if no column in this set of columns is marked as compressed
    pub(crate) fn uncompressed(&self) -> Option<RawColumns<compression::Uncompressed>> {
        let mut result = Vec::with_capacity(self.0.len());
        for col in &self.0 {
            if let Some(uncomp) = col.uncompressed() {
                result.push(uncomp);
            } else {
                return None;
            }
        }
        Some(RawColumns(result))
    }

    /// Write each column in `input` represented by `self` into `out`, possibly compressing.
    ///
    /// # Returns
    /// The `RawColumns` corresponding to the data written to `out`
    ///
    /// # Panics
    /// * If any of the ranges in `self` is outside the bounds of `input`
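    ///
    /// # Example
    ///
    /// A rough sketch of a round trip, assuming `cols` is a `RawColumns` describing the
    /// column data block `column_data`; the threshold of 256 bytes is arbitrary here:
    ///
    /// ```ignore
    /// let mut compressed = Vec::new();
    /// let written = cols.compress(&column_data, &mut compressed, 256);
    /// let mut restored = Vec::new();
    /// let uncompressed: RawColumns<compression::Uncompressed> =
    ///     written.uncompress(&compressed, &mut restored);
    /// ```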
    pub(crate) fn compress(
        &self,
        input: &[u8],
        out: &mut Vec<u8>,
        threshold: usize,
    ) -> RawColumns<compression::Unknown> {
        let mut result = Vec::with_capacity(self.0.len());
        let mut start = 0;
        for col in &self.0 {
            let (spec, len) = col.compress(input, out, threshold);
            result.push(RawColumn {
                spec,
                data: start..(start + len),
                _phantom: PhantomData::<compression::Unknown>,
            });
            start += len;
        }
        RawColumns(result)
    }

    /// Read each column from `input` and write to `out`, decompressing any compressed columns
    ///
    /// # Returns
    /// The `RawColumns` corresponding to the data written to `out`
    ///
    /// # Panics
    /// * If any of the ranges in `self` is outside the bounds of `input`
    pub(crate) fn uncompress(
        &self,
        input: &[u8],
        out: &mut Vec<u8>,
    ) -> RawColumns<compression::Uncompressed> {
        let mut result = Vec::with_capacity(self.0.len());
        let mut start = 0;
        for col in &self.0 {
            let (spec, len) = if let Some(decomp) = col.uncompressed() {
                out.extend(&input[decomp.data.clone()]);
                (decomp.spec, decomp.data.len())
            } else {
                col.decompress(input, out)
            };
            result.push(RawColumn {
                spec,
                data: start..(start + len),
                _phantom: PhantomData::<compression::Uncompressed>,
            });
            start += len;
        }
        RawColumns(result)
    }
}

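// Both `FromIterator` impls below drop columns whose data range is empty, so a collected
// `RawColumns` never contains zero-length columns.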
impl<T: compression::ColumnCompression> FromIterator<RawColumn<T>> for RawColumns<T> {
    fn from_iter<U: IntoIterator<Item = RawColumn<T>>>(iter: U) -> Self {
        Self(iter.into_iter().filter(|c| !c.data.is_empty()).collect())
    }
}

impl FromIterator<(ColumnSpec, Range<usize>)> for RawColumns<compression::Unknown> {
    fn from_iter<T: IntoIterator<Item = (ColumnSpec, Range<usize>)>>(iter: T) -> Self {
        Self(
            iter.into_iter()
                .filter_map(|(spec, data)| {
                    if data.is_empty() {
                        None
                    } else {
                        Some(RawColumn {
                            spec,
                            data,
                            _phantom: PhantomData,
                        })
                    }
                })
                .collect(),
        )
    }
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum ParseError {
    #[error("columns were not in normalized order")]
    NotInNormalOrder,
    #[error(transparent)]
    Leb128(#[from] parse::leb128::Error),
}

impl RawColumns<compression::Unknown> {
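    /// Parse a sequence of column descriptors: a LEB128 encoded count of columns
    /// followed by, for each column, its spec (a LEB128 encoded u32) and the length of
    /// its data in bytes (a LEB128 encoded u64). The data ranges of the resulting
    /// columns are laid end to end starting at offset 0.
    ///
    /// # Errors
    /// * If the column specs are not in normalized order
    /// * If any of the LEB128 encoded integers is invalid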
    pub(crate) fn parse<E>(input: parse::Input<'_>) -> parse::ParseResult<'_, Self, E>
    where
        E: From<ParseError>,
    {
        let i = input;
        let (i, num_columns) = parse::leb128_u64(i).map_err(|e| e.lift())?;
        let (i, specs_and_lens) = parse::apply_n(
            num_columns as usize,
            parse::tuple2(
                parse::map(parse::leb128_u32, ColumnSpec::from),
                parse::leb128_u64,
            ),
        )(i)
        .map_err(|e| e.lift())?;
        let columns: Vec<RawColumn<compression::Unknown>> = specs_and_lens
            .into_iter()
            .scan(0_usize, |offset, (spec, len)| {
                let end = *offset + len as usize;
                let data = *offset..end;
                *offset = end;
                Some(RawColumn {
                    spec,
                    data,
                    _phantom: PhantomData,
                })
            })
            .collect::<Vec<_>>();
        if !are_normal_sorted(&columns) {
            return Err(parse::ParseError::Error(
                ParseError::NotInNormalOrder.into(),
            ));
        }
        Ok((i, RawColumns(columns)))
    }
}

impl<T: compression::ColumnCompression> RawColumns<T> {
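    /// Write the metadata describing these columns (a LEB128 encoded count followed by
    /// each column's spec and data length, both LEB128 encoded) to `out`, returning the
    /// number of bytes written. Note that this writes only the metadata, not the column
    /// data itself.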
    pub(crate) fn write(&self, out: &mut Vec<u8>) -> usize {
        let mut written = leb128::write::unsigned(out, self.0.len() as u64).unwrap();
        for col in &self.0 {
            written += leb128::write::unsigned(out, u32::from(col.spec) as u64).unwrap();
            written += leb128::write::unsigned(out, col.data.len() as u64).unwrap();
        }
        written
    }

    pub(crate) fn total_column_len(&self) -> usize {
        self.0.iter().map(|c| c.data.len()).sum()
    }

    pub(crate) fn iter(&self) -> impl Iterator<Item = &RawColumn<T>> + '_ {
        self.0.iter()
    }
}

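/// Check that the columns are in non-decreasing order of their normalized spec, which is
/// the order `RawColumns::parse` insists on.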
fn are_normal_sorted<T: compression::ColumnCompression>(cols: &[RawColumn<T>]) -> bool {
    if cols.len() > 1 {
        for (i, col) in cols[1..].iter().enumerate() {
            if col.spec.normalize() < cols[i].spec.normalize() {
                return false;
            }
        }
    }
    true
}