Compare commits

...

1 commit

Author SHA1 Message Date
Orion Henry
0142176ebc bail after 100 patches, im clocks 2023-01-24 12:04:50 -06:00
6 changed files with 103 additions and 54 deletions

View file

@ -574,6 +574,12 @@ impl Automerge {
let mut object = object let mut object = object
.dyn_into::<Object>() .dyn_into::<Object>()
.map_err(|_| error::ApplyPatch::NotObjectd)?; .map_err(|_| error::ApplyPatch::NotObjectd)?;
if self.doc.observer().has_overflowed() {
self.doc.observer().enable(true);
return Ok(self.export_object(&am::ROOT, Datatype::Map, None, &meta)?);
}
let patches = self.doc.observer().take_patches(); let patches = self.doc.observer().take_patches();
let callback = callback.dyn_into::<Function>().ok(); let callback = callback.dyn_into::<Function>().ok();

View file

@ -10,9 +10,30 @@ use automerge::{Automerge, ObjId, OpObserver, Prop, ScalarValue, SequenceTree, V
use js_sys::{Array, Object}; use js_sys::{Array, Object};
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
#[derive(Debug, Clone, PartialEq, Default, Copy)]
pub(crate) enum ObserverState {
#[default]
Disabled,
Enabled,
Overflow,
}
impl ObserverState {
fn set(&mut self, enabled: bool) {
match enabled {
true => *self = Self::Enabled,
false => *self = Self::Disabled,
}
}
fn is_enabled(&self) -> bool {
matches!(self, Self::Enabled)
}
}
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub(crate) struct Observer { pub(crate) struct Observer {
enabled: bool, state: ObserverState,
patches: Vec<Patch>, patches: Vec<Patch>,
text_rep: TextRepresentation, text_rep: TextRepresentation,
} }
@ -21,13 +42,25 @@ impl Observer {
pub(crate) fn take_patches(&mut self) -> Vec<Patch> { pub(crate) fn take_patches(&mut self) -> Vec<Patch> {
std::mem::take(&mut self.patches) std::mem::take(&mut self.patches)
} }
pub(crate) fn enable(&mut self, enable: bool) -> bool { pub(crate) fn enable(&mut self, enable: bool) -> bool {
if self.enabled && !enable { if !enable {
self.patches.truncate(0) self.patches.truncate(0)
} }
let old_enabled = self.enabled; let old_state = self.state.is_enabled();
self.enabled = enable; self.state.set(enable);
old_enabled old_state
}
pub(crate) fn has_overflowed(&mut self) -> bool {
self.state == ObserverState::Overflow
}
fn check_overflow(&mut self) {
if self.patches.len() > 100 || self.state == ObserverState::Overflow {
self.patches.truncate(0);
self.state = ObserverState::Overflow;
}
} }
fn get_path(&mut self, doc: &Automerge, obj: &ObjId) -> Option<Vec<(ObjId, Prop)>> { fn get_path(&mut self, doc: &Automerge, obj: &ObjId) -> Option<Vec<(ObjId, Prop)>> {
@ -105,7 +138,7 @@ impl OpObserver for Observer {
index: usize, index: usize,
tagged_value: (Value<'_>, ObjId), tagged_value: (Value<'_>, ObjId),
) { ) {
if self.enabled { if self.state.is_enabled() {
let value = (tagged_value.0.to_owned(), tagged_value.1); let value = (tagged_value.0.to_owned(), tagged_value.1);
if let Some(Patch::Insert { if let Some(Patch::Insert {
obj: tail_obj, obj: tail_obj,
@ -131,11 +164,12 @@ impl OpObserver for Observer {
}; };
self.patches.push(patch); self.patches.push(patch);
} }
self.check_overflow();
} }
} }
fn splice_text(&mut self, doc: &Automerge, obj: ObjId, index: usize, value: &str) { fn splice_text(&mut self, doc: &Automerge, obj: ObjId, index: usize, value: &str) {
if self.enabled { if self.state.is_enabled() {
if self.text_rep == TextRepresentation::Array { if self.text_rep == TextRepresentation::Array {
for (i, c) in value.chars().enumerate() { for (i, c) in value.chars().enumerate() {
self.insert( self.insert(
@ -179,11 +213,12 @@ impl OpObserver for Observer {
}; };
self.patches.push(patch); self.patches.push(patch);
} }
self.check_overflow();
} }
} }
fn delete_seq(&mut self, doc: &Automerge, obj: ObjId, index: usize, length: usize) { fn delete_seq(&mut self, doc: &Automerge, obj: ObjId, index: usize, length: usize) {
if self.enabled { if self.state.is_enabled() {
match self.patches.last_mut() { match self.patches.last_mut() {
Some(Patch::SpliceText { Some(Patch::SpliceText {
obj: tail_obj, obj: tail_obj,
@ -241,11 +276,12 @@ impl OpObserver for Observer {
}; };
self.patches.push(patch) self.patches.push(patch)
} }
self.check_overflow();
} }
} }
fn delete_map(&mut self, doc: &Automerge, obj: ObjId, key: &str) { fn delete_map(&mut self, doc: &Automerge, obj: ObjId, key: &str) {
if self.enabled { if self.state.is_enabled() {
if let Some(path) = self.get_path(doc, &obj) { if let Some(path) = self.get_path(doc, &obj) {
let patch = Patch::DeleteMap { let patch = Patch::DeleteMap {
path, path,
@ -254,6 +290,7 @@ impl OpObserver for Observer {
}; };
self.patches.push(patch) self.patches.push(patch)
} }
self.check_overflow();
} }
} }
@ -265,7 +302,7 @@ impl OpObserver for Observer {
tagged_value: (Value<'_>, ObjId), tagged_value: (Value<'_>, ObjId),
_conflict: bool, _conflict: bool,
) { ) {
if self.enabled { if self.state.is_enabled() {
let expose = false; let expose = false;
if let Some(path) = self.get_path(doc, &obj) { if let Some(path) = self.get_path(doc, &obj) {
let value = (tagged_value.0.to_owned(), tagged_value.1); let value = (tagged_value.0.to_owned(), tagged_value.1);
@ -287,6 +324,7 @@ impl OpObserver for Observer {
}; };
self.patches.push(patch); self.patches.push(patch);
} }
self.check_overflow();
} }
} }
@ -298,7 +336,7 @@ impl OpObserver for Observer {
tagged_value: (Value<'_>, ObjId), tagged_value: (Value<'_>, ObjId),
_conflict: bool, _conflict: bool,
) { ) {
if self.enabled { if self.state.is_enabled() {
let expose = true; let expose = true;
if let Some(path) = self.get_path(doc, &obj) { if let Some(path) = self.get_path(doc, &obj) {
let value = (tagged_value.0.to_owned(), tagged_value.1); let value = (tagged_value.0.to_owned(), tagged_value.1);
@ -320,11 +358,12 @@ impl OpObserver for Observer {
}; };
self.patches.push(patch); self.patches.push(patch);
} }
self.check_overflow();
} }
} }
fn increment(&mut self, doc: &Automerge, obj: ObjId, prop: Prop, tagged_value: (i64, ObjId)) { fn increment(&mut self, doc: &Automerge, obj: ObjId, prop: Prop, tagged_value: (i64, ObjId)) {
if self.enabled { if self.state.is_enabled() {
if let Some(path) = self.get_path(doc, &obj) { if let Some(path) = self.get_path(doc, &obj) {
let value = tagged_value.0; let value = tagged_value.0;
self.patches.push(Patch::Increment { self.patches.push(Patch::Increment {
@ -334,17 +373,20 @@ impl OpObserver for Observer {
value, value,
}) })
} }
self.check_overflow();
} }
} }
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
self.patches.extend_from_slice(other.patches.as_slice()) self.patches.extend_from_slice(other.patches.as_slice());
self.state = other.state;
self.check_overflow();
} }
fn branch(&self) -> Self { fn branch(&self) -> Self {
Observer { Observer {
patches: vec![], patches: vec![],
enabled: self.enabled, state: self.state,
text_rep: self.text_rep, text_rep: self.text_rep,
} }
} }

View file

@ -31,6 +31,7 @@ dot = { version = "0.1.4", optional = true }
js-sys = { version = "^0.3", optional = true } js-sys = { version = "^0.3", optional = true }
wasm-bindgen = { version = "^0.2", optional = true } wasm-bindgen = { version = "^0.2", optional = true }
rand = { version = "^0.8.4", optional = true } rand = { version = "^0.8.4", optional = true }
im = "15.1.0"
[dependencies.web-sys] [dependencies.web-sys]
version = "^0.3.55" version = "^0.3.55"

View file

@ -45,7 +45,7 @@ pub struct Automerge {
/// Mapping from change hash to index into the history list. /// Mapping from change hash to index into the history list.
pub(crate) history_index: HashMap<ChangeHash, usize>, pub(crate) history_index: HashMap<ChangeHash, usize>,
/// Mapping from change hash to vector clock at this state. /// Mapping from change hash to vector clock at this state.
pub(crate) clocks: HashMap<ChangeHash, Clock>, pub(crate) clocks: Clocks,
/// Mapping from actor index to list of seqs seen for them. /// Mapping from actor index to list of seqs seen for them.
pub(crate) states: HashMap<usize, Vec<usize>>, pub(crate) states: HashMap<usize, Vec<usize>>,
/// Current dependencies of this document (heads hashes). /// Current dependencies of this document (heads hashes).
@ -68,7 +68,7 @@ impl Automerge {
queue: vec![], queue: vec![],
history: vec![], history: vec![],
history_index: HashMap::new(), history_index: HashMap::new(),
clocks: HashMap::new(), clocks: Clocks::new(),
states: HashMap::new(), states: HashMap::new(),
ops: Default::default(), ops: Default::default(),
deps: Default::default(), deps: Default::default(),
@ -690,7 +690,7 @@ impl Automerge {
None => storage::load::reconstruct_document(&d, mode, OpSet::builder()), None => storage::load::reconstruct_document(&d, mode, OpSet::builder()),
} }
.map_err(|e| load::Error::InflateDocument(Box::new(e)))?; .map_err(|e| load::Error::InflateDocument(Box::new(e)))?;
let mut hashes_by_index = HashMap::new(); let mut history_index = HashMap::new();
let mut actor_to_history: HashMap<usize, Vec<usize>> = HashMap::new(); let mut actor_to_history: HashMap<usize, Vec<usize>> = HashMap::new();
let mut clocks = Clocks::new(); let mut clocks = Clocks::new();
for (index, change) in changes.iter().enumerate() { for (index, change) in changes.iter().enumerate() {
@ -698,16 +698,15 @@ impl Automerge {
// all the changes // all the changes
let actor_index = op_set.m.actors.lookup(change.actor_id()).unwrap(); let actor_index = op_set.m.actors.lookup(change.actor_id()).unwrap();
actor_to_history.entry(actor_index).or_default().push(index); actor_to_history.entry(actor_index).or_default().push(index);
hashes_by_index.insert(index, change.hash()); history_index.insert(change.hash(), index);
clocks.add_change(change, actor_index)?; clocks.add_change(change, actor_index)?;
} }
let history_index = hashes_by_index.into_iter().map(|(k, v)| (v, k)).collect();
Self { Self {
queue: vec![], queue: vec![],
history: changes, history: changes,
history_index, history_index,
states: actor_to_history, states: actor_to_history,
clocks: clocks.into(), clocks,
ops: op_set, ops: op_set,
deps: heads.into_iter().collect(), deps: heads.into_iter().collect(),
saved: Default::default(), saved: Default::default(),
@ -1066,25 +1065,7 @@ impl Automerge {
} }
fn clock_at(&self, heads: &[ChangeHash]) -> Result<Clock, AutomergeError> { fn clock_at(&self, heads: &[ChangeHash]) -> Result<Clock, AutomergeError> {
if let Some(first_hash) = heads.first() { Ok(self.clocks.at(heads)?)
let mut clock = self
.clocks
.get(first_hash)
.ok_or(AutomergeError::MissingHash(*first_hash))?
.clone();
for hash in &heads[1..] {
let c = self
.clocks
.get(hash)
.ok_or(AutomergeError::MissingHash(*hash))?;
clock.merge(c);
}
Ok(clock)
} else {
Ok(Clock::new())
}
} }
/// Get a change by its hash. /// Get a change by its hash.
@ -1151,14 +1132,9 @@ impl Automerge {
.push(history_index); .push(history_index);
self.history_index.insert(change.hash(), history_index); self.history_index.insert(change.hash(), history_index);
let mut clock = Clock::new(); let mut clock = self
for hash in change.deps() { .clock_at(change.deps())
let c = self .expect("Change's deps should already be in the document");
.clocks
.get(hash)
.expect("Change's deps should already be in the document");
clock.merge(c);
}
clock.include( clock.include(
actor_index, actor_index,
ClockData { ClockData {

View file

@ -1,6 +1,6 @@
use crate::types::OpId; use crate::types::OpId;
use fxhash::FxBuildHasher; use fxhash::FxBuildHasher;
use std::{cmp::Ordering, collections::HashMap}; use std::cmp::Ordering;
#[derive(Default, Debug, Clone, Copy, PartialEq)] #[derive(Default, Debug, Clone, Copy, PartialEq)]
pub(crate) struct ClockData { pub(crate) struct ClockData {
@ -19,7 +19,7 @@ impl PartialOrd for ClockData {
/// Vector clock mapping actor indices to the max op counter of the changes created by that actor. /// Vector clock mapping actor indices to the max op counter of the changes created by that actor.
#[derive(Default, Debug, Clone, PartialEq)] #[derive(Default, Debug, Clone, PartialEq)]
pub(crate) struct Clock(HashMap<usize, ClockData, FxBuildHasher>); pub(crate) struct Clock(im::hashmap::HashMap<usize, ClockData, FxBuildHasher>);
// A general clock is greater if it has one element the other does not or has a counter higher than // A general clock is greater if it has one element the other does not or has a counter higher than
// the other for a given actor. // the other for a given actor.

View file

@ -4,6 +4,7 @@ use crate::{
}; };
use std::collections::HashMap; use std::collections::HashMap;
#[derive(Debug, Clone)]
pub(crate) struct Clocks(HashMap<ChangeHash, Clock>); pub(crate) struct Clocks(HashMap<ChangeHash, Clock>);
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -20,11 +21,7 @@ impl Clocks {
change: &Change, change: &Change,
actor_index: usize, actor_index: usize,
) -> Result<(), MissingDep> { ) -> Result<(), MissingDep> {
let mut clock = Clock::new(); let mut clock = self.at(change.deps())?;
for hash in change.deps() {
let c = self.0.get(hash).ok_or(MissingDep(*hash))?;
clock.merge(c);
}
clock.include( clock.include(
actor_index, actor_index,
ClockData { ClockData {
@ -35,6 +32,33 @@ impl Clocks {
self.0.insert(change.hash(), clock); self.0.insert(change.hash(), clock);
Ok(()) Ok(())
} }
pub(crate) fn get(&self, hash: &ChangeHash) -> Option<&Clock> {
self.0.get(hash)
}
pub(crate) fn insert(&mut self, hash: ChangeHash, clock: Clock) -> Option<Clock> {
self.0.insert(hash, clock)
}
pub(crate) fn at(&self, heads: &[ChangeHash]) -> Result<Clock, MissingDep> {
if let Some(first_hash) = heads.first() {
let mut clock = self
.0
.get(first_hash)
.ok_or(MissingDep(*first_hash))?
.clone();
for hash in &heads[1..] {
let c = self.0.get(hash).ok_or(MissingDep(*hash))?;
clock.merge(c);
}
Ok(clock)
} else {
Ok(Clock::new())
}
}
} }
impl From<Clocks> for HashMap<ChangeHash, Clock> { impl From<Clocks> for HashMap<ChangeHash, Clock> {