diff --git a/rust/automerge-wasm/src/lib.rs b/rust/automerge-wasm/src/lib.rs index d6ccc8c8..7e58e5e4 100644 --- a/rust/automerge-wasm/src/lib.rs +++ b/rust/automerge-wasm/src/lib.rs @@ -574,6 +574,12 @@ impl Automerge { let mut object = object .dyn_into::() .map_err(|_| error::ApplyPatch::NotObjectd)?; + + if self.doc.observer().has_overflowed() { + self.doc.observer().enable(true); + return Ok(self.export_object(&am::ROOT, Datatype::Map, None, &meta)?); + } + let patches = self.doc.observer().take_patches(); let callback = callback.dyn_into::().ok(); diff --git a/rust/automerge-wasm/src/observer.rs b/rust/automerge-wasm/src/observer.rs index 83516597..09ee4042 100644 --- a/rust/automerge-wasm/src/observer.rs +++ b/rust/automerge-wasm/src/observer.rs @@ -10,9 +10,30 @@ use automerge::{Automerge, ObjId, OpObserver, Prop, ScalarValue, SequenceTree, V use js_sys::{Array, Object}; use wasm_bindgen::prelude::*; +#[derive(Debug, Clone, PartialEq, Default, Copy)] +pub(crate) enum ObserverState { + #[default] + Disabled, + Enabled, + Overflow, +} + +impl ObserverState { + fn set(&mut self, enabled: bool) { + match enabled { + true => *self = Self::Enabled, + false => *self = Self::Disabled, + } + } + + fn is_enabled(&self) -> bool { + matches!(self, Self::Enabled) + } +} + #[derive(Debug, Clone, Default)] pub(crate) struct Observer { - enabled: bool, + state: ObserverState, patches: Vec, text_rep: TextRepresentation, } @@ -21,13 +42,25 @@ impl Observer { pub(crate) fn take_patches(&mut self) -> Vec { std::mem::take(&mut self.patches) } + pub(crate) fn enable(&mut self, enable: bool) -> bool { - if self.enabled && !enable { + if !enable { self.patches.truncate(0) } - let old_enabled = self.enabled; - self.enabled = enable; - old_enabled + let old_state = self.state.is_enabled(); + self.state.set(enable); + old_state + } + + pub(crate) fn has_overflowed(&mut self) -> bool { + self.state == ObserverState::Overflow + } + + fn check_overflow(&mut self) { + if self.patches.len() > 100 || self.state == ObserverState::Overflow { + self.patches.truncate(0); + self.state = ObserverState::Overflow; + } } fn get_path(&mut self, doc: &Automerge, obj: &ObjId) -> Option> { @@ -105,7 +138,7 @@ impl OpObserver for Observer { index: usize, tagged_value: (Value<'_>, ObjId), ) { - if self.enabled { + if self.state.is_enabled() { let value = (tagged_value.0.to_owned(), tagged_value.1); if let Some(Patch::Insert { obj: tail_obj, @@ -131,11 +164,12 @@ impl OpObserver for Observer { }; self.patches.push(patch); } + self.check_overflow(); } } fn splice_text(&mut self, doc: &Automerge, obj: ObjId, index: usize, value: &str) { - if self.enabled { + if self.state.is_enabled() { if self.text_rep == TextRepresentation::Array { for (i, c) in value.chars().enumerate() { self.insert( @@ -179,11 +213,12 @@ impl OpObserver for Observer { }; self.patches.push(patch); } + self.check_overflow(); } } fn delete_seq(&mut self, doc: &Automerge, obj: ObjId, index: usize, length: usize) { - if self.enabled { + if self.state.is_enabled() { match self.patches.last_mut() { Some(Patch::SpliceText { obj: tail_obj, @@ -241,11 +276,12 @@ impl OpObserver for Observer { }; self.patches.push(patch) } + self.check_overflow(); } } fn delete_map(&mut self, doc: &Automerge, obj: ObjId, key: &str) { - if self.enabled { + if self.state.is_enabled() { if let Some(path) = self.get_path(doc, &obj) { let patch = Patch::DeleteMap { path, @@ -254,6 +290,7 @@ impl OpObserver for Observer { }; self.patches.push(patch) } + self.check_overflow(); } } @@ -265,7 +302,7 @@ impl OpObserver for Observer { tagged_value: (Value<'_>, ObjId), _conflict: bool, ) { - if self.enabled { + if self.state.is_enabled() { let expose = false; if let Some(path) = self.get_path(doc, &obj) { let value = (tagged_value.0.to_owned(), tagged_value.1); @@ -287,6 +324,7 @@ impl OpObserver for Observer { }; self.patches.push(patch); } + self.check_overflow(); } } @@ -298,7 +336,7 @@ impl OpObserver for Observer { tagged_value: (Value<'_>, ObjId), _conflict: bool, ) { - if self.enabled { + if self.state.is_enabled() { let expose = true; if let Some(path) = self.get_path(doc, &obj) { let value = (tagged_value.0.to_owned(), tagged_value.1); @@ -320,11 +358,12 @@ impl OpObserver for Observer { }; self.patches.push(patch); } + self.check_overflow(); } } fn increment(&mut self, doc: &Automerge, obj: ObjId, prop: Prop, tagged_value: (i64, ObjId)) { - if self.enabled { + if self.state.is_enabled() { if let Some(path) = self.get_path(doc, &obj) { let value = tagged_value.0; self.patches.push(Patch::Increment { @@ -334,17 +373,20 @@ impl OpObserver for Observer { value, }) } + self.check_overflow(); } } fn merge(&mut self, other: &Self) { - self.patches.extend_from_slice(other.patches.as_slice()) + self.patches.extend_from_slice(other.patches.as_slice()); + self.state = other.state; + self.check_overflow(); } fn branch(&self) -> Self { Observer { patches: vec![], - enabled: self.enabled, + state: self.state, text_rep: self.text_rep, } } diff --git a/rust/automerge/Cargo.toml b/rust/automerge/Cargo.toml index 89b48020..1d23a936 100644 --- a/rust/automerge/Cargo.toml +++ b/rust/automerge/Cargo.toml @@ -31,6 +31,7 @@ dot = { version = "0.1.4", optional = true } js-sys = { version = "^0.3", optional = true } wasm-bindgen = { version = "^0.2", optional = true } rand = { version = "^0.8.4", optional = true } +im = "15.1.0" [dependencies.web-sys] version = "^0.3.55" diff --git a/rust/automerge/src/automerge.rs b/rust/automerge/src/automerge.rs index 584f761d..da85a7a2 100644 --- a/rust/automerge/src/automerge.rs +++ b/rust/automerge/src/automerge.rs @@ -45,7 +45,7 @@ pub struct Automerge { /// Mapping from change hash to index into the history list. pub(crate) history_index: HashMap, /// Mapping from change hash to vector clock at this state. - pub(crate) clocks: HashMap, + pub(crate) clocks: Clocks, /// Mapping from actor index to list of seqs seen for them. pub(crate) states: HashMap>, /// Current dependencies of this document (heads hashes). @@ -68,7 +68,7 @@ impl Automerge { queue: vec![], history: vec![], history_index: HashMap::new(), - clocks: HashMap::new(), + clocks: Clocks::new(), states: HashMap::new(), ops: Default::default(), deps: Default::default(), @@ -690,7 +690,7 @@ impl Automerge { None => storage::load::reconstruct_document(&d, mode, OpSet::builder()), } .map_err(|e| load::Error::InflateDocument(Box::new(e)))?; - let mut hashes_by_index = HashMap::new(); + let mut history_index = HashMap::new(); let mut actor_to_history: HashMap> = HashMap::new(); let mut clocks = Clocks::new(); for (index, change) in changes.iter().enumerate() { @@ -698,16 +698,15 @@ impl Automerge { // all the changes let actor_index = op_set.m.actors.lookup(change.actor_id()).unwrap(); actor_to_history.entry(actor_index).or_default().push(index); - hashes_by_index.insert(index, change.hash()); + history_index.insert(change.hash(), index); clocks.add_change(change, actor_index)?; } - let history_index = hashes_by_index.into_iter().map(|(k, v)| (v, k)).collect(); Self { queue: vec![], history: changes, history_index, states: actor_to_history, - clocks: clocks.into(), + clocks, ops: op_set, deps: heads.into_iter().collect(), saved: Default::default(), @@ -1066,25 +1065,7 @@ impl Automerge { } fn clock_at(&self, heads: &[ChangeHash]) -> Result { - if let Some(first_hash) = heads.first() { - let mut clock = self - .clocks - .get(first_hash) - .ok_or(AutomergeError::MissingHash(*first_hash))? - .clone(); - - for hash in &heads[1..] { - let c = self - .clocks - .get(hash) - .ok_or(AutomergeError::MissingHash(*hash))?; - clock.merge(c); - } - - Ok(clock) - } else { - Ok(Clock::new()) - } + Ok(self.clocks.at(heads)?) } /// Get a change by its hash. @@ -1151,14 +1132,9 @@ impl Automerge { .push(history_index); self.history_index.insert(change.hash(), history_index); - let mut clock = Clock::new(); - for hash in change.deps() { - let c = self - .clocks - .get(hash) - .expect("Change's deps should already be in the document"); - clock.merge(c); - } + let mut clock = self + .clock_at(change.deps()) + .expect("Change's deps should already be in the document"); clock.include( actor_index, ClockData { diff --git a/rust/automerge/src/clock.rs b/rust/automerge/src/clock.rs index 79125323..4a92e37c 100644 --- a/rust/automerge/src/clock.rs +++ b/rust/automerge/src/clock.rs @@ -1,6 +1,6 @@ use crate::types::OpId; use fxhash::FxBuildHasher; -use std::{cmp::Ordering, collections::HashMap}; +use std::cmp::Ordering; #[derive(Default, Debug, Clone, Copy, PartialEq)] pub(crate) struct ClockData { @@ -19,7 +19,7 @@ impl PartialOrd for ClockData { /// Vector clock mapping actor indices to the max op counter of the changes created by that actor. #[derive(Default, Debug, Clone, PartialEq)] -pub(crate) struct Clock(HashMap); +pub(crate) struct Clock(im::hashmap::HashMap); // A general clock is greater if it has one element the other does not or has a counter higher than // the other for a given actor. diff --git a/rust/automerge/src/clocks.rs b/rust/automerge/src/clocks.rs index 60fc5c71..68615a34 100644 --- a/rust/automerge/src/clocks.rs +++ b/rust/automerge/src/clocks.rs @@ -4,6 +4,7 @@ use crate::{ }; use std::collections::HashMap; +#[derive(Debug, Clone)] pub(crate) struct Clocks(HashMap); #[derive(Debug, thiserror::Error)] @@ -20,11 +21,7 @@ impl Clocks { change: &Change, actor_index: usize, ) -> Result<(), MissingDep> { - let mut clock = Clock::new(); - for hash in change.deps() { - let c = self.0.get(hash).ok_or(MissingDep(*hash))?; - clock.merge(c); - } + let mut clock = self.at(change.deps())?; clock.include( actor_index, ClockData { @@ -35,6 +32,33 @@ impl Clocks { self.0.insert(change.hash(), clock); Ok(()) } + + pub(crate) fn get(&self, hash: &ChangeHash) -> Option<&Clock> { + self.0.get(hash) + } + + pub(crate) fn insert(&mut self, hash: ChangeHash, clock: Clock) -> Option { + self.0.insert(hash, clock) + } + + pub(crate) fn at(&self, heads: &[ChangeHash]) -> Result { + if let Some(first_hash) = heads.first() { + let mut clock = self + .0 + .get(first_hash) + .ok_or(MissingDep(*first_hash))? + .clone(); + + for hash in &heads[1..] { + let c = self.0.get(hash).ok_or(MissingDep(*hash))?; + clock.merge(c); + } + + Ok(clock) + } else { + Ok(Clock::new()) + } + } } impl From for HashMap {