Merge pull request from alexjg/histories

fix transitive deps bug, refactor actor_histories, remove root_value
This commit is contained in:
Orion Henry 2020-03-11 12:30:56 -04:00 committed by GitHub
commit 737576ef6c
13 changed files with 212 additions and 1039 deletions
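
The heart of the transitive-deps fix: a change only ships its direct dependency clock, so the backend has to fold in the stored clocks of those dependencies to recover the full causal history. Below is a minimal standalone sketch of that closure, using plain HashMaps rather than the crate's own types.

use std::collections::HashMap;

type Clock = HashMap<String, u32>;

// deps_of[(actor, seq)] holds the already-computed transitive clock for that change.
fn transitive_deps(direct: &Clock, deps_of: &HashMap<(String, u32), Clock>) -> Clock {
    let mut all = direct.clone();
    for (actor, seq) in direct {
        if let Some(deps) = deps_of.get(&(actor.clone(), *seq)) {
            for (a, s) in deps {
                let e = all.entry(a.clone()).or_insert(0);
                if *s > *e {
                    *e = *s;
                }
            }
        }
    }
    all
}

fn main() {
    // B:1 was recorded with transitive deps {A: 1}.
    let mut deps_of = HashMap::new();
    deps_of.insert(("b".to_string(), 1), HashMap::from([("a".to_string(), 1)]));

    // A new change from C ships only its direct dep on B:1 ...
    let direct: Clock = HashMap::from([("b".to_string(), 1)]);
    // ... but its full dependency set is {A: 1, B: 1}.
    let full = transitive_deps(&direct, &deps_of);
    assert_eq!(full.get("a"), Some(&1));
    assert_eq!(full.get("b"), Some(&1));
}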

View file

@ -64,8 +64,10 @@ impl State {
#[wasm_bindgen(js_name = getChanges)]
pub fn get_changes(&self, state: &State) -> Result<JsValue, JsValue> {
log!("get_changes");
let changes = self.backend.get_changes(&state.backend)
.map_err(automerge_error_to_js)?;
let changes = self
.backend
.get_changes(&state.backend)
.map_err(automerge_error_to_js)?;
rust_to_js(&changes)
}
@ -73,7 +75,7 @@ impl State {
pub fn get_changes_for_actorid(&self, actorid: JsValue) -> Result<JsValue, JsValue> {
log!("get_changes_for_actorid");
let a: ActorID = js_to_rust(actorid)?;
let changes = self.backend.get_changes_for_actor_id(a);
let changes = self.backend.get_changes_for_actor_id(&a);
rust_to_js(&changes)
}
@ -138,13 +140,19 @@ impl State {
#[wasm_bindgen]
#[wasm_bindgen(js_name = forkAt)]
pub fn fork_at(&self, _clock: JsValue) -> Result<State,JsValue> {
pub fn fork_at(&self, _clock: JsValue) -> Result<State, JsValue> {
log!("fork_at");
let clock: Clock = js_to_rust(_clock)?;
let changes = self.backend.history().iter()
.filter(|change| clock.get(&change.actor_id) >= change.seq)
.cloned().collect();
let mut fork = State { backend: Backend::init() };
let changes = self
.backend
.history()
.iter()
.filter(|change| clock.get(&change.actor_id) >= change.seq)
.cloned()
.collect();
let mut fork = State {
backend: Backend::init(),
};
let _patch = fork
.backend
.apply_changes(changes)

View file

@ -1,69 +0,0 @@
use crate::operation_with_metadata::OperationWithMetadata;
use crate::protocol::{ActorID, Change, Clock};
use std::collections::HashMap;
/// ActorHistories is a cache for the transitive dependencies of each change
/// received from each actor. This is necessary because a change only ships its
/// direct dependencies in `deps` but we need all dependencies to determine
/// whether two operations occurred concurrently.
#[derive(Debug, Clone, PartialEq)]
pub struct ActorHistories(HashMap<ActorID, HashMap<u32, Clock>>);
impl ActorHistories {
pub(crate) fn new() -> ActorHistories {
ActorHistories(HashMap::new())
}
/// Return the latest sequence required by `op` for actor `actor`
fn dependency_for(&self, op: &OperationWithMetadata, actor: &ActorID) -> u32 {
self.0
.get(&op.actor_id)
.and_then(|clocks| clocks.get(&op.sequence))
.map(|c| c.get(actor))
.unwrap_or(0)
}
/// Update this ActorHistories to include the changes in `change`
pub(crate) fn add_change(&mut self, change: &Change) {
let change_deps = change
.dependencies
.with(&change.actor_id, change.seq - 1);
let transitive = self.transitive_dependencies(&change.actor_id, change.seq);
let all_deps = transitive.union(&change_deps);
let state = self
.0
.entry(change.actor_id.clone())
.or_insert_with(HashMap::new);
state.insert(change.seq, all_deps);
}
pub fn transitive_dependencies(&self, actor_id: &ActorID, seq: u32) -> Clock {
self.0
.get(actor_id)
.and_then(|deps| deps.get(&seq))
.cloned()
.unwrap_or_else(Clock::empty)
}
pub fn transitive_dependencies_of_clock(&self, clock: &Clock) -> Clock {
clock
.into_iter()
.fold(Clock::empty(), |clock, (actor_id, seq)| {
clock.union(&self.transitive_dependencies(actor_id, *seq))
})
.union(clock)
}
/// Whether the two operations in question are concurrent
pub(crate) fn are_concurrent(
&self,
op1: &OperationWithMetadata,
op2: &OperationWithMetadata,
) -> bool {
if op1.sequence == op2.sequence && op1.actor_id == op2.actor_id {
return false;
}
self.dependency_for(op1, &op2.actor_id) < op2.sequence
&& self.dependency_for(op2, &op1.actor_id) < op1.sequence
}
}
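
To make the doc comment above concrete, here is a sketch, not part of this commit, of why direct deps alone misclassify causally related ops as concurrent. It assumes only the public Clock tuple constructor and get, which the frontend tests further down use in the same way.

use std::collections::HashMap;
use automerge_backend::{ActorID, Clock};

fn main() {
    let a = ActorID("a".to_string());
    let b = ActorID("b".to_string());
    let c = ActorID("c".to_string());

    // Change B:1 depends on A:1, and change C:1 ships only its direct dep {B: 1}.
    let direct_deps_c1 = Clock(HashMap::from([(b.clone(), 1)]));
    // Expanding through B:1's own deps, C:1 transitively depends on A:1 as well.
    let transitive_deps_c1 = Clock(HashMap::from([(a.clone(), 1), (b, 1)]));

    let deps_a1 = Clock(HashMap::new()); // A:1 has no dependencies

    // Two ops are concurrent when neither one's deps cover the other.
    let (seq_a1, seq_c1) = (1, 1);
    let with_direct = deps_a1.get(&c) < seq_c1 && direct_deps_c1.get(&a) < seq_a1;
    let with_transitive = deps_a1.get(&c) < seq_c1 && transitive_deps_c1.get(&a) < seq_a1;

    assert!(with_direct);      // direct deps alone wrongly report "concurrent"
    assert!(!with_transitive); // the transitive clock shows C:1 causally follows A:1
}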

View file

@ -0,0 +1,117 @@
use crate::error::AutomergeError;
use crate::operation_with_metadata::OperationWithMetadata;
use crate::protocol::{ActorID, Change, Clock};
use std::collections::HashMap;
// ActorStates manages:
// `change_by_actor` - a seq-ordered vec of changes per actor
// `deps_by_actor` - a seq-ordered vec of transitive deps per actor
// `history` - a list of all changes received, in order
// This struct is used to tell whether two ops are concurrent and to look up
// historic changes.
#[derive(Debug, PartialEq, Clone)]
pub struct ActorStates {
pub history: Vec<Change>,
change_by_actor: HashMap<ActorID, Vec<Change>>,
deps_by_actor: HashMap<ActorID, Vec<Clock>>,
// this lets me return a reference to an empty clock when needed
// without having to do any extra allocations or copies
// in the default path
empty_clock: Clock,
}
impl ActorStates {
pub(crate) fn new() -> ActorStates {
ActorStates {
change_by_actor: HashMap::new(),
deps_by_actor: HashMap::new(),
empty_clock: Clock::empty(),
history: Vec::new(),
}
}
pub fn is_concurrent(&self, op1: &OperationWithMetadata, op2: &OperationWithMetadata) -> bool {
let clock1 = self.get_deps(&op1.actor_id, op1.sequence);
let clock2 = self.get_deps(&op2.actor_id, op2.sequence);
clock1.get(&op2.actor_id) < op2.sequence && clock2.get(&op1.actor_id) < op1.sequence
}
pub fn get(&self, actor_id: &ActorID) -> Vec<&Change> {
self.change_by_actor
.get(actor_id)
.map(|vec| vec.iter().collect())
.unwrap_or_default()
}
fn get_change(&self, actor_id: &ActorID, seq: u32) -> Option<&Change> {
self.change_by_actor
.get(actor_id)
.and_then(|v| v.get((seq as usize) - 1))
}
fn get_deps(&self, actor_id: &ActorID, seq: u32) -> &Clock {
self.get_deps_option(actor_id, seq)
.unwrap_or(&self.empty_clock)
}
fn get_deps_option(&self, actor_id: &ActorID, seq: u32) -> Option<&Clock> {
self.deps_by_actor
.get(actor_id)
.and_then(|v| v.get((seq as usize) - 1))
}
fn transitive_deps(&self, clock: &Clock) -> Clock {
let mut all_deps = clock.clone();
clock
.into_iter()
.filter_map(|(actor_id, seq)| self.get_deps_option(actor_id, *seq))
.for_each(|deps| all_deps.merge(deps));
all_deps
}
// If the change is new, return Ok(true).
// If the change is a duplicate, don't insert it and return Ok(false).
// If the change reuses an actor:seq pair with different contents, return an error.
pub(crate) fn add_change(&mut self, change: Change) -> Result<bool, AutomergeError> {
if let Some(c) = self.get_change(&change.actor_id, change.seq) {
if &change == c {
return Ok(false);
} else {
return Err(AutomergeError::InvalidChange(
"Invalid reuse of sequence number for actor".to_string(),
));
}
}
let deps = change.dependencies.with(&change.actor_id, change.seq - 1);
let all_deps = self.transitive_deps(&deps);
let actor_id = change.actor_id.clone();
self.history.push(change.clone());
let actor_changes = self
.change_by_actor
.entry(actor_id.clone())
.or_insert_with(Vec::new);
if (change.seq as usize) - 1 != actor_changes.len() {
panic!(
"cant push c={:?}:{:?} at ${:?}",
change.actor_id,
change.seq,
actor_changes.len()
);
}
actor_changes.push(change);
let actor_deps = self.deps_by_actor.entry(actor_id).or_insert_with(Vec::new);
actor_deps.push(all_deps);
// TODO - panic if it's the wrong seq!?
Ok(true)
}
}
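
A crate-internal sketch, not part of this commit, of the add_change contract described above, written as it might appear in a #[cfg(test)] module inside automerge-backend; the change contents are invented for illustration.

use crate::actor_states::ActorStates;
use crate::protocol::{ActorID, Change, Clock, Key, ObjectID, Operation, PrimitiveValue};

#[test]
fn add_change_contract() {
    let change = Change {
        actor_id: ActorID("actor-1".to_string()),
        seq: 1,
        message: None,
        dependencies: Clock::empty(),
        operations: vec![Operation::Set {
            object_id: ObjectID::Root,
            key: Key("k".to_string()),
            value: PrimitiveValue::Boolean(true),
            datatype: None,
        }],
    };

    let mut states = ActorStates::new();
    assert!(states.add_change(change.clone()).unwrap()); // new change: Ok(true)
    assert!(!states.add_change(change.clone()).unwrap()); // exact duplicate: Ok(false)

    // Reusing actor-1:1 with different contents must be rejected.
    let conflicting = Change {
        message: Some("edited".to_string()),
        ..change
    };
    assert!(states.add_change(conflicting).is_err());
}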

View file

@ -109,7 +109,7 @@ impl Backend {
}
pub fn history(&self) -> &Vec<Change> {
&self.op_set.history
&self.op_set.states.history
}
pub fn get_patch(&self) -> Patch {
@ -125,19 +125,20 @@ impl Backend {
}
/// Get changes which are in `other` but not in this backend
pub fn get_changes(&self, other: &Backend) -> Result<Vec<Change>,AutomergeError> {
// this should cover > and also concurrent
if ! (self.clock() <= other.clock()) {
return Err(AutomergeError::DivergedState("Cannot diff two states that have diverged".to_string()))
pub fn get_changes<'a>(&self, other: &'a Backend) -> Result<Vec<&'a Change>, AutomergeError> {
if self.clock().divergent(&other.clock()) {
return Err(AutomergeError::DivergedState(
"Cannot diff two states that have diverged".to_string(),
));
}
Ok(other.op_set.get_missing_changes(&self.op_set.clock))
}
pub fn get_changes_for_actor_id(&self, actor_id: ActorID) -> Vec<Change> {
self.op_set.get_changes_for_actor_id(&actor_id)
pub fn get_changes_for_actor_id(&self, actor_id: &ActorID) -> Vec<&Change> {
self.op_set.states.get(actor_id)
}
pub fn get_missing_changes(&self, clock: Clock) -> Vec<Change> {
pub fn get_missing_changes(&self, clock: Clock) -> Vec<&Change> {
self.op_set.get_missing_changes(&clock)
}
@ -146,7 +147,12 @@ impl Backend {
}
pub fn merge(&mut self, remote: &Backend) -> Result<Patch, AutomergeError> {
let missing_changes = remote.get_missing_changes(self.op_set.clock.clone());
let missing_changes = remote
.get_missing_changes(self.op_set.clock.clone())
.iter()
.cloned()
.cloned()
.collect();
self.apply_changes(missing_changes)
}
@ -284,12 +290,8 @@ mod tests {
expected_patch: Patch {
can_undo: false,
can_redo: false,
clock: Clock::empty()
.with(&actor1, 1)
.with(&actor2, 1),
deps: Clock::empty()
.with(&actor1, 1)
.with(&actor2, 1),
clock: Clock::empty().with(&actor1, 1).with(&actor2, 1),
deps: Clock::empty().with(&actor1, 1).with(&actor2, 1),
diffs: vec![Diff {
action: DiffAction::SetMapKey(
ObjectID::Root,

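A hypothetical usage sketch, not part of this commit, of the Backend API as changed above: get_changes now borrows the missing changes from `other` and refuses to diff clocks that have diverged. The change contents are invented; the Change fields match how the tests further down construct them.

use automerge_backend::{ActorID, Backend, Change, Clock};

fn main() {
    let mut local = Backend::init();
    let remote = Backend::init();

    let change = Change {
        actor_id: ActorID("actor-1".to_string()),
        seq: 1,
        message: None,
        dependencies: Clock::empty(),
        operations: vec![],
    };
    local.apply_changes(vec![change]).unwrap();

    // remote has seen nothing, so its clock is <= local's clock: not divergent.
    match remote.get_changes(&local) {
        Ok(missing) => println!("remote is missing {} change(s)", missing.len()),
        Err(e) => println!("states have diverged: {:?}", e),
    }
}
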
View file

@ -1,4 +1,4 @@
use crate::actor_histories::ActorHistories;
use crate::actor_states::ActorStates;
use crate::error::AutomergeError;
use crate::operation_with_metadata::OperationWithMetadata;
use crate::patch::{Conflict, ElementValue};
@ -57,7 +57,7 @@ impl ConcurrentOperations {
pub(crate) fn incorporate_new_op(
&mut self,
new_op: OperationWithMetadata,
actor_histories: &ActorHistories,
actor_states: &ActorStates,
) -> Result<Vec<Operation>, AutomergeError> {
let previous = self
.operations
@ -75,7 +75,7 @@ impl ConcurrentOperations {
_ => self
.operations
.iter()
.filter(|op| actor_histories.are_concurrent(op, &new_op))
.filter(|op| actor_states.is_concurrent(&op, &new_op))
.cloned()
.collect(),
};
@ -100,7 +100,7 @@ impl ConcurrentOperations {
..
} = op.operation
{
if !(actor_histories.are_concurrent(&new_op, &op_clone)) {
if !(actor_states.is_concurrent(&new_op, &op_clone)) {
*n += inc_value
}
}

View file

@ -7,7 +7,7 @@ macro_rules! log {
}
}
mod actor_histories;
mod actor_states;
mod backend;
mod concurrent_operations;
mod error;
@ -24,7 +24,7 @@ pub use crate::protocol::{
ActorID, Change, ChangeRequest, ChangeRequestType, Clock, DataType, ElementID, Key, ObjectID,
Operation, PrimitiveValue,
};
pub use actor_histories::ActorHistories;
pub use actor_states::ActorStates;
pub use backend::Backend;
pub use concurrent_operations::ConcurrentOperations;
pub use error::AutomergeError;

View file

@ -1,4 +1,4 @@
use crate::actor_histories::ActorHistories;
use crate::actor_states::ActorStates;
use crate::concurrent_operations::ConcurrentOperations;
use crate::error::AutomergeError;
use crate::operation_with_metadata::OperationWithMetadata;
@ -39,15 +39,15 @@ impl ObjectState {
fn handle_assign_op(
&mut self,
op_with_metadata: OperationWithMetadata,
actor_histories: &ActorHistories,
actor_states: &ActorStates,
key: &Key,
) -> Result<(Option<Diff>, Vec<Operation>), AutomergeError> {
let (diff, mut undo_ops) = match self {
ObjectState::Map(mapstate) => {
mapstate.handle_assign_op(op_with_metadata.clone(), actor_histories, key)
mapstate.handle_assign_op(op_with_metadata.clone(), actor_states, key)
}
ObjectState::List(liststate) => {
liststate.handle_assign_op(op_with_metadata.clone(), actor_histories, key)
liststate.handle_assign_op(op_with_metadata.clone(), actor_states, key)
}
}?;
@ -146,7 +146,7 @@ impl ListState {
fn handle_assign_op(
&mut self,
op: OperationWithMetadata,
actor_histories: &ActorHistories,
actor_states: &ActorStates,
key: &Key,
) -> Result<(Option<Diff>, Vec<Operation>), AutomergeError> {
let elem_id = key.as_element_id().map_err(|_| AutomergeError::InvalidChange(format!("Attempted to link, set, delete, or increment an object in a list with invalid element ID {:?}", key.0)))?;
@ -164,7 +164,7 @@ impl ListState {
.operations_by_elemid
.entry(elem_id.clone())
.or_insert_with(ConcurrentOperations::new);
let undo_ops = mutable_ops.incorporate_new_op(op, actor_histories)?;
let undo_ops = mutable_ops.incorporate_new_op(op, actor_states)?;
(undo_ops, mutable_ops.clone())
};
@ -305,17 +305,19 @@ impl MapState {
fn handle_assign_op(
&mut self,
op_with_metadata: OperationWithMetadata,
actor_histories: &ActorHistories,
actor_states: &ActorStates,
key: &Key,
) -> Result<(Option<Diff>, Vec<Operation>), AutomergeError> {
//log!("NEW OP {:?}",op_with_metadata);
let (undo_ops, ops) = {
let mutable_ops = self
.operations_by_key
.entry(key.clone())
.or_insert_with(ConcurrentOperations::new);
let undo_ops = mutable_ops.incorporate_new_op(op_with_metadata, actor_histories)?;
let undo_ops = mutable_ops.incorporate_new_op(op_with_metadata, actor_states)?;
(undo_ops, mutable_ops.clone())
};
//log!("OPS {:?}",ops);
Ok((
Some(
ops.active_op()
@ -439,7 +441,7 @@ impl ObjectStore {
/// later.
pub fn apply_operation(
&mut self,
actor_histories: &ActorHistories,
actor_states: &ActorStates,
op_with_metadata: OperationWithMetadata,
) -> Result<(Option<Diff>, Vec<Operation>), AutomergeError> {
let (diff, undo_ops) = match op_with_metadata.operation {
@ -514,7 +516,7 @@ impl ObjectStore {
.operations_by_object_id
.get_mut(&object_id)
.ok_or_else(|| AutomergeError::MissingObjectError(object_id.clone()))?;
object.handle_assign_op(op_with_metadata.clone(), actor_histories, key)?
object.handle_assign_op(op_with_metadata.clone(), actor_states, key)?
}
Operation::Insert {
ref list_id,

View file

@ -6,24 +6,17 @@
//! document::state) the implementation fetches the root object ID's history
//! and then recursively walks through the tree of histories constructing the
//! state. Obviously this is not very efficient.
use crate::actor_histories::ActorHistories;
use crate::actor_states::ActorStates;
use crate::concurrent_operations::ConcurrentOperations;
use crate::error::AutomergeError;
use crate::object_store::{ListState, MapState, ObjectState, ObjectStore};
use crate::object_store::ObjectStore;
use crate::operation_with_metadata::OperationWithMetadata;
use crate::protocol::{Change, Clock, ElementID, Key, ObjectID, Operation, PrimitiveValue};
use crate::value::Value;
use crate::protocol::{Change, Clock, ElementID, ObjectID, Operation};
use crate::{ActorID, Diff, DiffAction};
use std::collections::HashMap;
use std::collections::HashSet;
use std::hash::BuildHasher;
#[derive(Debug, PartialEq, Clone)]
struct ActorState {
change: Change,
all_deps: Clock,
}
/// The OpSet manages an ObjectStore, and a queue of incoming changes in order
/// to ensure that operations are delivered to the object store in causal order
///
@ -40,30 +33,24 @@ struct ActorState {
#[derive(Debug, PartialEq, Clone)]
pub struct OpSet {
pub object_store: ObjectStore,
pub actor_histories: ActorHistories,
queue: Vec<Change>,
pub history: Vec<Change>,
pub clock: Clock,
undo_pos: usize,
pub undo_stack: Vec<Vec<Operation>>,
pub redo_stack: Vec<Vec<Operation>>,
states: HashMap<ActorID, Vec<ActorState>>,
state: Value,
pub states: ActorStates,
}
impl OpSet {
pub fn init() -> OpSet {
OpSet {
object_store: ObjectStore::new(),
actor_histories: ActorHistories::new(),
queue: Vec::new(),
history: Vec::new(),
clock: Clock::empty(),
state: Value::Map(HashMap::new()),
undo_pos: 0,
undo_stack: Vec::new(),
redo_stack: Vec::new(),
states: HashMap::new(),
states: ActorStates::new(),
}
}
@ -157,7 +144,6 @@ impl OpSet {
) -> Result<Vec<Diff>, AutomergeError> {
self.queue.push(change);
let diffs = self.apply_causally_ready_changes(make_undoable)?;
self.state = self.walk(&ObjectID::Root)?;
Ok(diffs)
}
@ -201,41 +187,18 @@ impl OpSet {
// need the undo operation for the creation of the list to achieve
// the undo), so we track newly created objects and only store undo
// operations which don't operate on them.
if let Some(actor_state) = self
.states
.get(&change.actor_id)
.and_then(|changes| changes.get((change.seq as usize) - 1))
{
if change == actor_state.change {
return Ok(Vec::new()); // its a duplicate - ignore
} else {
return Err(AutomergeError::InvalidChange(
"Invalid reuse of sequence number for actor".to_string(),
));
}
}
self.actor_histories.add_change(&change);
self.history.push(change.clone());
let states_for_actor = self
.states
.entry(change.actor_id.clone())
.or_insert_with(Vec::new);
states_for_actor.push(ActorState {
change: change.clone(),
all_deps: self
.actor_histories
.transitive_dependencies(&change.actor_id, change.seq)
.with(&change.actor_id, change.seq - 1),
});
let actor_id = change.actor_id.clone();
let seq = change.seq;
let operations = change.operations.clone();
if !self.states.add_change(change)? {
return Ok(Vec::new()); // it's a duplicate - ignore
}
let mut diffs = Vec::new();
let mut undo_operations = Vec::new();
let mut new_object_ids: HashSet<ObjectID> = HashSet::new();
for operation in change.operations {
for operation in operations {
// Store newly created object IDs so we can decide whether we need
// undo ops later
match &operation {
@ -254,7 +217,7 @@ impl OpSet {
};
let (diff, undo_ops_for_this_op) = self
.object_store
.apply_operation(&self.actor_histories, op_with_metadata)?;
.apply_operation(&self.states, op_with_metadata)?;
// If this object is not created in this change then we need to
// store the undo ops for it (if we're storing undo ops at all)
@ -265,9 +228,7 @@ impl OpSet {
diffs.push(d)
}
}
self.clock = self
.clock
.with(&change.actor_id.clone(), change.seq);
self.clock = self.clock.with(&actor_id, seq);
if make_undoable {
let (new_undo_stack_slice, _) = self.undo_stack.split_at(self.undo_pos);
let mut new_undo_stack: Vec<Vec<Operation>> = new_undo_stack_slice.to_vec();
@ -278,110 +239,6 @@ impl OpSet {
Ok(Self::simplify_diffs(diffs))
}
pub fn root_value(&self) -> &Value {
&self.state
}
/// This is where we actually interpret the concurrent operations for each
/// part of the object and construct the current state.
fn walk(&self, object_id: &ObjectID) -> Result<Value, AutomergeError> {
let object_history = self
.object_store
.state_for_object_id(object_id)
.ok_or_else(|| AutomergeError::MissingObjectError(object_id.clone()))?;
match object_history {
ObjectState::Map(MapState {
operations_by_key, ..
}) => self.interpret_map_ops(operations_by_key),
ObjectState::List(ListState {
operations_by_elemid,
insertions,
following,
..
}) => self.interpret_list_ops(operations_by_elemid, insertions, following),
}
}
fn interpret_map_ops(
&self,
ops_by_key: &HashMap<Key, ConcurrentOperations>,
) -> Result<Value, AutomergeError> {
let mut result: HashMap<String, Value> = HashMap::new();
for (_, ops) in ops_by_key.iter() {
match ops.active_op() {
None => {}
Some(OperationWithMetadata { operation, .. }) => match operation {
Operation::Set {
key: Key(str_key),
value,
..
} => {
result.insert(
str_key.to_string(),
match value {
PrimitiveValue::Null => Value::Null,
PrimitiveValue::Boolean(b) => Value::Boolean(*b),
PrimitiveValue::Number(n) => Value::Number(*n),
PrimitiveValue::Str(s) => Value::Str(s.to_string()),
},
);
}
Operation::Link {
key: Key(str_key),
value,
..
} => {
let linked_value = self.walk(value)?;
result.insert(str_key.to_string(), linked_value);
}
Operation::Increment { .. } => {}
op => {
return Err(AutomergeError::NotImplemented(format!(
"Interpret operation not implemented: {:?}",
op
)))
}
},
}
}
Ok(Value::Map(result))
}
fn interpret_list_ops(
&self,
operations_by_elemid: &HashMap<ElementID, ConcurrentOperations>,
_insertions: &HashMap<ElementID, ElementID>,
following: &HashMap<ElementID, Vec<ElementID>>,
) -> Result<Value, AutomergeError> {
let ops_in_order = list_ops_in_order(operations_by_elemid, following)?;
// Now that we have a list of `ConcurrentOperations` in the correct
// order, we need to interpret each one to construct the value that
// should appear at that position in the resulting sequence.
let result_with_errs =
ops_in_order
.iter()
.filter_map(|(_, ops)| -> Option<Result<Value, AutomergeError>> {
ops.active_op().map(|op| match &op.operation {
Operation::Set { value, .. } => Ok(match value {
PrimitiveValue::Null => Value::Null,
PrimitiveValue::Boolean(b) => Value::Boolean(*b),
PrimitiveValue::Number(n) => Value::Number(*n),
PrimitiveValue::Str(s) => Value::Str(s.to_string()),
}),
Operation::Link { value, .. } => self.walk(&value),
op => Err(AutomergeError::NotImplemented(format!(
"Interpret operation not implemented for list ops: {:?}",
op
))),
})
});
let result = result_with_errs.collect::<Result<Vec<Value>, AutomergeError>>()?;
Ok(Value::List(result))
}
/// Remove any redundant diffs
fn simplify_diffs(diffs: Vec<Diff>) -> Vec<Diff> {
let mut result = Vec::new();
@ -426,23 +283,20 @@ impl OpSet {
}
/// Get all the changes we have that are not in `since`
// TODO: check with martin - this impl seems too simple to be right
pub fn get_missing_changes(&self, since: &Clock) -> Vec<Change> {
self.history.iter().filter(|change| change.seq > since.get(&change.actor_id))
.cloned().collect()
}
pub fn get_changes_for_actor_id(&self, actor_id: &ActorID) -> Vec<Change> {
pub fn get_missing_changes(&self, since: &Clock) -> Vec<&Change> {
self.states
.get(actor_id)
.map(|states| states.iter().map(|s| s.change.clone()).collect())
.unwrap_or_else(Vec::new)
.history
.iter()
.filter(|change| change.seq > since.get(&change.actor_id))
.collect()
}
pub fn get_missing_deps(&self) -> Clock {
// TODO: there's a lot of internal copying going on in here for something kinda simple
self.queue.iter().fold(Clock::empty(), |clock, change| {
clock.union(&change.dependencies).with(&change.actor_id,change.seq - 1)
clock
.union(&change.dependencies)
.with(&change.actor_id, change.seq - 1)
})
}
}
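
An illustrative sketch, not from this commit, of the get_missing_changes filter above, with the history reduced to (actor, seq) pairs; it assumes only the public Clock tuple constructor and get.

use std::collections::HashMap;
use automerge_backend::{ActorID, Clock};

fn main() {
    let a = ActorID("a".to_string());
    let b = ActorID("b".to_string());

    // History in causal order, reduced to (actor_id, seq) pairs for illustration.
    let history = vec![(a.clone(), 1), (a.clone(), 2), (b, 1)];

    // The peer described by `since` has seen A up to seq 1 and nothing from B.
    let since = Clock(HashMap::from([(a, 1)]));

    // A change is missing if its seq is beyond what `since` records for its actor.
    let missing: Vec<_> = history
        .iter()
        .filter(|(actor, seq)| *seq > since.get(actor))
        .collect();

    assert_eq!(missing.len(), 2); // A:2 and B:1
}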

View file

@ -87,7 +87,6 @@ impl ActorID {
pub struct Clock(pub HashMap<ActorID, u32>);
impl Clock {
pub fn empty() -> Clock {
Clock(HashMap::new())
}
@ -112,9 +111,9 @@ impl Clock {
pub fn set(&mut self, actor_id: &ActorID, seq: u32) {
if seq == 0 {
self.0.remove(actor_id);
self.0.remove(actor_id);
} else {
self.0.insert(actor_id.clone(), seq);
self.0.insert(actor_id.clone(), seq);
}
}
@ -122,23 +121,25 @@ impl Clock {
*self.0.get(actor_id).unwrap_or(&0)
}
pub fn divergent(&self, other: &Clock) -> bool {
!self.less_or_equal(other)
}
fn less_or_equal(&self, other: &Clock) -> bool {
self.into_iter().all(|(actor_id, _)| {
self.get(actor_id) <= other.get(actor_id)
})
self.into_iter()
.all(|(actor_id, _)| self.get(actor_id) <= other.get(actor_id))
}
}
// TODO: check with martin
impl PartialOrd for Clock {
fn partial_cmp(&self, other: &Clock) -> Option<Ordering> {
let le1 = self.less_or_equal(other);
let le2 = other.less_or_equal(self);
match (le1,le2) {
(true, true) => Some(Ordering::Equal),
(true, false) => Some(Ordering::Less),
(false, true) => Some(Ordering::Greater),
(false, false) => None,
match (le1, le2) {
(true, true) => Some(Ordering::Equal),
(true, false) => Some(Ordering::Less),
(false, true) => Some(Ordering::Greater),
(false, false) => None,
}
}
}
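
A small sketch, not part of this commit, of the Clock partial order and the new divergent helper above; it assumes only the public tuple constructor and the methods added in this file.

use std::collections::HashMap;
use automerge_backend::{ActorID, Clock};

fn main() {
    let a = ActorID("a".to_string());
    let b = ActorID("b".to_string());

    let c1 = Clock(HashMap::from([(a.clone(), 2)]));
    let c2 = Clock(HashMap::from([(a.clone(), 2), (b.clone(), 1)]));
    let c3 = Clock(HashMap::from([(a, 1), (b, 3)]));

    assert!(c1 <= c2); // c2 covers everything c1 has seen
    assert!(c2.divergent(&c3)); // c2 has progress that c3 has not seen
    assert!(c2.partial_cmp(&c3).is_none()); // and vice versa: the clocks are unordered
}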

View file

@ -4,7 +4,7 @@
use crate::change_request::{ChangeRequest, ListIndex, Path, PathElement};
use crate::error::InvalidChangeRequest;
use automerge_backend::list_ops_in_order;
use automerge_backend::ActorHistories;
use automerge_backend::ActorStates;
use automerge_backend::ListState;
use automerge_backend::MapState;
use automerge_backend::ObjectState;
@ -188,7 +188,7 @@ struct InsertAfterTarget {
pub struct ChangeContext<'a> {
object_store: ObjectStore,
actor_id: ActorID,
actor_histories: &'a ActorHistories,
states: &'a ActorStates,
clock: Clock,
}
@ -196,12 +196,12 @@ impl<'a> ChangeContext<'a> {
pub fn new(
object_store: &ObjectStore,
actor_id: ActorID,
actor_histories: &'a ActorHistories,
states: &'a ActorStates,
clock: Clock,
) -> ChangeContext<'a> {
ChangeContext {
object_store: object_store.clone(),
actor_histories,
states,
actor_id,
clock,
}
@ -250,12 +250,12 @@ impl<'a> ChangeContext<'a> {
ops.iter().for_each(|inner_ops| {
inner_ops.iter().for_each(|op| {
let op_with_meta = OperationWithMetadata {
sequence: self.clock.seq_for(&self.actor_id) + 1,
sequence: self.clock.get(&self.actor_id) + 1,
actor_id: self.actor_id.clone(),
operation: op.clone(),
};
self.object_store
.apply_operation(self.actor_histories, op_with_meta)
.apply_operation(self.states, op_with_meta)
.unwrap();
});
});
@ -267,7 +267,7 @@ impl<'a> ChangeContext<'a> {
.collect::<Result<Vec<Vec<Operation>>, InvalidChangeRequest>>()?;
let ops = nested_ops.into_iter().flatten().collect::<Vec<Operation>>();
let dependencies = self.clock.clone();
let seq = self.clock.seq_for(&self.actor_id) + 1;
let seq = self.clock.get(&self.actor_id) + 1;
let change = Change {
actor_id: self.actor_id.clone(),
operations: ops,

View file

@ -31,7 +31,7 @@ impl Document {
/// Get the current state of the document as a serde_json value
pub fn state(&self) -> &Value {
self.op_set.root_value()
panic!("not implemented");
}
/// Add a single change to the document
@ -47,7 +47,7 @@ impl Document {
let mut change_ctx = ChangeContext::new(
&self.op_set.object_store,
self.actor_id.clone(),
&self.op_set.actor_histories,
&self.op_set.states,
self.op_set.clock.clone(),
);
let change = change_ctx.create_change(requests, message)?;
@ -62,320 +62,7 @@ mod tests {
use super::*;
use crate::change_request::{ListIndex, Path};
use automerge_backend::Value;
use automerge_backend::{
ActorID, Clock, DataType, ElementID, Key, ObjectID, Operation, PrimitiveValue,
};
use serde_json;
use std::collections::HashMap;
#[test]
fn test_loading_from_changes() {
let mut actor1_deps = HashMap::new();
actor1_deps.insert(ActorID("id1".to_string()), 1);
let changes = vec![
Change {
actor_id: ActorID("id1".to_string()),
operations: vec![
Operation::MakeMap {
object_id: ObjectID::ID("2ce778e4-d23f-426f-98d7-e97fea47181c".to_string()),
},
Operation::Link {
object_id: ObjectID::Root,
key: Key("cards_by_id".to_string()),
value: ObjectID::ID("2ce778e4-d23f-426f-98d7-e97fea47181c".to_string()),
},
Operation::Set {
object_id: ObjectID::Root,
key: Key("numRounds".to_string()),
value: PrimitiveValue::Number(0.0),
datatype: Some(DataType::Counter),
},
Operation::Set {
object_id: ObjectID::Root,
key: Key("size_of_cards".to_string()),
value: PrimitiveValue::Number(10.0),
datatype: None,
},
Operation::Set {
object_id: ObjectID::ID("2ce778e4-d23f-426f-98d7-e97fea47181c".to_string()),
key: Key("deleted_key".to_string()),
value: PrimitiveValue::Boolean(false),
datatype: None,
},
Operation::Delete {
object_id: ObjectID::ID("2ce778e4-d23f-426f-98d7-e97fea47181c".to_string()),
key: Key("deleted_key".to_string()),
},
Operation::MakeList {
object_id: ObjectID::ID("87cef98c-246d-42b8-ada5-28524f5aefb3".to_string()),
},
Operation::Link {
object_id: ObjectID::Root,
key: Key("cards".to_string()),
value: ObjectID::ID("87cef98c-246d-42b8-ada5-28524f5aefb3".to_string()),
},
Operation::Insert {
list_id: ObjectID::ID("87cef98c-246d-42b8-ada5-28524f5aefb3".to_string()),
key: ElementID::Head,
elem: 1,
},
Operation::Set {
object_id: ObjectID::ID("87cef98c-246d-42b8-ada5-28524f5aefb3".to_string()),
key: Key("id1:1".to_string()),
value: PrimitiveValue::Number(1.0),
datatype: None,
},
Operation::Insert {
list_id: ObjectID::ID("87cef98c-246d-42b8-ada5-28524f5aefb3".to_string()),
key: ElementID::SpecificElementID(ActorID("id1".to_string()), 1),
elem: 2,
},
Operation::Set {
object_id: ObjectID::ID("87cef98c-246d-42b8-ada5-28524f5aefb3".to_string()),
key: Key("id1:2".to_string()),
value: PrimitiveValue::Boolean(false),
datatype: None,
},
],
seq: 1,
message: Some("initialization".to_string()),
dependencies: Clock::empty(),
},
Change {
actor_id: ActorID("id1".to_string()),
operations: vec![
Operation::Increment {
object_id: ObjectID::Root,
key: Key("numRounds".to_string()),
value: 5.0,
},
Operation::Set {
object_id: ObjectID::Root,
key: Key("size_of_cards".to_string()),
value: PrimitiveValue::Number(12.0),
datatype: None,
},
],
seq: 2,
message: Some("incrementation".to_string()),
dependencies: Clock(actor1_deps.clone()),
},
Change {
actor_id: ActorID("id2".to_string()),
operations: vec![
Operation::Increment {
object_id: ObjectID::Root,
key: Key("numRounds".to_string()),
value: 6.0,
},
Operation::Set {
object_id: ObjectID::Root,
key: Key("size_of_cards".to_string()),
value: PrimitiveValue::Number(13.0),
datatype: None,
},
],
seq: 1,
message: Some("actor 2 incrementation".to_string()),
dependencies: Clock(actor1_deps),
},
];
let doc = Document::load(changes).unwrap();
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
"#,
)
.unwrap();
let actual_state = doc.state().to_json();
assert_eq!(actual_state, expected)
}
#[test]
fn test_set_mutation() {
let mut doc = Document::init();
let json_value: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
"#,
)
.unwrap();
doc.create_and_apply_change(
Some("Some change".to_string()),
vec![ChangeRequest::Set {
path: Path::root().key("the-state".to_string()),
value: Value::from_json(&json_value),
}],
)
.unwrap();
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"the-state": {
"cards_by_id": {},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
}
"#,
)
.unwrap();
assert_eq!(expected, doc.state().to_json());
doc.create_and_apply_change(
Some("another change".to_string()),
vec![ChangeRequest::Set {
path: Path::root()
.key("the-state".to_string())
.key("size_of_cards".to_string()),
value: Value::from_json(&serde_json::Value::Number(
serde_json::Number::from_f64(10.0).unwrap(),
)),
}],
)
.unwrap();
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"the-state": {
"cards_by_id": {},
"size_of_cards": 10.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
}
"#,
)
.unwrap();
assert_eq!(expected, doc.state().to_json());
}
#[test]
fn test_move_ops() {
let mut doc = Document::init();
let json_value: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {
"jack": {"value": 11}
},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
"#,
)
.unwrap();
doc.create_and_apply_change(
Some("Init".to_string()),
vec![ChangeRequest::Set {
path: Path::root(),
value: Value::from_json(&json_value),
}],
)
.unwrap();
println!("Doc state: {:?}", doc.state().to_json());
doc.create_and_apply_change(
Some("Move jack".to_string()),
vec![
ChangeRequest::Move {
from: Path::root()
.key("cards_by_id".to_string())
.key("jack".to_string()),
to: Path::root()
.key("cards_by_id".to_string())
.key("jill".to_string()),
},
ChangeRequest::Move {
from: Path::root().key("size_of_cards".to_string()),
to: Path::root().key("number_of_cards".to_string()),
},
],
)
.unwrap();
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {
"jill": {"value": 11.0}
},
"number_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
"#,
)
.unwrap();
assert_eq!(expected, doc.state().to_json());
}
#[test]
fn test_delete_op() {
let json_value: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {
"jack": {"value": 11}
},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
"#,
)
.unwrap();
let mut doc = Document::init();
doc.create_and_apply_change(
Some("Init".to_string()),
vec![ChangeRequest::Set {
path: Path::root(),
value: Value::from_json(&json_value),
}],
)
.unwrap();
doc.create_and_apply_change(
Some("Delete everything".to_string()),
vec![
ChangeRequest::Delete {
path: Path::root()
.key("cards_by_id".to_string())
.key("jack".to_string()),
},
ChangeRequest::Delete {
path: Path::root()
.key("cards".to_string())
.index(ListIndex::Index(1)),
},
],
)
.unwrap();
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0]
}
"#,
)
.unwrap();
assert_eq!(expected, doc.state().to_json());
}
#[test]
#[ignore] // This is broken for some reason

View file

@ -1,294 +1,2 @@
extern crate automerge;
use automerge::{Change, Document};
#[test]
fn test_concurrent_ops() {
let changes1: Vec<Change> = serde_json::from_str(
r#"
[
{
"ops": [
{
"action": "makeList",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5"
},
{
"action": "link",
"obj": "00000000-0000-0000-0000-000000000000",
"key": "cards",
"value": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5"
},
{
"action": "makeMap",
"obj": "a092dea1-6fa5-4459-91d4-f7aebf0c0a77"
},
{
"action": "link",
"obj": "00000000-0000-0000-0000-000000000000",
"key": "cards_by_id",
"value": "a092dea1-6fa5-4459-91d4-f7aebf0c0a77"
},
{
"action": "set",
"obj": "00000000-0000-0000-0000-000000000000",
"key": "numRounds",
"value": 0,
"datatype": "counter"
}
],
"actor": "fc6c6433-296a-4e7d-983b-589cde8b78ef",
"seq": 1,
"deps": {},
"message": "Initialization"
},
{
"ops": [
{
"action": "ins",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "_head",
"elem": 1
},
{
"action": "makeMap",
"obj": "003000cf-2d2d-4d37-9fb0-10f8ec70975c"
},
{
"action": "set",
"obj": "003000cf-2d2d-4d37-9fb0-10f8ec70975c",
"key": "title",
"value": "Rewrite everything in clojure"
},
{
"action": "set",
"obj": "003000cf-2d2d-4d37-9fb0-10f8ec70975c",
"key": "done",
"value": false
},
{
"action": "link",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "fc6c6433-296a-4e7d-983b-589cde8b78ef:1",
"value": "003000cf-2d2d-4d37-9fb0-10f8ec70975c"
}
],
"actor": "fc6c6433-296a-4e7d-983b-589cde8b78ef",
"seq": 2,
"deps": {},
"message": "Add card"
},
{
"ops": [
{
"action": "ins",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "fc6c6433-296a-4e7d-983b-589cde8b78ef:1",
"elem": 2
},
{
"action": "makeMap",
"obj": "21ca2b86-e9a5-4a7f-9cf5-3a7112d3948d"
},
{
"action": "set",
"obj": "21ca2b86-e9a5-4a7f-9cf5-3a7112d3948d",
"key": "title",
"value": "concurrent op 1"
},
{
"action": "set",
"obj": "21ca2b86-e9a5-4a7f-9cf5-3a7112d3948d",
"key": "done",
"value": false
},
{
"action": "link",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "fc6c6433-296a-4e7d-983b-589cde8b78ef:2",
"value": "21ca2b86-e9a5-4a7f-9cf5-3a7112d3948d"
}
],
"actor": "fc6c6433-296a-4e7d-983b-589cde8b78ef",
"seq": 3,
"deps": {},
"message": "concurrently add card (op 1)"
}
]
"#,
)
.unwrap();
let changes2: Vec<Change> = serde_json::from_str(
r#"
[
{
"ops": [
{
"action": "makeList",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5"
},
{
"action": "link",
"obj": "00000000-0000-0000-0000-000000000000",
"key": "cards",
"value": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5"
},
{
"action": "makeMap",
"obj": "a092dea1-6fa5-4459-91d4-f7aebf0c0a77"
},
{
"action": "link",
"obj": "00000000-0000-0000-0000-000000000000",
"key": "cards_by_id",
"value": "a092dea1-6fa5-4459-91d4-f7aebf0c0a77"
},
{
"action": "set",
"obj": "00000000-0000-0000-0000-000000000000",
"key": "numRounds",
"value": 0,
"datatype": "counter"
}
],
"actor": "fc6c6433-296a-4e7d-983b-589cde8b78ef",
"seq": 1,
"deps": {},
"message": "Initialization"
},
{
"ops": [
{
"action": "ins",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "_head",
"elem": 1
},
{
"action": "makeMap",
"obj": "003000cf-2d2d-4d37-9fb0-10f8ec70975c"
},
{
"action": "set",
"obj": "003000cf-2d2d-4d37-9fb0-10f8ec70975c",
"key": "title",
"value": "Rewrite everything in clojure"
},
{
"action": "set",
"obj": "003000cf-2d2d-4d37-9fb0-10f8ec70975c",
"key": "done",
"value": false
},
{
"action": "link",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "fc6c6433-296a-4e7d-983b-589cde8b78ef:1",
"value": "003000cf-2d2d-4d37-9fb0-10f8ec70975c"
}
],
"actor": "fc6c6433-296a-4e7d-983b-589cde8b78ef",
"seq": 2,
"deps": {},
"message": "Add card"
},
{
"ops": [
{
"action": "ins",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "fc6c6433-296a-4e7d-983b-589cde8b78ef:1",
"elem": 2
},
{
"action": "makeMap",
"obj": "21ca2b86-e9a5-4a7f-9cf5-3a7112d3948d"
},
{
"action": "set",
"obj": "21ca2b86-e9a5-4a7f-9cf5-3a7112d3948d",
"key": "title",
"value": "concurrent op 1"
},
{
"action": "set",
"obj": "21ca2b86-e9a5-4a7f-9cf5-3a7112d3948d",
"key": "done",
"value": false
},
{
"action": "link",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "fc6c6433-296a-4e7d-983b-589cde8b78ef:2",
"value": "21ca2b86-e9a5-4a7f-9cf5-3a7112d3948d"
}
],
"actor": "fc6c6433-296a-4e7d-983b-589cde8b78ef",
"seq": 3,
"deps": {},
"message": "concurrently add card (op 1)"
},
{
"ops": [
{
"action": "ins",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "fc6c6433-296a-4e7d-983b-589cde8b78ef:2",
"elem": 3
},
{
"action": "makeMap",
"obj": "3c5e415e-392d-4bd8-8fee-4f75a78d38e4"
},
{
"action": "set",
"obj": "3c5e415e-392d-4bd8-8fee-4f75a78d38e4",
"key": "title",
"value": "concurrent op 2"
},
{
"action": "set",
"obj": "3c5e415e-392d-4bd8-8fee-4f75a78d38e4",
"key": "done",
"value": false
},
{
"action": "link",
"obj": "79a4d939-09e9-4dc9-a4c6-0bffb98ee0d5",
"key": "e3b27fb8-574f-43c2-94eb-d41a22c8b30c:3",
"value": "3c5e415e-392d-4bd8-8fee-4f75a78d38e4"
}
],
"actor": "e3b27fb8-574f-43c2-94eb-d41a22c8b30c",
"seq": 1,
"deps": {
"fc6c6433-296a-4e7d-983b-589cde8b78ef": 3
},
"message": "concurrently add card (op 2)"
}
]
"#,
)
.unwrap();
let mut doc = Document::load(changes1).unwrap();
for change in changes2 {
doc.apply_change(change).unwrap()
}
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {},
"numRounds": 0.0,
"cards": [
{"title": "Rewrite everything in clojure", "done": false},
{"title": "concurrent op 1", "done": false},
{"title": "concurrent op 2", "done": false}
]
}
"#,
)
.unwrap();
let actual = doc.state().to_json();
assert_eq!(expected, actual);
}

View file

@ -1,139 +1,2 @@
extern crate automerge;
use automerge::{Change, Document};
#[test]
fn test_table_column_order() {
let changes1: Vec<Change> = serde_json::from_str(
r#"
[
{
"ops": [
{
"action": "makeTable",
"obj": "a9de13ee-9b2f-43f6-b167-12823931245b"
},
{
"action": "makeList",
"obj": "de41fdb3-fdf9-4146-a1d3-7049c983aacb"
},
{
"action": "ins",
"obj": "de41fdb3-fdf9-4146-a1d3-7049c983aacb",
"key": "_head",
"elem": 1
},
{
"action": "set",
"obj": "de41fdb3-fdf9-4146-a1d3-7049c983aacb",
"key": "c01d1a3b-2abe-481b-994f-3f37aa4fbb07:1",
"value": "authors"
},
{
"action": "ins",
"obj": "de41fdb3-fdf9-4146-a1d3-7049c983aacb",
"key": "c01d1a3b-2abe-481b-994f-3f37aa4fbb07:1",
"elem": 2
},
{
"action": "set",
"obj": "de41fdb3-fdf9-4146-a1d3-7049c983aacb",
"key": "c01d1a3b-2abe-481b-994f-3f37aa4fbb07:2",
"value": "title"
},
{
"action": "ins",
"obj": "de41fdb3-fdf9-4146-a1d3-7049c983aacb",
"key": "c01d1a3b-2abe-481b-994f-3f37aa4fbb07:2",
"elem": 3
},
{
"action": "set",
"obj": "de41fdb3-fdf9-4146-a1d3-7049c983aacb",
"key": "c01d1a3b-2abe-481b-994f-3f37aa4fbb07:3",
"value": "isbn"
},
{
"action": "link",
"obj": "a9de13ee-9b2f-43f6-b167-12823931245b",
"key": "columns",
"value": "de41fdb3-fdf9-4146-a1d3-7049c983aacb"
},
{
"action": "link",
"obj": "00000000-0000-0000-0000-000000000000",
"key": "books",
"value": "a9de13ee-9b2f-43f6-b167-12823931245b"
},
{
"action": "makeMap",
"obj": "b822bb61-1046-4faf-8719-ef479f4b6ca5"
},
{
"action": "makeList",
"obj": "8fbabf41-64e5-41e8-b82b-b23c668f8f51"
},
{
"action": "ins",
"obj": "8fbabf41-64e5-41e8-b82b-b23c668f8f51",
"key": "_head",
"elem": 1
},
{
"action": "set",
"obj": "8fbabf41-64e5-41e8-b82b-b23c668f8f51",
"key": "c01d1a3b-2abe-481b-994f-3f37aa4fbb07:1",
"value": "Kleppmann, Martin"
},
{
"action": "link",
"obj": "b822bb61-1046-4faf-8719-ef479f4b6ca5",
"key": "authors",
"value": "8fbabf41-64e5-41e8-b82b-b23c668f8f51"
},
{
"action": "set",
"obj": "b822bb61-1046-4faf-8719-ef479f4b6ca5",
"key": "title",
"value": "Designing Data-Intensive Applications"
},
{
"action": "set",
"obj": "b822bb61-1046-4faf-8719-ef479f4b6ca5",
"key": "isbn",
"value": "1449373321"
},
{
"action": "link",
"obj": "a9de13ee-9b2f-43f6-b167-12823931245b",
"key": "b822bb61-1046-4faf-8719-ef479f4b6ca5",
"value": "b822bb61-1046-4faf-8719-ef479f4b6ca5"
}
],
"actor": "c01d1a3b-2abe-481b-994f-3f37aa4fbb07",
"seq": 1,
"deps": {}
}
]
"#,
)
.unwrap();
let doc = Document::load(changes1).unwrap();
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"books": {
"columns": ["authors", "title", "isbn"],
"b822bb61-1046-4faf-8719-ef479f4b6ca5": {
"authors": ["Kleppmann, Martin"],
"isbn": "1449373321",
"title": "Designing Data-Intensive Applications"
}
}
}
"#,
)
.unwrap();
let actual = doc.state().to_json();
assert_eq!(expected, actual);
}