Speed up loading by generating clocks on demand

Context: currently we store a mapping from ChangeHash -> Clock, where
`Clock` is the set of (ActorId, (Sequence number, max Op)) pairs derived
from the given change and its dependencies. This clock is used to
determine what operations are visible at a given set of heads.

Problem: populating this mapping for documents with large histories
containing many actors can be very slow, because for each change we
must allocate a new hashmap and merge the clocks of all the change's
dependencies into it; the work and memory grow with the number of
changes times the number of actors.

Solution: instead of creating the clocks on load, build an
adjacency-list representation of the change graph and derive a clock
from it whenever one is needed. Traversing even large graphs is still
almost as fast as looking up a precomputed clock in a hashmap.
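
Roughly, the idea is as follows (a minimal sketch with simplified,
hypothetical types; the real implementation is `ChangeGraph` in
change_graph.rs below, which packs nodes and edges into flat vecs
indexed by u32):

    use std::collections::HashMap;

    struct Node {
        actor: usize,
        seq: u64,
        max_op: u64,
        parents: Vec<usize>, // indices into the `nodes` slice
    }

    // Derive the clock (actor -> (seq, max_op)) at `heads` by a
    // depth-first walk over the graph, rather than looking it up in a
    // precomputed per-change table.
    fn clock_for_heads(nodes: &[Node], heads: &[usize]) -> HashMap<usize, (u64, u64)> {
        let mut clock: HashMap<usize, (u64, u64)> = HashMap::new();
        let mut to_visit: Vec<usize> = heads.to_vec();
        let mut visited = vec![false; nodes.len()];
        while let Some(idx) = to_visit.pop() {
            if std::mem::replace(&mut visited[idx], true) {
                continue; // already processed; histories can converge
            }
            let node = &nodes[idx];
            // Keep the maximum (seq, max_op) seen for each actor.
            let entry = clock.entry(node.actor).or_insert((0, 0));
            entry.0 = entry.0.max(node.seq);
            entry.1 = entry.1.max(node.max_op);
            to_visit.extend(node.parents.iter().copied());
        }
        clock
    }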
Alex Good, 2023-02-02 13:28:22 +00:00 (committed by alexjg)
parent 1e33c9d9e0
commit 13a775ed9a
6 changed files with 392 additions and 141 deletions

File: automerge.rs

@@ -4,8 +4,7 @@ use std::fmt::Debug;
use std::num::NonZeroU64;
use std::ops::RangeBounds;
use crate::clock::ClockData;
use crate::clocks::Clocks;
use crate::change_graph::ChangeGraph;
use crate::columnar::Key as EncodedKey;
use crate::exid::ExId;
use crate::keys::Keys;
@@ -87,8 +86,8 @@ pub struct Automerge {
history: Vec<Change>,
/// Mapping from change hash to index into the history list.
history_index: HashMap<ChangeHash, usize>,
/// Mapping from change hash to vector clock at this state.
clocks: HashMap<ChangeHash, Clock>,
/// Graph of changes
change_graph: ChangeGraph,
/// Mapping from actor index to list of seqs seen for them.
states: HashMap<usize, Vec<usize>>,
/// Current dependencies of this document (heads hashes).
@@ -111,7 +110,7 @@ impl Automerge {
queue: vec![],
history: vec![],
history_index: HashMap::new(),
clocks: HashMap::new(),
change_graph: ChangeGraph::new(),
states: HashMap::new(),
ops: Default::default(),
deps: Default::default(),
@@ -477,14 +476,14 @@ impl Automerge {
.map_err(|e| load::Error::InflateDocument(Box::new(e)))?;
let mut hashes_by_index = HashMap::new();
let mut actor_to_history: HashMap<usize, Vec<usize>> = HashMap::new();
let mut clocks = Clocks::new();
let mut change_graph = ChangeGraph::new();
for (index, change) in changes.iter().enumerate() {
// SAFETY: This should be fine because we just constructed an opset containing
// all the changes
let actor_index = op_set.m.actors.lookup(change.actor_id()).unwrap();
actor_to_history.entry(actor_index).or_default().push(index);
hashes_by_index.insert(index, change.hash());
clocks.add_change(change, actor_index)?;
change_graph.add_change(change, actor_index)?;
}
let history_index = hashes_by_index.into_iter().map(|(k, v)| (v, k)).collect();
Self {
@@ -492,7 +491,7 @@ impl Automerge {
history: changes,
history_index,
states: actor_to_history,
clocks: clocks.into(),
change_graph,
ops: op_set,
deps: heads.into_iter().collect(),
saved: Default::default(),
@@ -824,16 +823,8 @@ impl Automerge {
.filter(|hash| self.history_index.contains_key(hash))
.copied()
.collect::<Vec<_>>();
let heads_clock = self.clock_at(&heads)?;
// keep the hashes that are concurrent or after the heads
changes.retain(|hash| {
self.clocks
.get(hash)
.unwrap()
.partial_cmp(&heads_clock)
.map_or(true, |o| o == Ordering::Greater)
});
self.change_graph.remove_ancestors(changes, &heads);
Ok(())
}
@@ -841,7 +832,7 @@ impl Automerge {
/// Get the changes since `have_deps` in this document using a clock internally.
fn get_changes_clock(&self, have_deps: &[ChangeHash]) -> Result<Vec<&Change>, AutomergeError> {
// get the clock for the given deps
let clock = self.clock_at(have_deps)?;
let clock = self.clock_at(have_deps);
// get the documents current clock
@@ -875,26 +866,8 @@ impl Automerge {
.find(|c| c.actor_id() == self.get_actor());
}
fn clock_at(&self, heads: &[ChangeHash]) -> Result<Clock, AutomergeError> {
if let Some(first_hash) = heads.first() {
let mut clock = self
.clocks
.get(first_hash)
.ok_or(AutomergeError::MissingHash(*first_hash))?
.clone();
for hash in &heads[1..] {
let c = self
.clocks
.get(hash)
.ok_or(AutomergeError::MissingHash(*hash))?;
clock.merge(c);
}
Ok(clock)
} else {
Ok(Clock::new())
}
fn clock_at(&self, heads: &[ChangeHash]) -> Clock {
self.change_graph.clock_for_heads(heads)
}
fn get_hash(&self, actor: usize, seq: u64) -> Result<ChangeHash, AutomergeError> {
@@ -920,22 +893,9 @@ impl Automerge {
.push(history_index);
self.history_index.insert(change.hash(), history_index);
let mut clock = Clock::new();
for hash in change.deps() {
let c = self
.clocks
.get(hash)
.expect("Change's deps should already be in the document");
clock.merge(c);
}
clock.include(
actor_index,
ClockData {
max_op: change.max_op(),
seq: change.seq(),
},
);
self.clocks.insert(change.hash(), clock);
self.change_graph
.add_change(&change, actor_index)
.expect("Change's deps should already be in the document");
self.history_index.insert(change.hash(), history_index);
self.history.push(change);
@@ -1197,9 +1157,8 @@ impl ReadDoc for Automerge {
fn keys_at<O: AsRef<ExId>>(&self, obj: O, heads: &[ChangeHash]) -> KeysAt<'_, '_> {
if let Ok((obj, _)) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
return KeysAt::new(self, self.ops.keys_at(obj, clock));
}
let clock = self.clock_at(heads);
return KeysAt::new(self, self.ops.keys_at(obj, clock));
}
KeysAt::new(self, None)
}
@@ -1223,10 +1182,9 @@ impl ReadDoc for Automerge {
heads: &[ChangeHash],
) -> MapRangeAt<'_, R> {
if let Ok((obj, _)) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
let iter_range = self.ops.map_range_at(obj, range, clock);
return MapRangeAt::new(self, iter_range);
}
let clock = self.clock_at(heads);
let iter_range = self.ops.map_range_at(obj, range, clock);
return MapRangeAt::new(self, iter_range);
}
MapRangeAt::new(self, None)
}
@@ -1250,10 +1208,9 @@ impl ReadDoc for Automerge {
heads: &[ChangeHash],
) -> ListRangeAt<'_, R> {
if let Ok((obj, _)) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
let iter_range = self.ops.list_range_at(obj, range, clock);
return ListRangeAt::new(self, iter_range);
}
let clock = self.clock_at(heads);
let iter_range = self.ops.list_range_at(obj, range, clock);
return ListRangeAt::new(self, iter_range);
}
ListRangeAt::new(self, None)
}
@@ -1272,20 +1229,20 @@ impl ReadDoc for Automerge {
fn values_at<O: AsRef<ExId>>(&self, obj: O, heads: &[ChangeHash]) -> Values<'_> {
if let Ok((obj, obj_type)) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
return match obj_type {
ObjType::Map | ObjType::Table => {
let iter_range = self.ops.map_range_at(obj, .., clock);
Values::new(self, iter_range)
}
ObjType::List | ObjType::Text => {
let iter_range = self.ops.list_range_at(obj, .., clock);
Values::new(self, iter_range)
}
};
let clock = self.clock_at(heads);
match obj_type {
ObjType::Map | ObjType::Table => {
let iter_range = self.ops.map_range_at(obj, .., clock);
Values::new(self, iter_range)
}
ObjType::List | ObjType::Text => {
let iter_range = self.ops.list_range_at(obj, .., clock);
Values::new(self, iter_range)
}
}
} else {
Values::empty(self)
}
Values::empty(self)
}
fn length<O: AsRef<ExId>>(&self, obj: O) -> usize {
@@ -1303,18 +1260,18 @@ impl ReadDoc for Automerge {
fn length_at<O: AsRef<ExId>>(&self, obj: O, heads: &[ChangeHash]) -> usize {
if let Ok((inner_obj, obj_type)) = self.exid_to_obj(obj.as_ref()) {
if let Ok(clock) = self.clock_at(heads) {
return if obj_type == ObjType::Map || obj_type == ObjType::Table {
self.keys_at(obj, heads).count()
} else {
let encoding = ListEncoding::new(obj_type, self.text_encoding);
self.ops
.search(&inner_obj, query::LenAt::new(clock, encoding))
.len
};
let clock = self.clock_at(heads);
if obj_type == ObjType::Map || obj_type == ObjType::Table {
self.keys_at(obj, heads).count()
} else {
let encoding = ListEncoding::new(obj_type, self.text_encoding);
self.ops
.search(&inner_obj, query::LenAt::new(clock, encoding))
.len
}
} else {
0
}
0
}
fn object_type<O: AsRef<ExId>>(&self, obj: O) -> Result<ObjType, AutomergeError> {
@ -1338,7 +1295,7 @@ impl ReadDoc for Automerge {
heads: &[ChangeHash],
) -> Result<String, AutomergeError> {
let obj = self.exid_to_obj(obj.as_ref())?.0;
let clock = self.clock_at(heads)?;
let clock = self.clock_at(heads);
let query = self.ops.search(&obj, query::ListValsAt::new(clock));
let mut buffer = String::new();
for q in &query.ops {
@@ -1413,7 +1370,7 @@ impl ReadDoc for Automerge {
) -> Result<Vec<(Value<'_>, ExId)>, AutomergeError> {
let prop = prop.into();
let obj = self.exid_to_obj(obj.as_ref())?.0;
let clock = self.clock_at(heads)?;
let clock = self.clock_at(heads);
let result = match prop {
Prop::Map(p) => {
let prop = self.ops.m.props.lookup(&p);

File: change_graph.rs (new file)

@@ -0,0 +1,344 @@
use std::collections::{BTreeMap, BTreeSet};
use crate::{
clock::{Clock, ClockData},
Change, ChangeHash,
};
/// The graph of changes
///
/// This is a sort of adjacency-list based representation, except that instead of using linked
/// lists, we keep all the edges and nodes in two vecs and reference them by index, which plays
/// nicely with the cache.
#[derive(Debug, Clone)]
pub(crate) struct ChangeGraph {
nodes: Vec<ChangeNode>,
edges: Vec<Edge>,
hashes: Vec<ChangeHash>,
nodes_by_hash: BTreeMap<ChangeHash, NodeIdx>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct NodeIdx(u32);
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct EdgeIdx(u32);
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct HashIdx(u32);
#[derive(Debug, Clone)]
struct Edge {
// Edges are always child -> parent, so we only store the target; the child is implicit
// because you reach the edge from the child
target: NodeIdx,
next: Option<EdgeIdx>,
}
#[derive(Debug, Clone)]
struct ChangeNode {
hash_idx: HashIdx,
actor_index: usize,
seq: u64,
max_op: u64,
parents: Option<EdgeIdx>,
}
impl ChangeGraph {
pub(crate) fn new() -> Self {
Self {
nodes: Vec::new(),
edges: Vec::new(),
nodes_by_hash: BTreeMap::new(),
hashes: Vec::new(),
}
}
pub(crate) fn add_change(
&mut self,
change: &Change,
actor_idx: usize,
) -> Result<(), MissingDep> {
let hash = change.hash();
if self.nodes_by_hash.contains_key(&hash) {
return Ok(());
}
let parent_indices = change
.deps()
.iter()
.map(|h| self.nodes_by_hash.get(h).copied().ok_or(MissingDep(*h)))
.collect::<Result<Vec<_>, _>>()?;
let node_idx = self.add_node(actor_idx, change);
self.nodes_by_hash.insert(hash, node_idx);
for parent_idx in parent_indices {
self.add_parent(node_idx, parent_idx);
}
Ok(())
}
fn add_node(&mut self, actor_index: usize, change: &Change) -> NodeIdx {
let idx = NodeIdx(self.nodes.len() as u32);
let hash_idx = self.add_hash(change.hash());
self.nodes.push(ChangeNode {
hash_idx,
actor_index,
seq: change.seq(),
max_op: change.max_op(),
parents: None,
});
idx
}
fn add_hash(&mut self, hash: ChangeHash) -> HashIdx {
let idx = HashIdx(self.hashes.len() as u32);
self.hashes.push(hash);
idx
}
fn add_parent(&mut self, child_idx: NodeIdx, parent_idx: NodeIdx) {
let new_edge_idx = EdgeIdx(self.edges.len() as u32);
let new_edge = Edge {
target: parent_idx,
next: None,
};
self.edges.push(new_edge);
let child = &mut self.nodes[child_idx.0 as usize];
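// The parent edges of a node form a singly linked list threaded through
// `self.edges`; walk to the tail and append the new edge there.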
if let Some(edge_idx) = child.parents {
let mut edge = &mut self.edges[edge_idx.0 as usize];
while let Some(next) = edge.next {
edge = &mut self.edges[next.0 as usize];
}
edge.next = Some(new_edge_idx);
} else {
child.parents = Some(new_edge_idx);
}
}
fn parents(&self, node_idx: NodeIdx) -> impl Iterator<Item = NodeIdx> + '_ {
let mut edge_idx = self.nodes[node_idx.0 as usize].parents;
std::iter::from_fn(move || {
let this_edge_idx = edge_idx?;
let edge = &self.edges[this_edge_idx.0 as usize];
edge_idx = edge.next;
Some(edge.target)
})
}
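/// Derive the clock at the given heads by visiting each ancestor once and
/// including every ancestor's (seq, max_op) for its actor.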
pub(crate) fn clock_for_heads(&self, heads: &[ChangeHash]) -> Clock {
let mut clock = Clock::new();
self.traverse_ancestors(heads, |node, _hash| {
clock.include(
node.actor_index,
ClockData {
max_op: node.max_op,
seq: node.seq,
},
);
});
clock
}
pub(crate) fn remove_ancestors(
&self,
changes: &mut BTreeSet<ChangeHash>,
heads: &[ChangeHash],
) {
self.traverse_ancestors(heads, |_node, hash| {
changes.remove(hash);
});
}
/// Call `f` for each (node, hash) reachable from the given heads, i.e. the heads themselves
/// and all of their ancestors
///
/// No guarantees are made about the order of traversal but each node will only be visited
/// once.
fn traverse_ancestors<F: FnMut(&ChangeNode, &ChangeHash)>(
&self,
heads: &[ChangeHash],
mut f: F,
) {
let mut to_visit = heads
.iter()
.filter_map(|h| self.nodes_by_hash.get(h))
.copied()
.collect::<Vec<_>>();
let mut visited = BTreeSet::new();
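// Depth-first traversal using an explicit stack; `visited` ensures each
// node is processed at most once even when the history converges.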
while let Some(idx) = to_visit.pop() {
if visited.contains(&idx) {
continue;
} else {
visited.insert(idx);
}
let node = &self.nodes[idx.0 as usize];
let hash = &self.hashes[node.hash_idx.0 as usize];
f(node, hash);
to_visit.extend(self.parents(idx));
}
}
}
#[derive(Debug, thiserror::Error)]
#[error("attempted to derive a clock for a change with dependencies we don't have")]
pub struct MissingDep(ChangeHash);
#[cfg(test)]
mod tests {
use std::{
num::NonZeroU64,
time::{SystemTime, UNIX_EPOCH},
};
use crate::{
clock::ClockData,
op_tree::OpSetMetadata,
storage::{change::ChangeBuilder, convert::op_as_actor_id},
types::{Key, ObjId, Op, OpId, OpIds},
ActorId,
};
use super::*;
#[test]
fn clock_by_heads() {
let mut builder = TestGraphBuilder::new();
let actor1 = builder.actor();
let actor2 = builder.actor();
let actor3 = builder.actor();
let change1 = builder.change(&actor1, 10, &[]);
let change2 = builder.change(&actor2, 20, &[change1]);
let change3 = builder.change(&actor3, 30, &[change1]);
let change4 = builder.change(&actor1, 10, &[change2, change3]);
let graph = builder.build();
let mut expected_clock = Clock::new();
expected_clock.include(builder.index(&actor1), ClockData { max_op: 50, seq: 2 });
expected_clock.include(builder.index(&actor2), ClockData { max_op: 30, seq: 1 });
expected_clock.include(builder.index(&actor3), ClockData { max_op: 40, seq: 1 });
let clock = graph.clock_for_heads(&[change4]);
assert_eq!(clock, expected_clock);
}
#[test]
fn remove_ancestors() {
let mut builder = TestGraphBuilder::new();
let actor1 = builder.actor();
let actor2 = builder.actor();
let actor3 = builder.actor();
let change1 = builder.change(&actor1, 10, &[]);
let change2 = builder.change(&actor2, 20, &[change1]);
let change3 = builder.change(&actor3, 30, &[change1]);
let change4 = builder.change(&actor1, 10, &[change2, change3]);
let graph = builder.build();
let mut changes = vec![change1, change2, change3, change4]
.into_iter()
.collect::<BTreeSet<_>>();
let heads = vec![change2];
graph.remove_ancestors(&mut changes, &heads);
let expected_changes = vec![change3, change4].into_iter().collect::<BTreeSet<_>>();
assert_eq!(changes, expected_changes);
}
struct TestGraphBuilder {
actors: Vec<ActorId>,
changes: Vec<Change>,
seqs_by_actor: BTreeMap<ActorId, u64>,
}
impl TestGraphBuilder {
fn new() -> Self {
TestGraphBuilder {
actors: Vec::new(),
changes: Vec::new(),
seqs_by_actor: BTreeMap::new(),
}
}
fn actor(&mut self) -> ActorId {
let actor = ActorId::random();
self.actors.push(actor.clone());
actor
}
fn index(&self, actor: &ActorId) -> usize {
self.actors.iter().position(|a| a == actor).unwrap()
}
/// Create a change with `num_new_ops` and `parents` for `actor`
///
/// The `start_op` and `seq` of the change will be computed from the
/// previous changes for the same actor.
fn change(
&mut self,
actor: &ActorId,
num_new_ops: usize,
parents: &[ChangeHash],
) -> ChangeHash {
let mut meta = OpSetMetadata::from_actors(self.actors.clone());
let key = meta.props.cache("key".to_string());
let start_op = parents
.iter()
.map(|c| {
self.changes
.iter()
.find(|change| change.hash() == *c)
.unwrap()
.max_op()
})
.max()
.unwrap_or(0)
+ 1;
let actor_idx = self.index(actor);
let ops = (0..num_new_ops)
.map(|opnum| Op {
id: OpId::new(start_op + opnum as u64, actor_idx),
action: crate::OpType::Put("value".into()),
key: Key::Map(key),
succ: OpIds::empty(),
pred: OpIds::empty(),
insert: false,
})
.collect::<Vec<_>>();
let root = ObjId::root();
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let seq = self.seqs_by_actor.entry(actor.clone()).or_insert(1);
let change = Change::new(
ChangeBuilder::new()
.with_dependencies(parents.to_vec())
.with_start_op(NonZeroU64::new(start_op).unwrap())
.with_actor(actor.clone())
.with_seq(*seq)
.with_timestamp(timestamp)
.build(ops.iter().map(|op| op_as_actor_id(&root, op, &meta)))
.unwrap(),
);
*seq = seq.checked_add(1).unwrap();
let hash = change.hash();
self.changes.push(change);
hash
}
fn build(&self) -> ChangeGraph {
let mut graph = ChangeGraph::new();
for change in &self.changes {
let actor_idx = self.index(change.actor_id());
graph.add_change(change, actor_idx).unwrap();
}
graph
}
}
}

File: clock.rs

@@ -71,12 +71,6 @@ impl Clock {
self.0.get(actor_index)
}
pub(crate) fn merge(&mut self, other: &Self) {
for (actor, data) in &other.0 {
self.include(*actor, *data);
}
}
fn is_greater(&self, other: &Self) -> bool {
let mut has_greater = false;

File: clocks.rs (deleted)

@@ -1,44 +0,0 @@
use crate::{
clock::{Clock, ClockData},
Change, ChangeHash,
};
use std::collections::HashMap;
pub(crate) struct Clocks(HashMap<ChangeHash, Clock>);
#[derive(Debug, thiserror::Error)]
#[error("attempted to derive a clock for a change with dependencies we don't have")]
pub struct MissingDep(ChangeHash);
impl Clocks {
pub(crate) fn new() -> Self {
Self(HashMap::new())
}
pub(crate) fn add_change(
&mut self,
change: &Change,
actor_index: usize,
) -> Result<(), MissingDep> {
let mut clock = Clock::new();
for hash in change.deps() {
let c = self.0.get(hash).ok_or(MissingDep(*hash))?;
clock.merge(c);
}
clock.include(
actor_index,
ClockData {
max_op: change.max_op(),
seq: change.seq(),
},
);
self.0.insert(change.hash(), clock);
Ok(())
}
}
impl From<Clocks> for HashMap<ChangeHash, Clock> {
fn from(c: Clocks) -> Self {
c.0
}
}

File: error.rs

@@ -7,7 +7,7 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum AutomergeError {
#[error(transparent)]
Clocks(#[from] crate::clocks::MissingDep),
ChangeGraph(#[from] crate::change_graph::MissingDep),
#[error("failed to load compressed data: {0}")]
Deflate(#[source] std::io::Error),
#[error("duplicate seq {0} found for actor {1}")]

File: lib.rs

@@ -244,8 +244,8 @@ mod autocommit;
mod automerge;
mod autoserde;
mod change;
mod change_graph;
mod clock;
mod clocks;
mod columnar;
mod convert;
mod error;