Compare commits

...

1 commit

Author SHA1 Message Date
Orion Henry
0142176ebc bail after 100 patches, im clocks 2023-01-24 12:04:50 -06:00
6 changed files with 103 additions and 54 deletions

View file

@ -574,6 +574,12 @@ impl Automerge {
let mut object = object
.dyn_into::<Object>()
.map_err(|_| error::ApplyPatch::NotObjectd)?;
if self.doc.observer().has_overflowed() {
self.doc.observer().enable(true);
return Ok(self.export_object(&am::ROOT, Datatype::Map, None, &meta)?);
}
let patches = self.doc.observer().take_patches();
let callback = callback.dyn_into::<Function>().ok();

View file

@ -10,9 +10,30 @@ use automerge::{Automerge, ObjId, OpObserver, Prop, ScalarValue, SequenceTree, V
use js_sys::{Array, Object};
use wasm_bindgen::prelude::*;
#[derive(Debug, Clone, PartialEq, Default, Copy)]
pub(crate) enum ObserverState {
#[default]
Disabled,
Enabled,
Overflow,
}
impl ObserverState {
fn set(&mut self, enabled: bool) {
match enabled {
true => *self = Self::Enabled,
false => *self = Self::Disabled,
}
}
fn is_enabled(&self) -> bool {
matches!(self, Self::Enabled)
}
}
#[derive(Debug, Clone, Default)]
pub(crate) struct Observer {
enabled: bool,
state: ObserverState,
patches: Vec<Patch>,
text_rep: TextRepresentation,
}
@ -21,13 +42,25 @@ impl Observer {
pub(crate) fn take_patches(&mut self) -> Vec<Patch> {
std::mem::take(&mut self.patches)
}
pub(crate) fn enable(&mut self, enable: bool) -> bool {
if self.enabled && !enable {
if !enable {
self.patches.truncate(0)
}
let old_enabled = self.enabled;
self.enabled = enable;
old_enabled
let old_state = self.state.is_enabled();
self.state.set(enable);
old_state
}
pub(crate) fn has_overflowed(&mut self) -> bool {
self.state == ObserverState::Overflow
}
fn check_overflow(&mut self) {
if self.patches.len() > 100 || self.state == ObserverState::Overflow {
self.patches.truncate(0);
self.state = ObserverState::Overflow;
}
}
fn get_path(&mut self, doc: &Automerge, obj: &ObjId) -> Option<Vec<(ObjId, Prop)>> {
@ -105,7 +138,7 @@ impl OpObserver for Observer {
index: usize,
tagged_value: (Value<'_>, ObjId),
) {
if self.enabled {
if self.state.is_enabled() {
let value = (tagged_value.0.to_owned(), tagged_value.1);
if let Some(Patch::Insert {
obj: tail_obj,
@ -131,11 +164,12 @@ impl OpObserver for Observer {
};
self.patches.push(patch);
}
self.check_overflow();
}
}
fn splice_text(&mut self, doc: &Automerge, obj: ObjId, index: usize, value: &str) {
if self.enabled {
if self.state.is_enabled() {
if self.text_rep == TextRepresentation::Array {
for (i, c) in value.chars().enumerate() {
self.insert(
@ -179,11 +213,12 @@ impl OpObserver for Observer {
};
self.patches.push(patch);
}
self.check_overflow();
}
}
fn delete_seq(&mut self, doc: &Automerge, obj: ObjId, index: usize, length: usize) {
if self.enabled {
if self.state.is_enabled() {
match self.patches.last_mut() {
Some(Patch::SpliceText {
obj: tail_obj,
@ -241,11 +276,12 @@ impl OpObserver for Observer {
};
self.patches.push(patch)
}
self.check_overflow();
}
}
fn delete_map(&mut self, doc: &Automerge, obj: ObjId, key: &str) {
if self.enabled {
if self.state.is_enabled() {
if let Some(path) = self.get_path(doc, &obj) {
let patch = Patch::DeleteMap {
path,
@ -254,6 +290,7 @@ impl OpObserver for Observer {
};
self.patches.push(patch)
}
self.check_overflow();
}
}
@ -265,7 +302,7 @@ impl OpObserver for Observer {
tagged_value: (Value<'_>, ObjId),
_conflict: bool,
) {
if self.enabled {
if self.state.is_enabled() {
let expose = false;
if let Some(path) = self.get_path(doc, &obj) {
let value = (tagged_value.0.to_owned(), tagged_value.1);
@ -287,6 +324,7 @@ impl OpObserver for Observer {
};
self.patches.push(patch);
}
self.check_overflow();
}
}
@ -298,7 +336,7 @@ impl OpObserver for Observer {
tagged_value: (Value<'_>, ObjId),
_conflict: bool,
) {
if self.enabled {
if self.state.is_enabled() {
let expose = true;
if let Some(path) = self.get_path(doc, &obj) {
let value = (tagged_value.0.to_owned(), tagged_value.1);
@ -320,11 +358,12 @@ impl OpObserver for Observer {
};
self.patches.push(patch);
}
self.check_overflow();
}
}
fn increment(&mut self, doc: &Automerge, obj: ObjId, prop: Prop, tagged_value: (i64, ObjId)) {
if self.enabled {
if self.state.is_enabled() {
if let Some(path) = self.get_path(doc, &obj) {
let value = tagged_value.0;
self.patches.push(Patch::Increment {
@ -334,17 +373,20 @@ impl OpObserver for Observer {
value,
})
}
self.check_overflow();
}
}
fn merge(&mut self, other: &Self) {
self.patches.extend_from_slice(other.patches.as_slice())
self.patches.extend_from_slice(other.patches.as_slice());
self.state = other.state;
self.check_overflow();
}
fn branch(&self) -> Self {
Observer {
patches: vec![],
enabled: self.enabled,
state: self.state,
text_rep: self.text_rep,
}
}

View file

@ -31,6 +31,7 @@ dot = { version = "0.1.4", optional = true }
js-sys = { version = "^0.3", optional = true }
wasm-bindgen = { version = "^0.2", optional = true }
rand = { version = "^0.8.4", optional = true }
im = "15.1.0"
[dependencies.web-sys]
version = "^0.3.55"

View file

@ -45,7 +45,7 @@ pub struct Automerge {
/// Mapping from change hash to index into the history list.
pub(crate) history_index: HashMap<ChangeHash, usize>,
/// Mapping from change hash to vector clock at this state.
pub(crate) clocks: HashMap<ChangeHash, Clock>,
pub(crate) clocks: Clocks,
/// Mapping from actor index to list of seqs seen for them.
pub(crate) states: HashMap<usize, Vec<usize>>,
/// Current dependencies of this document (heads hashes).
@ -68,7 +68,7 @@ impl Automerge {
queue: vec![],
history: vec![],
history_index: HashMap::new(),
clocks: HashMap::new(),
clocks: Clocks::new(),
states: HashMap::new(),
ops: Default::default(),
deps: Default::default(),
@ -690,7 +690,7 @@ impl Automerge {
None => storage::load::reconstruct_document(&d, mode, OpSet::builder()),
}
.map_err(|e| load::Error::InflateDocument(Box::new(e)))?;
let mut hashes_by_index = HashMap::new();
let mut history_index = HashMap::new();
let mut actor_to_history: HashMap<usize, Vec<usize>> = HashMap::new();
let mut clocks = Clocks::new();
for (index, change) in changes.iter().enumerate() {
@ -698,16 +698,15 @@ impl Automerge {
// all the changes
let actor_index = op_set.m.actors.lookup(change.actor_id()).unwrap();
actor_to_history.entry(actor_index).or_default().push(index);
hashes_by_index.insert(index, change.hash());
history_index.insert(change.hash(), index);
clocks.add_change(change, actor_index)?;
}
let history_index = hashes_by_index.into_iter().map(|(k, v)| (v, k)).collect();
Self {
queue: vec![],
history: changes,
history_index,
states: actor_to_history,
clocks: clocks.into(),
clocks,
ops: op_set,
deps: heads.into_iter().collect(),
saved: Default::default(),
@ -1066,25 +1065,7 @@ impl Automerge {
}
fn clock_at(&self, heads: &[ChangeHash]) -> Result<Clock, AutomergeError> {
if let Some(first_hash) = heads.first() {
let mut clock = self
.clocks
.get(first_hash)
.ok_or(AutomergeError::MissingHash(*first_hash))?
.clone();
for hash in &heads[1..] {
let c = self
.clocks
.get(hash)
.ok_or(AutomergeError::MissingHash(*hash))?;
clock.merge(c);
}
Ok(clock)
} else {
Ok(Clock::new())
}
Ok(self.clocks.at(heads)?)
}
/// Get a change by its hash.
@ -1151,14 +1132,9 @@ impl Automerge {
.push(history_index);
self.history_index.insert(change.hash(), history_index);
let mut clock = Clock::new();
for hash in change.deps() {
let c = self
.clocks
.get(hash)
.expect("Change's deps should already be in the document");
clock.merge(c);
}
let mut clock = self
.clock_at(change.deps())
.expect("Change's deps should already be in the document");
clock.include(
actor_index,
ClockData {

View file

@ -1,6 +1,6 @@
use crate::types::OpId;
use fxhash::FxBuildHasher;
use std::{cmp::Ordering, collections::HashMap};
use std::cmp::Ordering;
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub(crate) struct ClockData {
@ -19,7 +19,7 @@ impl PartialOrd for ClockData {
/// Vector clock mapping actor indices to the max op counter of the changes created by that actor.
#[derive(Default, Debug, Clone, PartialEq)]
pub(crate) struct Clock(HashMap<usize, ClockData, FxBuildHasher>);
pub(crate) struct Clock(im::hashmap::HashMap<usize, ClockData, FxBuildHasher>);
// A general clock is greater if it has one element the other does not or has a counter higher than
// the other for a given actor.

View file

@ -4,6 +4,7 @@ use crate::{
};
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub(crate) struct Clocks(HashMap<ChangeHash, Clock>);
#[derive(Debug, thiserror::Error)]
@ -20,11 +21,7 @@ impl Clocks {
change: &Change,
actor_index: usize,
) -> Result<(), MissingDep> {
let mut clock = Clock::new();
for hash in change.deps() {
let c = self.0.get(hash).ok_or(MissingDep(*hash))?;
clock.merge(c);
}
let mut clock = self.at(change.deps())?;
clock.include(
actor_index,
ClockData {
@ -35,6 +32,33 @@ impl Clocks {
self.0.insert(change.hash(), clock);
Ok(())
}
pub(crate) fn get(&self, hash: &ChangeHash) -> Option<&Clock> {
self.0.get(hash)
}
pub(crate) fn insert(&mut self, hash: ChangeHash, clock: Clock) -> Option<Clock> {
self.0.insert(hash, clock)
}
pub(crate) fn at(&self, heads: &[ChangeHash]) -> Result<Clock, MissingDep> {
if let Some(first_hash) = heads.first() {
let mut clock = self
.0
.get(first_hash)
.ok_or(MissingDep(*first_hash))?
.clone();
for hash in &heads[1..] {
let c = self.0.get(hash).ok_or(MissingDep(*hash))?;
clock.merge(c);
}
Ok(clock)
} else {
Ok(Clock::new())
}
}
}
impl From<Clocks> for HashMap<ChangeHash, Clock> {