Merge branch 'mutation'

This commit is contained in:
Alex Good 2020-02-13 00:06:25 +00:00
commit a286bbad8d
16 changed files with 1589 additions and 359 deletions

View file

@ -5,7 +5,7 @@ rust:
- nightly
cache: cargo
before_script:
- rustup component add clippy
- rustup component add clippy --toolchain=nightly || cargo install --git https://github.com/rust-lang/rust-clippy/ --force clippy #clippy is sometimes not available for nightly
- rustup component add rustfmt
script:
- cargo fmt -- --check

View file

@ -16,3 +16,4 @@ name = "automerge"
[dependencies]
serde = { version = "^1.0", features=["derive"] }
serde_json = "^1.0"
uuid = { version = "^0.5.1", features=["v4"] }

View file

@ -1,26 +1,32 @@
# Automerge
[![docs](https://docs.rs/automerge/badge.svg)](https://docs.rs/automerge)
[![crates](https://img.shields.io/crates/v/automerge)](https://crates.io/crates/automerge)
[![Build Status](https://travis-ci.org/alexjg/automerge-rs.svg?branch=master)](https://travis-ci.org/alexjg/automerge-rs)
This is a very early, very much work in progress implementation of [automerge](https://github.com/automerge/automerge) in rust. At the moment it barely implements a read only view of operations received, with very little testing that it works. Objectives for it are:
This is a very early, very much work in progress implementation of [automerge](https://github.com/automerge/automerge) in rust. At the moment it implements a simple interface for reading the state of an OpSet, and a really horrendous interface for generating new changes to the Opset.
- Full read and write replication
- `no_std` support to make it easy to use in WASM environments
- Model based testing to ensure compatibility with the JS library
## Plans
We're tentatively working on a plan to write a backend for the current javascript implementation of Automerge in Rust. The javascript Automerge library is split into two parts, a "frontend" and a "backend". The "backend" contains a lot of the more complex logic of the CRDT and also has a fairly small API. Given these facts we think we might be able to write a rust implementation of the backend, which compiles to WASM and can be used as a drop in replacement for the current backend. This same rust implementation could also be used via FFI on a lot of other platforms, which would make language interop much easier. This is all early days but it's very exciting.
For now though, it's a mostly broken pure rust implementation
## How to use
Add this to your dependencies
```
automerge = 0.0.2
automerge = "0.0.2"
```
You'll need to export changes from automerge as JSON rather than using the encoding that `Automerge.save` uses. So first do this:
You'll need to export changes from automerge as JSON rather than using the encoding that `Automerge.save` uses. So first do this (in javascript):
```javascript
const doc = <your automerge document>
@ -41,3 +47,34 @@ fn main() {
println!("{:?}", state);
}
```
You can create new changes to the document by doing things like this:
```rust,no_run
extern crate automerge;
fn main() {
let mut doc = Document::init();
let json_value: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
"#,
)
.unwrap();
doc.create_and_apply_change(
Some("Some change".to_string()),
vec![ChangeRequest::Set {
path: Path::root().key("the-state".to_string()),
value: Value::from_json(&json_value),
}],
)
.unwrap();
}
```
Check the docs on `ChangeRequest` for more information on what you can do.

69
src/actor_histories.rs Normal file
View file

@ -0,0 +1,69 @@
use crate::operation_with_metadata::OperationWithMetadata;
use crate::protocol::{ActorID, Change, Clock};
use std::collections::HashMap;
/// ActorHistories is a cache for the transitive dependencies of each change
/// received from each actor. This is necessary because a change only ships its
/// direct dependencies in `deps` but we need all dependencies to determine
/// whether two operations occurred concurrently.
#[derive(Debug)]
// Maps actor -> (change seq -> clock of all transitive dependencies of that change)
pub struct ActorHistories(HashMap<ActorID, HashMap<u32, Clock>>);
impl ActorHistories {
    pub(crate) fn new() -> ActorHistories {
        ActorHistories(HashMap::new())
    }

    /// Return the latest sequence number of `actor` that `op`'s change
    /// (transitively) depends on, or 0 if no dependency is recorded.
    fn dependency_for(&self, op: &OperationWithMetadata, actor: &ActorID) -> u32 {
        self.0
            .get(&op.actor_id)
            .and_then(|clocks| clocks.get(&op.sequence))
            .map(|c| c.seq_for(actor))
            .unwrap_or(0)
    }

    /// Whether or not `change` is already part of this `ActorHistories`
    pub(crate) fn is_applied(&self, change: &Change) -> bool {
        self.0
            .get(&change.actor_id)
            .and_then(|clocks| clocks.get(&change.seq))
            .map(|c| c.seq_for(&change.actor_id) >= change.seq)
            .unwrap_or(false)
    }

    /// Update this ActorHistories to include the changes in `change`
    pub(crate) fn add_change(&mut self, change: &Change) {
        // A change implicitly depends on the previous change from the same
        // actor, so fold that into its declared dependencies.
        let change_deps = change
            .dependencies
            .with_dependency(&change.actor_id, change.seq - 1);
        let transitive = self.transitive_dependencies(&change.actor_id, change.seq);
        let all_deps = transitive.upper_bound(&change_deps);
        let state = self
            .0
            .entry(change.actor_id.clone())
            .or_insert_with(HashMap::new);
        state.insert(change.seq, all_deps);
    }

    /// Look up the cached transitive dependency clock for `(actor_id, seq)`,
    /// or an empty clock if none is recorded. This is a pure read, so it
    /// only needs `&self` (the previous signature took `&mut self`
    /// unnecessarily).
    fn transitive_dependencies(&self, actor_id: &ActorID, seq: u32) -> Clock {
        self.0
            .get(actor_id)
            .and_then(|deps| deps.get(&seq))
            .cloned()
            .unwrap_or_else(Clock::empty)
    }

    /// Whether the two operations in question are concurrent
    pub(crate) fn are_concurrent(
        &self,
        op1: &OperationWithMetadata,
        op2: &OperationWithMetadata,
    ) -> bool {
        // Two ops from the same change are never concurrent with each other.
        if op1.sequence == op2.sequence && op1.actor_id == op2.actor_id {
            return false;
        }
        // Concurrent iff neither op's change had seen the other's change.
        self.dependency_for(op1, &op2.actor_id) < op2.sequence
            && self.dependency_for(op2, &op1.actor_id) < op1.sequence
    }
}

644
src/change_context.rs Normal file
View file

@ -0,0 +1,644 @@
use crate::actor_histories::ActorHistories;
/// This module handles creating changes. Most of the machinery here is related
/// to resolving paths from ChangeRequests, and generating operations to create
/// and modify data in the op set.
use crate::change_request::{ChangeRequest, ListIndex, Path, PathElement};
use crate::error::InvalidChangeRequest;
use crate::object_store::ObjectHistory;
use crate::object_store::ObjectStore;
use crate::op_set::list_ops_in_order;
use crate::operation_with_metadata::OperationWithMetadata;
use crate::protocol::{
ActorID, Change, Clock, ElementID, Key, ObjectID, Operation, PrimitiveValue,
};
use crate::value::Value;
use std::convert::TryInto;
/// A single step in a fully-resolved path. Unlike `PathElement`, each step
/// carries the concrete IDs and values discovered while walking the object
/// store (see `ChangeContext::resolve_path`).
#[derive(Clone, Debug)]
enum ResolvedPathElement {
    // A map object and its ID
    Map(ObjectID),
    // A list object, its ID, and its current max elem counter
    List(ObjectID, u32),
    // A key within a map
    Key(Key),
    // The ID of an element within a list
    Index(ElementID),
    // A primitive value found at the end of a path
    Value(PrimitiveValue),
    // A key which was looked up but not present in its map
    MissingKey(Key),
}
/// Represents a resolved path: the sequence of `ResolvedPathElement`s
/// produced by walking a `Path` through the object store.
#[derive(Debug, Clone)]
struct ResolvedPath(Vec<ResolvedPathElement>);
impl ResolvedPath {
    fn new(elements: Vec<ResolvedPathElement>) -> ResolvedPath {
        ResolvedPath(elements)
    }

    /// Interpret the tail of this path as the target of a "set" request,
    /// i.e. a (containing object, key) pair. Returns `None` if the path
    /// does not end in a settable location.
    fn as_set_target(&self) -> Option<SetTarget> {
        self.last_n(3).and_then(|last_three| {
            match &last_three[..] {
                // Overwriting an existing key in a map
                [ResolvedPathElement::Map(o), ResolvedPathElement::Key(k), ResolvedPathElement::Value(_)] => Some(SetTarget{
                    containing_object_id: o.clone(),
                    key: k.clone(),
                }),
                // Setting a key which does not yet exist in a map
                [ResolvedPathElement::Map(o), ResolvedPathElement::Key(k), ResolvedPathElement::MissingKey(_)] => Some(SetTarget{
                    containing_object_id: o.clone(),
                    key: k.clone(),
                }),
                // Overwriting an existing list element
                [ResolvedPathElement::List(l, _), ResolvedPathElement::Index(elem_id), ResolvedPathElement::Value(_)] => Some(SetTarget{
                    containing_object_id: l.clone(),
                    key: elem_id.as_key(),
                }),
                _ => None
            }
        })
    }

    /// Interpret the tail of this path as something which can be moved:
    /// either a link to a contained object, or a primitive value.
    fn as_move_source(&self) -> Option<MoveSource> {
        self.last_n(3).and_then(|last_three| {
            match &last_three[..] {
                // A map stored under a key in a map
                [ResolvedPathElement::Map(o), ResolvedPathElement::Key(k), ResolvedPathElement::Map(c)] => Some(MoveSource::Reference{
                    containing_object_id: o.clone(),
                    key: k.clone(),
                    contained_object_id: c.clone()
                }),
                // A list stored under a key in a map
                [ResolvedPathElement::Map(o), ResolvedPathElement::Key(k), ResolvedPathElement::List(l, _)] => Some(MoveSource::Reference{
                    containing_object_id: o.clone(),
                    key: k.clone(),
                    contained_object_id: l.clone()
                }),
                // A primitive value stored under a key in a map
                [ResolvedPathElement::Map(o), ResolvedPathElement::Key(k), ResolvedPathElement::Value(v)] => Some(MoveSource::Value{
                    containing_object_id: o.clone(),
                    value: v.clone(),
                    key: k.clone(),
                }),
                // A map stored at an index in a list
                [ResolvedPathElement::List(l, _), ResolvedPathElement::Index(elem_id), ResolvedPathElement::Map(m)] => Some(MoveSource::Reference{
                    containing_object_id: l.clone(),
                    key: elem_id.as_key(),
                    contained_object_id: m.clone(),
                }),
                // A list stored at an index in a list
                [ResolvedPathElement::List(l, _), ResolvedPathElement::Index(elem_id), ResolvedPathElement::List(l2, _)] => Some(MoveSource::Reference{
                    containing_object_id: l.clone(),
                    key: elem_id.as_key(),
                    contained_object_id: l2.clone(),
                }),
                // A primitive value stored at an index in a list
                [ResolvedPathElement::List(l, _), ResolvedPathElement::Index(i), ResolvedPathElement::Value(v)] => Some(MoveSource::Value{
                    containing_object_id: l.clone(),
                    value: v.clone(),
                    key: i.as_key(),
                }),
                _ => None
            }
        })
    }

    /// Interpret the tail of this path as a position in a list after which a
    /// new element can be inserted.
    fn as_insert_after_target(&self) -> Option<InsertAfterTarget> {
        self.last_n(3).and_then(|last_three| {
            match &last_three[..] {
                // An existing element holding a primitive value
                [ResolvedPathElement::List(l, m), ResolvedPathElement::Index(e), ResolvedPathElement::Value(_)] => Some(InsertAfterTarget{
                    list_id: l.clone(),
                    element_id: e.clone(),
                    max_elem: *m,
                }),
                // A bare index at the end of the path (e.g. the list head)
                [_, ResolvedPathElement::List(l, m), ResolvedPathElement::Index(e)] => Some(InsertAfterTarget{
                    list_id: l.clone(),
                    element_id: e.clone(),
                    max_elem: *m,
                }),
                _ => None,
            }
        })
    }

    /// The last `n` elements of the path, or `None` if the path is shorter
    /// than `n`.
    fn last_n(&self, n: usize) -> Option<Box<[ResolvedPathElement]>> {
        if self.0.len() < n {
            None
        } else {
            Some(
                self.0
                    .iter()
                    .skip(self.0.len() - n)
                    .cloned()
                    .collect::<Vec<ResolvedPathElement>>()
                    .into_boxed_slice(),
            )
        }
    }
}
/// Represents the target of a "set" change request.
#[derive(Debug, Clone)]
struct SetTarget {
    // The map or list the set will write into
    containing_object_id: ObjectID,
    // The key (or element-ID key, for lists) being written
    key: Key,
}
/// Represents a path which can be moved.
enum MoveSource {
    /// The source is a link to a contained object (map or list)
    Reference {
        containing_object_id: ObjectID,
        key: Key,
        contained_object_id: ObjectID,
    },
    /// The source is a primitive value stored under a key/index
    Value {
        containing_object_id: ObjectID,
        key: Key,
        value: PrimitiveValue,
    },
}
impl MoveSource {
    /// The operation which removes this source from its containing object.
    /// Both variants delete the same way: by key within the container.
    fn delete_op(&self) -> Operation {
        let (object_id, key) = match self {
            MoveSource::Reference {
                containing_object_id,
                key,
                ..
            }
            | MoveSource::Value {
                containing_object_id,
                key,
                ..
            } => (containing_object_id.clone(), key.clone()),
        };
        Operation::Delete { object_id, key }
    }
}
/// The resolved location after which a new list element will be inserted.
#[derive(Debug)]
struct InsertAfterTarget {
    // The list being inserted into
    list_id: ObjectID,
    // The element the insertion goes after (Head for the front of the list)
    element_id: ElementID,
    // The list's current max elem counter, used to allocate the next elem
    max_elem: u32,
}
/// The ChangeContext is responsible for taking the current state of the opset
/// (which is an ObjectStore, and a clock), and an actor ID and generating a
/// new change for a given set of ChangeRequests. The ObjectStore which the
/// ChangeContext manages is a copy of the OpSet's ObjectStore, this is because
/// in order to process ChangeRequests the ChangeContext needs to update the
/// ObjectStore.
///
/// For example, if we have several ChangeRequests which are inserting elements
/// into a list, one after another, then we need to know the element IDs of the
/// newly inserted elements to generate the correct operations.
pub struct ChangeContext<'a> {
    // Private copy of the op set's object store; mutated speculatively
    object_store: ObjectStore,
    // The actor the generated change will be attributed to
    actor_id: ActorID,
    actor_histories: &'a ActorHistories,
    // The clock at the time the change is being generated
    clock: Clock,
}
impl<'a> ChangeContext<'a> {
    pub fn new(
        object_store: &ObjectStore,
        actor_id: ActorID,
        actor_histories: &'a ActorHistories,
        clock: Clock,
    ) -> ChangeContext<'a> {
        ChangeContext {
            // Cloned so we can speculatively apply operations while
            // processing requests (see create_change).
            object_store: object_store.clone(),
            actor_histories,
            actor_id,
            clock,
        }
    }

    /// Look up the operation history for `object_id` in our local copy of
    /// the object store.
    fn get_operations_for_object_id(&self, object_id: &ObjectID) -> Option<&ObjectHistory> {
        self.object_store.history_for_object_id(object_id)
    }

    /// Turn a sequence of `ChangeRequest`s into a single `Change`. Each
    /// generated operation is applied to the local object store as we go so
    /// that later requests can reference state created by earlier ones.
    /// Fails with `InvalidChangeRequest` if any request's path cannot be
    /// resolved.
    pub(crate) fn create_change<I>(
        &mut self,
        requests: I,
        message: Option<String>,
    ) -> Result<Change, InvalidChangeRequest>
    where
        I: IntoIterator<Item = ChangeRequest>,
    {
        let ops_with_errors: Vec<Result<Vec<Operation>, InvalidChangeRequest>> = requests
            .into_iter()
            .map(|request| {
                let ops = match request {
                    ChangeRequest::Set {
                        ref path,
                        ref value,
                    } => self.create_set_operations(&self.actor_id, path, value),
                    ChangeRequest::Delete { ref path } => {
                        self.create_delete_operation(path).map(|o| vec![o])
                    }
                    ChangeRequest::Increment {
                        ref path,
                        ref value,
                    } => self
                        .create_increment_operation(path, value.clone())
                        .map(|o| vec![o]),
                    ChangeRequest::Move { ref from, ref to } => {
                        self.create_move_operations(from, to)
                    }
                    ChangeRequest::InsertAfter {
                        ref path,
                        ref value,
                    } => self.create_insert_operation(&self.actor_id, path, value),
                };
                // We have to apply each operation to the object store so that
                // operations which reference earlier operations within this
                // change set have the correct data to refer to.
                ops.iter().for_each(|inner_ops| {
                    inner_ops.iter().for_each(|op| {
                        let op_with_meta = OperationWithMetadata {
                            sequence: self.clock.seq_for(&self.actor_id) + 1,
                            actor_id: self.actor_id.clone(),
                            operation: op.clone(),
                        };
                        // NOTE(review): unwrap assumes operations we just
                        // generated always apply cleanly — TODO confirm.
                        self.object_store
                            .apply_operation(self.actor_histories, op_with_meta)
                            .unwrap();
                    });
                });
                ops
            })
            .collect();
        // Short-circuit on the first invalid request, then flatten all
        // generated operations into a single list.
        let nested_ops = ops_with_errors
            .into_iter()
            .collect::<Result<Vec<Vec<Operation>>, InvalidChangeRequest>>()?;
        let ops = nested_ops.into_iter().flatten().collect::<Vec<Operation>>();
        // The new change depends on everything we had seen, and advances our
        // own sequence number by one.
        let dependencies = self.clock.clone();
        let seq = self.clock.seq_for(&self.actor_id) + 1;
        let change = Change {
            actor_id: self.actor_id.clone(),
            operations: ops,
            seq,
            message,
            dependencies,
        };
        Ok(change)
    }

    /// Generate the operations for a `Set` request. Container values (maps
    /// and lists) produce their creation ops followed by a `Link` into the
    /// target; primitives produce a single `Set`.
    pub(crate) fn create_set_operations(
        &self,
        actor_id: &ActorID,
        path: &Path,
        value: &Value,
    ) -> Result<Vec<Operation>, InvalidChangeRequest> {
        // If we're setting a map as the root object we actually want to set
        // each key of the map to the corresponding key in the root object
        if let Value::Map(kvs) = value.clone() {
            if path.is_root() {
                let mut ops = Vec::new();
                for (key, value) in kvs.into_iter() {
                    let key_path = path.key(key);
                    let mut this_key_ops =
                        self.create_set_operations(actor_id, &key_path, &value)?;
                    ops.append(&mut this_key_ops)
                }
                return Ok(ops);
            }
        };
        self.resolve_path(path)
            .and_then(|r| r.as_set_target())
            .map(|path_resolution| match value {
                Value::Map { .. } | Value::List { .. } => {
                    let (new_object_id, mut create_ops) = value_to_ops(actor_id, &value);
                    let link_op = Operation::Link {
                        object_id: path_resolution.containing_object_id.clone(),
                        key: path_resolution.key.clone(),
                        value: new_object_id,
                    };
                    create_ops.push(link_op);
                    create_ops
                }
                Value::Str { .. } | Value::Number { .. } | Value::Boolean { .. } | Value::Null => {
                    vec![create_prim(
                        path_resolution.containing_object_id.clone(),
                        path_resolution.key,
                        &value,
                    )]
                }
            })
            .ok_or(InvalidChangeRequest(format!("Missing path: {:?}", path)))
    }

    /// Generate the operations for a `Move` request: a delete at the source
    /// followed by a `Set` (for primitives) or `Link` (for objects) at the
    /// destination.
    pub(crate) fn create_move_operations(
        &self,
        from: &Path,
        to: &Path,
    ) -> Result<Vec<Operation>, InvalidChangeRequest> {
        let resolved_from = self.resolve_path(from).ok_or(InvalidChangeRequest(format!(
            "Missing from path: {:?}",
            from
        )))?;
        let resolved_to = self
            .resolve_path(to)
            .ok_or(InvalidChangeRequest(format!("Missing to path: {:?}", to)))?;
        let move_source = resolved_from
            .as_move_source()
            .ok_or(InvalidChangeRequest(format!(
                "Invalid move source path: {:?}",
                from
            )))?;
        let target = resolved_to
            .as_set_target()
            .ok_or(InvalidChangeRequest(format!("Invalid to path: {:?}", to)))?;
        let delete_op = move_source.delete_op();
        let insert_op = match (move_source, target) {
            // Moving a primitive value: re-Set it at the destination
            (
                MoveSource::Value { value: v, .. },
                SetTarget {
                    containing_object_id,
                    key,
                },
            ) => Operation::Set {
                object_id: containing_object_id,
                key,
                value: v,
                datatype: None,
            },
            // Moving a contained object: re-Link it at the destination
            (
                MoveSource::Reference {
                    contained_object_id,
                    ..
                },
                SetTarget {
                    containing_object_id: target_container_id,
                    key: target_key,
                },
            ) => Operation::Link {
                object_id: target_container_id,
                key: target_key,
                value: contained_object_id,
            },
        };
        Ok(vec![delete_op, insert_op])
    }

    /// Generate the delete operation for a `Delete` request.
    pub(crate) fn create_delete_operation(
        &self,
        path: &Path,
    ) -> Result<Operation, InvalidChangeRequest> {
        self.resolve_path(path)
            .and_then(|r| r.as_move_source())
            .map(|source| source.delete_op())
            .ok_or(InvalidChangeRequest(format!(
                "Invalid delete path: {:?}",
                path
            )))
    }

    /// Not yet implemented; always returns an error.
    pub(crate) fn create_increment_operation(
        &self,
        _path: &Path,
        _value: f64,
    ) -> Result<Operation, InvalidChangeRequest> {
        Err(InvalidChangeRequest(
            "create_increment_operation not implemented".to_string(),
        ))
    }

    /// Generate the operations for an `InsertAfter` request: an `Insert`
    /// which allocates the new element, followed by the ops which populate
    /// it (a `Set` for primitives, creation ops plus a `Link` for objects).
    pub(crate) fn create_insert_operation(
        &self,
        actor_id: &ActorID,
        after: &Path,
        value: &Value,
    ) -> Result<Vec<Operation>, InvalidChangeRequest> {
        let after_target = self
            .resolve_path(after)
            .and_then(|p| p.as_insert_after_target())
            .ok_or(InvalidChangeRequest(format!(
                "Invalid insert after path: {:?}",
                after
            )))?;
        // The new element gets the next elem counter for this actor
        let next_elem_id =
            ElementID::SpecificElementID(actor_id.clone(), after_target.max_elem + 1);
        let insert_op = Operation::Insert {
            list_id: after_target.list_id.clone(),
            key: after_target.element_id,
            elem: after_target.max_elem + 1,
        };
        let mut ops = vec![insert_op];
        match value {
            Value::Map { .. } | Value::List { .. } => {
                let (new_object_id, create_ops) = value_to_ops(actor_id, &value);
                ops.extend(create_ops);
                let link_op = Operation::Link {
                    object_id: after_target.list_id.clone(),
                    key: next_elem_id.as_key(),
                    value: new_object_id,
                };
                ops.push(link_op);
            }
            Value::Str { .. } | Value::Number { .. } | Value::Boolean { .. } | Value::Null => {
                ops.push(create_prim(
                    after_target.list_id.clone(),
                    next_elem_id.as_key(),
                    &value,
                ));
            }
        };
        Ok(ops)
    }

    /// Walk `path` through the object store, producing a `ResolvedPath`
    /// containing the concrete object IDs, element IDs, and values found at
    /// each step, or `None` if any component of the path does not exist.
    fn resolve_path(&self, path: &Path) -> Option<ResolvedPath> {
        let mut resolved_elements: Vec<ResolvedPathElement> = Vec::new();
        let mut containing_object_id = ObjectID::Root;
        for next_elem in path {
            // Once we've resolved to a missing key or the head of a list
            // there is nothing further to descend into.
            match resolved_elements.last() {
                Some(ResolvedPathElement::MissingKey(_)) => return None,
                Some(ResolvedPathElement::Index(ElementID::Head)) => return None,
                _ => {}
            }
            match next_elem {
                PathElement::Root => {
                    resolved_elements.push(ResolvedPathElement::Map(ObjectID::Root))
                }
                PathElement::Key(key) => {
                    resolved_elements.push(ResolvedPathElement::Key(Key(key.to_string())));
                    // Find the active operation for this key, if the
                    // containing object is a map.
                    let op = self
                        .get_operations_for_object_id(&containing_object_id)
                        .and_then(|history| match history {
                            ObjectHistory::Map { operations_by_key } => Some(operations_by_key),
                            ObjectHistory::List { .. } => None,
                        })
                        .and_then(|kvs| kvs.get(key))
                        .and_then(|cops| cops.active_op())
                        .map(|o| o.operation.clone());
                    match op {
                        // A primitive value lives directly under this key
                        Some(Operation::Set { value, .. }) => {
                            resolved_elements.push(ResolvedPathElement::Value(value))
                        }
                        // A link points at a nested map or list; descend
                        Some(Operation::Link { value, .. }) => {
                            match self.get_operations_for_object_id(&value) {
                                None => return None,
                                Some(ObjectHistory::Map { .. }) => {
                                    resolved_elements.push(ResolvedPathElement::Map(value.clone()));
                                    containing_object_id = value.clone()
                                }
                                Some(ObjectHistory::List { max_elem, .. }) => {
                                    resolved_elements
                                        .push(ResolvedPathElement::List(value.clone(), *max_elem));
                                    containing_object_id = value.clone()
                                }
                            }
                        }
                        None => resolved_elements
                            .push(ResolvedPathElement::MissingKey(Key(key.to_string()))),
                        _ => return None,
                    }
                }
                PathElement::Index(index) => match index {
                    ListIndex::Head => {
                        match self.get_operations_for_object_id(&containing_object_id) {
                            Some(ObjectHistory::List { .. }) => {
                                resolved_elements.push(ResolvedPathElement::Index(ElementID::Head))
                            }
                            _ => return None,
                        };
                    }
                    ListIndex::Index(i) => {
                        // Find the i'th element (in list order) and its
                        // active operation.
                        let op = self
                            .get_operations_for_object_id(&containing_object_id)
                            .and_then(|history| match history {
                                ObjectHistory::List {
                                    operations_by_elemid,
                                    following,
                                    ..
                                } => list_ops_in_order(operations_by_elemid, following).ok(),
                                ObjectHistory::Map { .. } => None,
                            })
                            .and_then(|ops| ops.get(*i).map(|o| o.clone()))
                            .and_then(|(element_id, cops)| {
                                cops.active_op().map(|o| (element_id, o.operation.clone()))
                            });
                        match op {
                            Some((elem_id, Operation::Set { value, .. })) => {
                                resolved_elements.push(ResolvedPathElement::Index(elem_id));
                                resolved_elements.push(ResolvedPathElement::Value(value));
                            }
                            Some((_, Operation::Link { value, .. })) => {
                                match self.get_operations_for_object_id(&value) {
                                    None => return None,
                                    Some(ObjectHistory::Map { .. }) => {
                                        resolved_elements
                                            .push(ResolvedPathElement::Map(value.clone()));
                                        containing_object_id = value
                                    }
                                    Some(ObjectHistory::List { max_elem, .. }) => {
                                        resolved_elements.push(ResolvedPathElement::List(
                                            value.clone(),
                                            *max_elem,
                                        ));
                                        containing_object_id = value
                                    }
                                }
                            }
                            _ => return None,
                        }
                    }
                },
            }
        }
        Some(ResolvedPath::new(resolved_elements))
    }
}
/// Convert a container `Value` into the operations required to construct it,
/// returning the ID of the newly-allocated top-level object along with the
/// operations (creation op first, then per-element/per-key ops).
///
/// # Panics
///
/// Panics if `v` is not a `Value::Map` or `Value::List`; primitive values are
/// handled by `create_prim` instead.
fn value_to_ops(actor_id: &ActorID, v: &Value) -> (ObjectID, Vec<Operation>) {
    match v {
        Value::List(vs) => {
            let list_id = ObjectID::ID(uuid::Uuid::new_v4().to_string());
            let mut ops = vec![Operation::MakeList {
                object_id: list_id.clone(),
            }];
            // Elements are numbered from 1; each insert references the
            // element before it (or Head for the first element).
            let elem_ops = vs.iter().enumerate().flat_map(|(index, elem_value)| {
                let elem: u32 = (index + 1).try_into().unwrap();
                let previous_elemid = match index {
                    0 => ElementID::Head,
                    _ => ElementID::SpecificElementID(actor_id.clone(), elem - 1),
                };
                let insert_op = Operation::Insert {
                    list_id: list_id.clone(),
                    elem,
                    key: previous_elemid,
                };
                let elem_id = ElementID::SpecificElementID(actor_id.clone(), elem);
                // Each element contributes its insert op followed by the ops
                // which populate it.
                let mut result = vec![insert_op];
                match elem_value {
                    Value::Boolean { .. }
                    | Value::Str { .. }
                    | Value::Number { .. }
                    | Value::Null { .. } => {
                        result.push(create_prim(list_id.clone(), elem_id.as_key(), elem_value));
                    }
                    Value::Map { .. } | Value::List { .. } => {
                        let (linked_object_id, mut value_ops) = value_to_ops(actor_id, elem_value);
                        value_ops.push(Operation::Link {
                            object_id: list_id.clone(),
                            key: elem_id.as_key(),
                            value: linked_object_id,
                        });
                        result.append(&mut value_ops);
                    }
                }
                result
            });
            ops.extend(elem_ops);
            (list_id, ops)
        }
        Value::Map(kvs) => {
            let object_id = ObjectID::ID(uuid::Uuid::new_v4().to_string());
            let mut ops = vec![Operation::MakeMap {
                object_id: object_id.clone(),
            }];
            let key_ops = kvs.iter().flat_map(|(k, v)| match v {
                Value::Boolean { .. }
                | Value::Str { .. }
                | Value::Number { .. }
                | Value::Null { .. } => vec![create_prim(object_id.clone(), Key(k.clone()), v)],
                Value::Map { .. } | Value::List { .. } => {
                    let (linked_object_id, mut value_ops) = value_to_ops(actor_id, v);
                    value_ops.push(Operation::Link {
                        object_id: object_id.clone(),
                        key: Key(k.clone()),
                        value: linked_object_id,
                    });
                    value_ops
                }
            });
            ops.extend(key_ops);
            (object_id, ops)
        }
        // Reaching here is a caller bug, not bad user input.
        _ => panic!("Only a map or list can be the top level object in value_to_ops"),
    }
}
/// Build the `Set` operation which writes the primitive `value` under `key`
/// in the object identified by `object_id`. Panics if `value` is a map or a
/// list — containers are handled by `value_to_ops`.
fn create_prim(object_id: ObjectID, key: Key, value: &Value) -> Operation {
    let prim_value = match value {
        Value::Str(s) => PrimitiveValue::Str(s.to_string()),
        Value::Number(n) => PrimitiveValue::Number(*n),
        Value::Boolean(b) => PrimitiveValue::Boolean(*b),
        Value::Null => PrimitiveValue::Null,
        _ => panic!("Non primitive value passed to create_prim"),
    };
    Operation::Set {
        object_id,
        key,
        value: prim_value,
        datatype: None,
    }
}

81
src/change_request.rs Normal file
View file

@ -0,0 +1,81 @@
use crate::value::Value;
/// Represents the various changes that you can make to a document, all of
/// these use a "path" to refer to parts of the document. You can generate
/// paths using a builder syntax. E.g this would refer to the second element
/// of an array under the "people" key in the root object
///
/// ```rust,no_run
/// # use automerge::{Path, ListIndex};
/// Path::root().key("people".to_string()).index(ListIndex::Index(1));
/// ```
///
/// Note that there is a special `ListIndex` for the head of a list, in case
/// you want to insert something at the beginning
#[derive(Debug)]
pub enum ChangeRequest {
    /// Set the value at `path` to `value`
    Set { path: Path, value: Value },
    /// Move the value at `from` to `to`, deleting it from its old location
    Move { from: Path, to: Path },
    /// Remove the value at `path`
    Delete { path: Path },
    /// Increment the number at `path` by `value`
    /// (NOTE(review): not yet implemented — see `create_increment_operation`)
    Increment { path: Path, value: f64 },
    /// Insert `value` into a list after the element at `path`
    InsertAfter { path: Path, value: Value },
}
/// A single component of a [`Path`].
#[derive(Clone, Debug, PartialEq)]
pub enum PathElement {
    /// The root object of the document
    Root,
    /// A key within a map
    Key(String),
    /// A position within a list
    Index(ListIndex),
}

/// An index into a list: either the head (before the first element) or a
/// zero-based element index.
#[derive(Clone, Debug, PartialEq)]
pub enum ListIndex {
    Head,
    Index(usize),
}

/// Represents a location within a document
#[derive(Debug)]
pub struct Path(Vec<PathElement>);

impl Path {
    /// A path at the root of the document
    pub fn root() -> Path {
        Path(vec![PathElement::Root])
    }

    /// Returns a new path which points to the list element at index of the
    /// current path
    pub fn index(&self, index: ListIndex) -> Path {
        let mut elems = self.0.clone();
        elems.push(PathElement::Index(index));
        Path(elems)
    }

    /// Returns a new path which points to the element under this key in the
    /// current path
    pub fn key(&self, key: String) -> Path {
        let mut elems = self.0.clone();
        elems.push(PathElement::Key(key));
        Path(elems)
    }

    /// Returns the parent of this path, i.e. the path with its final
    /// element removed. (The previous implementation used `skip(1)`, which
    /// dropped the *first* element — the root — producing an un-rooted path
    /// instead of the parent.)
    pub fn parent(&self) -> Path {
        let mut elems = self.0.clone();
        elems.pop();
        Path(elems)
    }

    /// Whether this path consists solely of the root element
    pub fn is_root(&self) -> bool {
        self.0.len() == 1 && self.0[0] == PathElement::Root
    }
}
// Allow borrowing iteration over a path's elements, root first
// (used by `ChangeContext::resolve_path`).
impl<'a> IntoIterator for &'a Path {
    type Item = &'a PathElement;
    type IntoIter = std::slice::Iter<'a, PathElement>;
    fn into_iter(self) -> Self::IntoIter {
        self.0.iter()
    }
}

View file

@ -0,0 +1,86 @@
use crate::actor_histories::ActorHistories;
use crate::error::AutomergeError;
use crate::operation_with_metadata::OperationWithMetadata;
use crate::protocol::{DataType, Operation, PrimitiveValue};
use std::cmp::PartialOrd;
/// Represents a set of operations which are relevant to either an element ID
/// or object ID and which occurred without knowledge of each other
#[derive(Debug, Clone)]
pub(crate) struct ConcurrentOperations {
    // Kept sorted so the winning ("active") operation is first; see
    // `incorporate_new_op`.
    operations: Vec<OperationWithMetadata>,
}
impl ConcurrentOperations {
    pub(crate) fn new() -> ConcurrentOperations {
        ConcurrentOperations {
            operations: Vec::new(),
        }
    }

    /// The operation which currently "wins", or `None` if there is no live
    /// operation (e.g. the value was deleted).
    pub(crate) fn active_op(&self) -> Option<&OperationWithMetadata> {
        // operations are sorted in incorporate_new_op, so the first op is the
        // active one
        self.operations.first()
    }

    /// Merge `new_op` into this set, discarding any existing operations
    /// which causally precede it, then re-sort so `active_op` is first.
    /// Currently never returns an error, but the `Result` keeps the
    /// signature open for future validation.
    pub(crate) fn incorporate_new_op(
        &mut self,
        new_op: OperationWithMetadata,
        actor_histories: &ActorHistories,
    ) -> Result<(), AutomergeError> {
        let mut concurrent: Vec<OperationWithMetadata> = match new_op.operation {
            // If the operation is an increment op, then we are going to modify
            // any Set operations to reflect the increment ops in the next
            // part of this function
            Operation::Increment { .. } => self.operations.clone(),
            // Otherwise we filter out any operations that are not concurrent
            // with the new one (i.e ones which causally precede the new one)
            _ => self
                .operations
                .iter()
                .filter(|op| actor_histories.are_concurrent(op, &new_op))
                .cloned()
                .collect(),
        };
        let this_op = new_op.clone();
        match &new_op.operation {
            // For Set or Link ops, we add them to the concurrent ops list, to
            // be interpreted later as part of the document::walk
            // implementation
            Operation::Set { .. } | Operation::Link { .. } => {
                concurrent.push(this_op);
            }
            // Increment ops are not stored in the op set, instead we update
            // any Set operations which are a counter containing a number to
            // reflect the increment operation
            Operation::Increment {
                value: inc_value, ..
            } => concurrent.iter_mut().for_each(|op| {
                if let Operation::Set {
                    value: PrimitiveValue::Number(ref mut n),
                    datatype: Some(DataType::Counter),
                    ..
                } = op.operation
                {
                    *n += inc_value
                }
            }),
            // All other operations are not relevant (e.g a concurrent
            // operation set containing just a delete operation actually is an
            // empty set, in document::walk we interpret this into a
            // nonexistent part of the state)
            _ => {}
        }
        // the partial_cmp implementation for `OperationWithMetadata` ensures
        // that the operations are in the deterministic order required by
        // automerge.
        //
        // Note we can unwrap because the partial_cmp definition never returns
        // None
        concurrent.sort_by(|a, b| a.partial_cmp(b).unwrap());
        concurrent.reverse();
        self.operations = concurrent;
        Ok(())
    }
}

View file

@ -1,10 +1,14 @@
use super::op_set::OpSet;
use super::AutomergeError;
use crate::protocol::Change;
use serde_json;
use super::{AutomergeError, ChangeRequest};
use crate::change_context::ChangeContext;
use crate::error::InvalidChangeRequest;
use crate::protocol::{ActorID, Change};
use crate::value::Value;
use uuid;
/// Top level entry point for interacting with automerge: owns the op set
/// plus the actor ID used when generating new changes.
pub struct Document {
    op_set: OpSet,
    // NOTE(review): generated fresh (a v4 UUID) in `init`, so each Document
    // gets its own actor identity.
    actor_id: ActorID,
}
impl Document {
@ -12,6 +16,7 @@ impl Document {
pub fn init() -> Document {
Document {
op_set: OpSet::init(),
actor_id: ActorID(uuid::Uuid::new_v4().to_string()),
}
}
@ -25,22 +30,42 @@ impl Document {
}
/// Get the current state of the document as a serde_json value
pub fn state(&self) -> Result<serde_json::Value, AutomergeError> {
self.op_set.root_value().map(|v| v.to_json())
pub fn state(&self) -> &Value {
self.op_set.root_value()
}
/// Add a single change to the document
pub fn apply_change(&mut self, change: Change) -> Result<(), AutomergeError> {
self.op_set.apply_change(change)
}
pub fn create_and_apply_change(
&mut self,
message: Option<String>,
requests: Vec<ChangeRequest>,
) -> Result<Change, InvalidChangeRequest> {
let mut change_ctx = ChangeContext::new(
&self.op_set.object_store,
self.actor_id.clone(),
&self.op_set.actor_histories,
self.op_set.clock.clone(),
);
let change = change_ctx.create_change(requests, message)?;
self.apply_change(change.clone())
.map_err(|e| InvalidChangeRequest(format!("Error applying change: {:?}", e)))?;
Ok(change)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::change_request::{ListIndex, Path};
use crate::protocol::{
ActorID, Clock, DataType, ElementID, Key, ObjectID, Operation, PrimitiveValue,
};
use crate::value::Value;
use serde_json;
use std::collections::HashMap;
#[test]
@ -167,7 +192,253 @@ mod tests {
"#,
)
.unwrap();
let actual_state = doc.state().unwrap();
let actual_state = doc.state().to_json();
assert_eq!(actual_state, expected)
}
#[test]
fn test_set_mutation() {
let mut doc = Document::init();
let json_value: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
"#,
)
.unwrap();
doc.create_and_apply_change(
Some("Some change".to_string()),
vec![ChangeRequest::Set {
path: Path::root().key("the-state".to_string()),
value: Value::from_json(&json_value),
}],
)
.unwrap();
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"the-state": {
"cards_by_id": {},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
}
"#,
)
.unwrap();
assert_eq!(expected, doc.state().to_json());
doc.create_and_apply_change(
Some("another change".to_string()),
vec![ChangeRequest::Set {
path: Path::root()
.key("the-state".to_string())
.key("size_of_cards".to_string()),
value: Value::from_json(&serde_json::Value::Number(
serde_json::Number::from_f64(10.0).unwrap(),
)),
}],
)
.unwrap();
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"the-state": {
"cards_by_id": {},
"size_of_cards": 10.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
}
"#,
)
.unwrap();
assert_eq!(expected, doc.state().to_json());
}
#[test]
fn test_move_ops() {
let mut doc = Document::init();
let json_value: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {
"jack": {"value": 11}
},
"size_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
"#,
)
.unwrap();
doc.create_and_apply_change(
Some("Init".to_string()),
vec![ChangeRequest::Set {
path: Path::root(),
value: Value::from_json(&json_value),
}],
)
.unwrap();
println!("Doc state: {:?}", doc.state().to_json());
doc.create_and_apply_change(
Some("Move jack".to_string()),
vec![
ChangeRequest::Move {
from: Path::root()
.key("cards_by_id".to_string())
.key("jack".to_string()),
to: Path::root()
.key("cards_by_id".to_string())
.key("jill".to_string()),
},
ChangeRequest::Move {
from: Path::root().key("size_of_cards".to_string()),
to: Path::root().key("number_of_cards".to_string()),
},
],
)
.unwrap();
let expected: serde_json::Value = serde_json::from_str(
r#"
{
"cards_by_id": {
"jill": {"value": 11.0}
},
"number_of_cards": 12.0,
"numRounds": 11.0,
"cards": [1.0, false]
}
"#,
)
.unwrap();
assert_eq!(expected, doc.state().to_json());
}
#[test]
fn test_delete_op() {
    // Initial document: a nested map, two scalars, and a two-element list.
    let initial_state: serde_json::Value = serde_json::from_str(
        r#"
        {
            "cards_by_id": {
                "jack": {"value": 11}
            },
            "size_of_cards": 12.0,
            "numRounds": 11.0,
            "cards": [1.0, false]
        }
        "#,
    )
    .unwrap();
    let mut doc = Document::init();
    doc.create_and_apply_change(
        Some("Init".to_string()),
        vec![ChangeRequest::Set {
            path: Path::root(),
            value: Value::from_json(&initial_state),
        }],
    )
    .unwrap();
    // Remove a map key and a list element in one change.
    let delete_jack = ChangeRequest::Delete {
        path: Path::root()
            .key("cards_by_id".to_string())
            .key("jack".to_string()),
    };
    let delete_second_card = ChangeRequest::Delete {
        path: Path::root()
            .key("cards".to_string())
            .index(ListIndex::Index(1)),
    };
    doc.create_and_apply_change(
        Some("Delete everything".to_string()),
        vec![delete_jack, delete_second_card],
    )
    .unwrap();
    // The map entry is gone and the list has shrunk by one element.
    let expected: serde_json::Value = serde_json::from_str(
        r#"
        {
            "cards_by_id": {},
            "size_of_cards": 12.0,
            "numRounds": 11.0,
            "cards": [1.0]
        }
        "#,
    )
    .unwrap();
    assert_eq!(expected, doc.state().to_json());
}
#[test]
fn test_insert_ops() {
    // Start with a document containing a single two-element list.
    let initial_state: serde_json::Value = serde_json::from_str(
        r#"
        {
            "values": [1.0, false]
        }
        "#,
    )
    .unwrap();
    let mut doc = Document::init();
    doc.create_and_apply_change(
        Some("Initial".to_string()),
        vec![ChangeRequest::Set {
            path: Path::root(),
            value: Value::from_json(&initial_state),
        }],
    )
    .unwrap();
    let person: serde_json::Value = serde_json::from_str(
        r#"
        {
            "name": "fred",
            "surname": "johnson"
        }
        "#,
    )
    .unwrap();
    // Insert a map at the head of the list and a string after index 1.
    let insert_at_head = ChangeRequest::InsertAfter {
        path: Path::root()
            .key("values".to_string())
            .index(ListIndex::Head),
        value: Value::from_json(&person),
    };
    let insert_after_second = ChangeRequest::InsertAfter {
        path: Path::root()
            .key("values".to_string())
            .index(ListIndex::Index(1)),
        value: Value::from_json(&serde_json::Value::String("final".to_string())),
    };
    doc.create_and_apply_change(
        Some("list additions".to_string()),
        vec![insert_at_head, insert_after_second],
    )
    .unwrap();
    let expected: serde_json::Value = serde_json::from_str(
        r#"
        {
            "values": [
                {
                    "name": "fred",
                    "surname": "johnson"
                },
                1.0,
                false,
                "final"
            ]
        }
        "#,
    )
    .unwrap();
    assert_eq!(expected, doc.state().to_json());
}
}

View file

@ -29,3 +29,14 @@ impl fmt::Display for InvalidElementID {
}
impl Error for InvalidElementID {}
/// Error returned when a `ChangeRequest` cannot be applied; wraps a
/// human-readable description of the problem.
#[derive(Debug)]
pub struct InvalidChangeRequest(pub String);

impl Error for InvalidChangeRequest {}

impl fmt::Display for InvalidChangeRequest {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Print the wrapped message. Writing `self` here (as the previous
        // version did) would re-enter this `Display` impl and recurse until
        // the stack overflows.
        write!(f, "{}", self.0)
    }
}

View file

@ -13,13 +13,62 @@
//! let changes_str = "<paste the contents of the output here>";
//! let changes: Vec<Change> = serde_json::from_str(changes_str).unwrap();
//! let doc = automerge::Document::load(changes).unwrap();
//! println!("{:?}", doc.state().unwrap());
//! println!("{:?}", doc.state().to_json());
//! ```
//!
//! Generate changes like so:
//!
//! ```rust,no_run
//! # use automerge::{Document, Change, ChangeRequest, Path, Value};
//! let mut doc = Document::init();
//! let json_value: serde_json::Value = serde_json::from_str(
//! r#"
//! {
//! "cards_by_id": {},
//! "size_of_cards": 12.0,
//! "numRounds": 11.0,
//! "cards": [1.0, false]
//! }
//! "#,
//! )
//! .unwrap();
//! doc.create_and_apply_change(
//! Some("Some change".to_string()),
//! vec![ChangeRequest::Set {
//! path: Path::root().key("the-state".to_string()),
//! value: Value::from_json(&json_value),
//! }],
//! )
//! .unwrap();
//! let expected: serde_json::Value = serde_json::from_str(
//! r#"
//! {
//! "the-state": {
//! "cards_by_id": {},
//! "size_of_cards": 12.0,
//! "numRounds": 11.0,
//! "cards": [1.0, false]
//! }
//! }
//! "#,
//! )
//! .unwrap();
//! assert_eq!(expected, doc.state().to_json());
//! ```
mod actor_histories;
mod change_context;
mod change_request;
mod concurrent_operations;
mod document;
mod error;
mod object_store;
mod op_set;
mod operation_with_metadata;
mod protocol;
mod value;
pub use change_request::{ChangeRequest, Path, ListIndex};
pub use document::Document;
pub use error::AutomergeError;
pub use protocol::Change;
pub use value::Value;

158
src/object_store.rs Normal file
View file

@ -0,0 +1,158 @@
use crate::actor_histories::ActorHistories;
use crate::concurrent_operations::ConcurrentOperations;
use crate::error::AutomergeError;
use crate::operation_with_metadata::OperationWithMetadata;
use crate::protocol::{ElementID, ObjectID, Operation};
use std::collections::HashMap;
use std::str::FromStr;
/// ObjectHistory is what the OpSet uses to store operations for a particular
/// key, they represent the two possible container types in automerge, a map or
/// a sequence (tables and text are effectively the maps and sequences
/// respectively).
#[derive(Debug, Clone)]
pub(crate) enum ObjectHistory {
    /// A map-like container: concurrent operations are stored per string key.
    Map {
        operations_by_key: HashMap<String, ConcurrentOperations>,
    },
    /// A sequence-like container.
    List {
        // Concurrent operations recorded against each element ID.
        operations_by_elemid: HashMap<ElementID, ConcurrentOperations>,
        // Element IDs that have been inserted; each key maps to itself
        // (see the Insert arm of `ObjectStore::apply_operation`).
        insertions: HashMap<ElementID, ElementID>,
        // For each element, the element IDs inserted immediately after it;
        // used to reconstruct the list order.
        following: HashMap<ElementID, Vec<ElementID>>,
        // Highest `elem` counter observed among insertions into this list.
        max_elem: u32,
    },
}
/// The ObjectStore is responsible for storing the concurrent operations seen
/// for each object ID and for the logic of incorporating a new operation.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // History of every known object, including the root map.
    operations_by_object_id: HashMap<ObjectID, ObjectHistory>,
}
impl ObjectStore {
    /// Create an object store containing only the (empty) root map object.
    pub(crate) fn new() -> ObjectStore {
        let root = ObjectHistory::Map {
            operations_by_key: HashMap::new(),
        };
        let mut ops_by_id = HashMap::new();
        ops_by_id.insert(ObjectID::Root, root);
        ObjectStore {
            operations_by_object_id: ops_by_id,
        }
    }

    /// Look up the operation history recorded for `object_id`, if any.
    pub(crate) fn history_for_object_id(&self, object_id: &ObjectID) -> Option<&ObjectHistory> {
        self.operations_by_object_id.get(object_id)
    }

    /// Incorporates a new operation into the object store. The caller is
    /// responsible for ensuring that all causal dependencies of the new
    /// operation have already been applied.
    pub(crate) fn apply_operation(
        &mut self,
        actor_histories: &ActorHistories,
        op_with_metadata: OperationWithMetadata,
    ) -> Result<(), AutomergeError> {
        match op_with_metadata.operation {
            // Make* operations register a fresh, empty container under the
            // new object's ID.
            Operation::MakeMap { object_id } | Operation::MakeTable { object_id } => {
                let object = ObjectHistory::Map {
                    operations_by_key: HashMap::new(),
                };
                self.operations_by_object_id.insert(object_id, object);
            }
            Operation::MakeList { object_id } | Operation::MakeText { object_id } => {
                let object = ObjectHistory::List {
                    operations_by_elemid: HashMap::new(),
                    insertions: HashMap::new(),
                    following: HashMap::new(),
                    max_elem: 0,
                };
                self.operations_by_object_id.insert(object_id, object);
            }
            // Link, Delete, Set, and Increment all target an existing object
            // and key; they are folded into that key's set of concurrent
            // operations.
            Operation::Link {
                ref object_id,
                ref key,
                ..
            }
            | Operation::Delete {
                ref object_id,
                ref key,
            }
            | Operation::Set {
                ref object_id,
                ref key,
                ..
            }
            | Operation::Increment {
                ref object_id,
                ref key,
                ..
            } => {
                // The target object must already exist (causal order
                // guarantees its Make* op was applied earlier).
                let object = self
                    .operations_by_object_id
                    .get_mut(&object_id)
                    .ok_or_else(|| AutomergeError::MissingObjectError(object_id.clone()))?;
                let prior_ops = match object {
                    ObjectHistory::Map {
                        ref mut operations_by_key,
                    } => operations_by_key
                        .entry(key.0.clone())
                        .or_insert_with(ConcurrentOperations::new),
                    ObjectHistory::List {
                        ref mut operations_by_elemid,
                        ..
                    } => {
                        // For a list the key must parse as an element ID
                        // rather than a plain map key.
                        let elem_id = ElementID::from_str(&key.0).map_err(|_| AutomergeError::InvalidChange(format!("Attempted to link, set, delete, or increment an object in a list with invalid element ID {:?}", key.0)))?;
                        operations_by_elemid
                            .entry(elem_id.clone())
                            .or_insert_with(ConcurrentOperations::new)
                    }
                };
                prior_ops.incorporate_new_op(op_with_metadata, actor_histories)?;
            }
            Operation::Insert {
                ref list_id,
                ref key,
                ref elem,
            } => {
                let list = self
                    .operations_by_object_id
                    .get_mut(&list_id)
                    .ok_or_else(|| AutomergeError::MissingObjectError(list_id.clone()))?;
                match list {
                    // Inserting into a map is a protocol violation.
                    ObjectHistory::Map { .. } => {
                        return Err(AutomergeError::InvalidChange(format!(
                            "Insert operation received for object key (object ID: {:?}, key: {:?}",
                            list_id, key
                        )))
                    }
                    ObjectHistory::List {
                        insertions,
                        following,
                        operations_by_elemid,
                        max_elem,
                    } => {
                        // The inserted element's ID combines the inserting
                        // actor with the change-local `elem` counter.
                        let inserted_elemid =
                            ElementID::SpecificElementID(op_with_metadata.actor_id.clone(), *elem);
                        if insertions.contains_key(&inserted_elemid) {
                            return Err(AutomergeError::InvalidChange(format!(
                                "Received an insertion for already present key: {:?}",
                                inserted_elemid
                            )));
                        }
                        insertions.insert(inserted_elemid.clone(), inserted_elemid.clone());
                        // Record that the new element follows `key` so the
                        // list order can be reconstructed later.
                        let following_ops = following.entry(key.clone()).or_insert_with(Vec::new);
                        following_ops.push(inserted_elemid.clone());
                        operations_by_elemid
                            .entry(inserted_elemid)
                            .or_insert_with(ConcurrentOperations::new);
                        *max_elem = std::cmp::max(*max_elem, *elem);
                    }
                }
            }
        }
        Ok(())
    }
}

View file

@ -6,264 +6,55 @@
//! document::state) the implementation fetches the root object ID's history
//! and then recursively walks through the tree of histories constructing the
//! state. Obviously this is not very efficient.
use crate::actor_histories::ActorHistories;
use crate::concurrent_operations::ConcurrentOperations;
use crate::error::AutomergeError;
use crate::protocol::{
ActorID, Change, Clock, DataType, ElementID, Key, ObjectID, Operation, PrimitiveValue,
};
use serde::Serialize;
use std::cmp::{Ordering, PartialOrd};
use crate::object_store::{ObjectHistory, ObjectStore};
use crate::operation_with_metadata::OperationWithMetadata;
use crate::protocol::{Change, Clock, ElementID, Key, ObjectID, Operation, PrimitiveValue};
use crate::value::Value;
use std::collections::HashMap;
use std::str::FromStr;
/// We deserialize individual operations as part of the `Change` structure, but
/// we need access to the actor ID and sequence when applying each individual
/// operation, so we copy the operation, actor ID, and sequence into this
/// struct.
#[derive(PartialEq, Debug, Clone)]
pub struct OperationWithMetadata {
sequence: u32,
actor_id: ActorID,
operation: Operation,
}
/// Note, we can't implement Ord because the Operation contains floating point
/// elements
impl PartialOrd for OperationWithMetadata {
fn partial_cmp(&self, other: &OperationWithMetadata) -> Option<Ordering> {
if self.sequence == other.sequence {
Some(self.actor_id.cmp(&other.actor_id))
} else {
Some(self.sequence.cmp(&other.sequence))
}
}
}
/// Represents a set of operations which are relevant to either an element ID
/// or object ID and which occurred without knowledge of each other
#[derive(Debug)]
struct ConcurrentOperations {
operations: Vec<OperationWithMetadata>,
}
impl ConcurrentOperations {
fn new() -> ConcurrentOperations {
ConcurrentOperations {
operations: Vec::new(),
}
}
fn active_op(&self) -> Option<&OperationWithMetadata> {
// operations are sorted in incorporate_new_op, so the first op is the
// active one
self.operations.first()
}
fn incorporate_new_op(
&mut self,
new_op: OperationWithMetadata,
actor_histories: &ActorHistories,
) -> Result<(), AutomergeError> {
let mut concurrent: Vec<OperationWithMetadata> = match new_op.operation {
// If the operation is an increment op, then we are going to modify
// any Set operations to reflect the increment ops in the next
// part of this function
Operation::Increment { .. } => self.operations.clone(),
// Otherwise we filter out any operations that are not concurrent
// with the new one (i.e ones which causally precede the new one)
_ => self
.operations
.iter()
.filter(|op| actor_histories.are_concurrent(op, &new_op))
.cloned()
.collect(),
};
let this_op = new_op.clone();
match &new_op.operation {
// For Set or Link ops, we add them to the concurrent ops list, to
// be interpreted later as part of the document::walk
// implementation
Operation::Set { .. } | Operation::Link { .. } => {
concurrent.push(this_op);
}
// Increment ops are not stored in the op set, instead we update
// any Set operations which are a counter containing a number to
// reflect the increment operation
Operation::Increment {
value: inc_value, ..
} => concurrent.iter_mut().for_each(|op| {
if let Operation::Set {
value: PrimitiveValue::Number(ref mut n),
datatype: Some(DataType::Counter),
..
} = op.operation
{
*n += inc_value
}
}),
// All other operations are not relevant (e.g a concurrent
// operation set containing just a delete operation actually is an
// empty set, in document::walk we interpret this into a
// nonexistent part of the state)
_ => {}
}
// the partial_cmp implementation for `OperationWithMetadata` ensures
// that the operations are in the deterministic order required by
// automerge.
concurrent.sort_by(|a, b| a.partial_cmp(b).unwrap());
concurrent.reverse();
self.operations = concurrent;
Ok(())
}
}
/// ObjectHistory is what the OpSet uses to store operations for a particular
/// key, they represent the two possible container types in automerge, a map or
/// a sequence (tables and text are effectively the maps and sequences
/// respectively).
#[derive(Debug)]
enum ObjectHistory {
Map {
operations_by_key: HashMap<String, ConcurrentOperations>,
},
List {
operations_by_elemid: HashMap<ElementID, ConcurrentOperations>,
insertions: HashMap<ElementID, ElementID>,
following: HashMap<ElementID, Vec<ElementID>>,
},
}
/// ActorHistories is a cache for the transitive dependencies of each change
/// received from each actor. This is necessary because a change only ships its
/// direct dependencies in `deps` but we need all dependencies to determine
/// whether two operations occurred concurrently.
#[derive(Debug)]
pub struct ActorHistories(HashMap<ActorID, HashMap<u32, Clock>>);
impl ActorHistories {
/// Return the latest sequence required by `op` for actor `actor`
fn dependency_for(&self, op: &OperationWithMetadata, actor: &ActorID) -> u32 {
self.0
.get(&op.actor_id)
.and_then(|clocks| clocks.get(&op.sequence))
.map(|c| c.seq_for(actor))
.unwrap_or(0)
}
/// Whether or not `change` is already part of this `ActorHistories`
fn is_applied(&self, change: &Change) -> bool {
self.0
.get(&change.actor_id)
.and_then(|clocks| clocks.get(&change.seq))
.map(|c| c.seq_for(&change.actor_id) >= change.seq)
.unwrap_or(false)
}
/// Update this ActorHistories to include the changes in `change`
fn add_change(&mut self, change: &Change) {
let change_deps = change
.dependencies
.with_dependency(&change.actor_id, change.seq - 1);
let transitive = self.transitive_dependencies(&change.actor_id, change.seq);
let all_deps = transitive.upper_bound(&change_deps);
let state = self
.0
.entry(change.actor_id.clone())
.or_insert_with(HashMap::new);
state.insert(change.seq, all_deps);
}
fn transitive_dependencies(&mut self, actor_id: &ActorID, seq: u32) -> Clock {
self.0
.get(actor_id)
.and_then(|deps| deps.get(&seq))
.cloned()
.unwrap_or_else(Clock::empty)
}
/// Whether the two operations in question are concurrent
fn are_concurrent(&self, op1: &OperationWithMetadata, op2: &OperationWithMetadata) -> bool {
if op1.sequence == op2.sequence && op1.actor_id == op2.actor_id {
return false;
}
self.dependency_for(op1, &op2.actor_id) < op2.sequence
&& self.dependency_for(op2, &op1.actor_id) < op1.sequence
}
}
/// Possible values of an element of the state. Using this rather than
/// serde_json::Value because we'll probably want to make the core logic
/// independent of serde in order to be `no_std` compatible.
#[derive(Serialize)]
#[serde(untagged)]
pub enum Value {
Map(HashMap<String, Value>),
List(Vec<Value>),
Str(String),
Number(f64),
Boolean(bool),
Null,
}
impl Value {
pub fn to_json(&self) -> serde_json::Value {
match self {
Value::Map(map) => {
let result: serde_json::map::Map<String, serde_json::Value> =
map.iter().map(|(k, v)| (k.clone(), v.to_json())).collect();
serde_json::Value::Object(result)
}
Value::List(elements) => {
serde_json::Value::Array(elements.iter().map(|v| v.to_json()).collect())
}
Value::Str(s) => serde_json::Value::String(s.to_string()),
Value::Number(n) => serde_json::Value::Number(
serde_json::Number::from_f64(*n).unwrap_or_else(|| serde_json::Number::from(0)),
),
Value::Boolean(b) => serde_json::Value::Bool(*b),
Value::Null => serde_json::Value::Null,
}
}
}
/// The core logic of the whole libary. Combines operations and allows querying
/// the current state.
/// The OpSet manages an ObjectStore, and a queue of incoming changes in order
/// to ensure that operations are delivered to the object store in causal order
///
/// Whenever a new change is received we iterate through any causally ready
/// changes in the queue and apply them, then repeat until there are no
/// causally ready changes left. The end result of this is that
/// `operations_by_object_id` will contain sets of concurrent operations
/// for each object ID or element ID.
/// changes in the queue and apply them to the object store, then repeat until
/// there are no causally ready changes left. The end result of this is that
/// the object store will contain sets of concurrent operations for each object
/// ID or element ID.
///
/// When we want to get the state of the CRDT we walk through the
/// `operations_by_object_id` map, starting with the root object ID and
/// constructing the value at each node by examining the concurrent operations
/// which are active for that node.
/// object store, starting with the root object ID and constructing the value
/// at each node by examining the concurrent operations which are active for
/// that node.
#[derive(Debug)]
pub struct OpSet {
operations_by_object_id: HashMap<ObjectID, ObjectHistory>,
actor_histories: ActorHistories,
pub object_store: ObjectStore,
pub actor_histories: ActorHistories,
queue: Vec<Change>,
clock: Clock,
pub clock: Clock,
state: Value,
}
impl OpSet {
pub fn init() -> OpSet {
let root = ObjectHistory::Map {
operations_by_key: HashMap::new(),
};
let mut ops_by_id = HashMap::new();
ops_by_id.insert(ObjectID::Root, root);
OpSet {
operations_by_object_id: ops_by_id,
actor_histories: ActorHistories(HashMap::new()),
object_store: ObjectStore::new(),
actor_histories: ActorHistories::new(),
queue: Vec::new(),
clock: Clock::empty(),
state: Value::Map(HashMap::new()),
}
}
/// Adds a change to the internal queue of operations, then iteratively
/// applies all causally ready changes until there are none remaining
pub fn apply_change(&mut self, change: Change) -> Result<(), AutomergeError> {
self.queue.push(change);
self.apply_causally_ready_changes()
self.apply_causally_ready_changes()?;
self.state = self.walk(&ObjectID::Root)?;
Ok(())
}
fn apply_causally_ready_changes(&mut self) -> Result<(), AutomergeError> {
@ -296,78 +87,13 @@ impl OpSet {
let actor_id = change.actor_id.clone();
let seq = change.seq;
for operation in change.operations {
let operation_copy = operation.clone();
let op_with_metadata = OperationWithMetadata {
sequence: seq,
actor_id: actor_id.clone(),
operation: operation_copy,
operation: operation.clone(),
};
match operation {
Operation::MakeMap { object_id } | Operation::MakeTable { object_id } => {
let object = ObjectHistory::Map {
operations_by_key: HashMap::new(),
};
self.operations_by_object_id.insert(object_id, object);
}
Operation::MakeList { object_id } | Operation::MakeText { object_id } => {
let object = ObjectHistory::List {
operations_by_elemid: HashMap::new(),
insertions: HashMap::new(),
following: HashMap::new(),
};
self.operations_by_object_id.insert(object_id, object);
}
Operation::Link { object_id, key, .. }
| Operation::Delete { object_id, key }
| Operation::Set { object_id, key, .. }
| Operation::Increment { object_id, key, .. } => {
let object = self
.operations_by_object_id
.get_mut(&object_id)
.ok_or_else(|| AutomergeError::MissingObjectError(object_id.clone()))?;
let prior_ops = match object {
ObjectHistory::Map {
ref mut operations_by_key,
} => operations_by_key
.entry(key.0.clone())
.or_insert_with(ConcurrentOperations::new),
ObjectHistory::List {
ref mut operations_by_elemid,
..
} => {
let elem_id = ElementID::from_str(&key.0).map_err(|_| AutomergeError::InvalidChange(format!("Attempted to link to an object in a list with invalid element ID {:?}", key.0)))?;
operations_by_elemid
.entry(elem_id.clone())
.or_insert_with(ConcurrentOperations::new)
}
};
prior_ops.incorporate_new_op(op_with_metadata, &self.actor_histories)?;
}
Operation::Insert {
ref list_id,
ref key,
ref elem,
} => {
let list = self
.operations_by_object_id
.get_mut(&list_id)
.ok_or_else(|| AutomergeError::MissingObjectError(list_id.clone()))?;
match list {
ObjectHistory::Map{..} => return Err(AutomergeError::InvalidChange(format!("Insert operation received for object key (object ID: {:?}, key: {:?}", list_id, key))),
ObjectHistory::List{insertions, following, operations_by_elemid} => {
if insertions.contains_key(&key) {
return Err(AutomergeError::InvalidChange(format!("Received an insertion for already present key: {:?}", key)));
}
let inserted_elemid = ElementID::SpecificElementID(actor_id.clone(), *elem);
insertions.insert(key.clone(), inserted_elemid.clone());
let following_ops = following.entry(key.clone()).or_insert_with(Vec::new);
following_ops.push(inserted_elemid.clone());
operations_by_elemid.entry(inserted_elemid).or_insert_with(ConcurrentOperations::new);
}
}
}
}
self.object_store
.apply_operation(&self.actor_histories, op_with_metadata)?;
}
self.clock = self
.clock
@ -375,16 +101,16 @@ impl OpSet {
Ok(())
}
pub fn root_value(&self) -> Result<Value, AutomergeError> {
self.walk(&ObjectID::Root)
pub fn root_value(&self) -> &Value {
return &self.state;
}
/// This is where we actually interpret the concurrent operations for each
/// part of the object and construct the value.
/// part of the object and construct the current state.
fn walk(&self, object_id: &ObjectID) -> Result<Value, AutomergeError> {
let object_history = self
.operations_by_object_id
.get(object_id)
.object_store
.history_for_object_id(object_id)
.ok_or_else(|| AutomergeError::MissingObjectError(object_id.clone()))?;
match object_history {
ObjectHistory::Map { operations_by_key } => self.interpret_map_ops(operations_by_key),
@ -392,6 +118,7 @@ impl OpSet {
operations_by_elemid,
insertions,
following,
..
} => self.interpret_list_ops(operations_by_elemid, insertions, following),
}
}
@ -447,39 +174,7 @@ impl OpSet {
_insertions: &HashMap<ElementID, ElementID>,
following: &HashMap<ElementID, Vec<ElementID>>,
) -> Result<Value, AutomergeError> {
// First we construct a vector of operations to process in order based
// on the insertion orders of the operations we've received
let mut ops_in_order: Vec<&ConcurrentOperations> = Vec::new();
// start with everything that was inserted after _head
let mut to_process: Vec<ElementID> = following
.get(&ElementID::Head)
.map(|heads| {
let mut sorted = heads.to_vec();
sorted.sort();
sorted
})
.unwrap_or_else(Vec::new);
// for each element ID, add the operation to the ops_in_order list,
// then find all the following element IDs, sort them and add them to
// the list of element IDs still to process.
while let Some(next_element_id) = to_process.pop() {
let ops = operations_by_elemid.get(&next_element_id).ok_or_else(|| {
AutomergeError::InvalidChange(format!(
"Missing element ID {:?} when interpreting list ops",
next_element_id
))
})?;
ops_in_order.push(ops);
if let Some(followers) = following.get(&next_element_id) {
let mut sorted = followers.to_vec();
sorted.sort();
sorted.reverse();
for follower in sorted {
to_process.push(follower.clone())
}
}
}
let ops_in_order = list_ops_in_order(operations_by_elemid, following)?;
// Now that we have a list of `ConcurrentOperations` in the correct
// order, we need to interpret each one to construct the value that
@ -487,7 +182,7 @@ impl OpSet {
let result_with_errs =
ops_in_order
.iter()
.filter_map(|ops| -> Option<Result<Value, AutomergeError>> {
.filter_map(|(_, ops)| -> Option<Result<Value, AutomergeError>> {
ops.active_op().map(|op| match &op.operation {
Operation::Set { value, .. } => Ok(match value {
PrimitiveValue::Null => Value::Null,
@ -508,3 +203,43 @@ impl OpSet {
Ok(Value::List(result))
}
}
/// Flatten a list object's `following` graph into an ordered sequence of
/// `(element ID, concurrent operations)` pairs, via depth-first traversal
/// starting from the elements inserted after `ElementID::Head`.
///
/// Returns an `InvalidChange` error if `following` references an element ID
/// with no entry in `operations_by_elemid`.
pub(crate) fn list_ops_in_order<'a>(
    operations_by_elemid: &'a HashMap<ElementID, ConcurrentOperations>,
    following: &HashMap<ElementID, Vec<ElementID>>,
) -> Result<Vec<(ElementID, &'a ConcurrentOperations)>, AutomergeError> {
    // First we construct a vector of operations to process in order based
    // on the insertion orders of the operations we've received
    let mut ops_in_order: Vec<(ElementID, &ConcurrentOperations)> = Vec::new();
    // start with everything that was inserted after _head
    // NOTE(review): the head list is sorted but NOT reversed, unlike the
    // followers below; with a stack pop this visits heads in descending
    // order — confirm this asymmetry is intended.
    let mut to_process: Vec<ElementID> = following
        .get(&ElementID::Head)
        .map(|heads| {
            let mut sorted = heads.to_vec();
            sorted.sort();
            sorted
        })
        .unwrap_or_else(Vec::new);
    // for each element ID, add the operation to the ops_in_order list,
    // then find all the following element IDs, sort them and add them to
    // the list of element IDs still to process.
    while let Some(next_element_id) = to_process.pop() {
        let ops = operations_by_elemid.get(&next_element_id).ok_or_else(|| {
            AutomergeError::InvalidChange(format!(
                "Missing element ID {:?} when interpreting list ops",
                next_element_id
            ))
        })?;
        ops_in_order.push((next_element_id.clone(), ops));
        if let Some(followers) = following.get(&next_element_id) {
            // Sort then reverse so that popping from the stack visits
            // followers in ascending order.
            let mut sorted = followers.to_vec();
            sorted.sort();
            sorted.reverse();
            for follower in sorted {
                to_process.push(follower.clone())
            }
        }
    }
    Ok(ops_in_order)
}

View file

@ -0,0 +1,25 @@
use crate::protocol::{ActorID, Operation};
use std::cmp::{Ordering, PartialOrd};
/// We deserialize individual operations as part of the `Change` structure, but
/// we need access to the actor ID and sequence when applying each individual
/// operation, so we copy the operation, actor ID, and sequence into this
/// struct.
#[derive(PartialEq, Debug, Clone)]
pub struct OperationWithMetadata {
    // Sequence number of the change this operation arrived in.
    pub sequence: u32,
    // Actor that generated the operation.
    pub actor_id: ActorID,
    // The operation itself, copied out of its `Change`.
    pub operation: Operation,
}
/// Note, we can't implement Ord because the Operation contains floating point
/// elements
impl PartialOrd for OperationWithMetadata {
    fn partial_cmp(&self, other: &OperationWithMetadata) -> Option<Ordering> {
        // Compare by change sequence number first; ties are broken by actor
        // ID so the ordering is deterministic across actors.
        let ordering = self
            .sequence
            .cmp(&other.sequence)
            .then_with(|| self.actor_id.cmp(&other.actor_id));
        Some(ordering)
    }
}

View file

@ -123,6 +123,15 @@ pub enum ElementID {
SpecificElementID(ActorID, u32),
}
impl ElementID {
pub fn as_key(&self) -> Key {
match self {
ElementID::Head => Key("_head".to_string()),
ElementID::SpecificElementID(actor_id, elem) => Key(format!("{}:{}", actor_id.0, elem)),
}
}
}
impl<'de> Deserialize<'de> for ElementID {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
@ -266,7 +275,7 @@ pub enum Operation {
},
}
#[derive(Deserialize, Serialize, PartialEq, Debug)]
#[derive(Deserialize, Serialize, PartialEq, Debug, Clone)]
pub struct Change {
#[serde(rename = "actor")]
pub(crate) actor_id: ActorID,

54
src/value.rs Normal file
View file

@ -0,0 +1,54 @@
use serde::Serialize;
use std::collections::HashMap;
/// Possible values of an element of the state. Using this rather than
/// serde_json::Value because we'll probably want to make the core logic
/// independent of serde in order to be `no_std` compatible.
#[derive(Serialize, Clone, Debug)]
#[serde(untagged)]
pub enum Value {
    // A map from string keys to nested values (JSON object).
    Map(HashMap<String, Value>),
    // An ordered sequence of values (JSON array).
    List(Vec<Value>),
    // A string value.
    Str(String),
    // All numbers are represented as f64, matching JSON semantics.
    Number(f64),
    // A boolean value.
    Boolean(bool),
    // The null value.
    Null,
}
impl Value {
    /// Build a `Value` tree from a `serde_json::Value`, mapping each JSON
    /// node onto the corresponding variant. Numbers become `f64`, falling
    /// back to 0.0 when `as_f64` cannot represent them.
    pub fn from_json(json: &serde_json::Value) -> Value {
        match json {
            serde_json::Value::Null => Value::Null,
            serde_json::Value::Bool(b) => Value::Boolean(*b),
            serde_json::Value::Number(n) => Value::Number(n.as_f64().unwrap_or(0.0)),
            serde_json::Value::String(s) => Value::Str(s.to_string()),
            serde_json::Value::Array(vs) => {
                let elements: Vec<Value> = vs.iter().map(Value::from_json).collect();
                Value::List(elements)
            }
            serde_json::Value::Object(kvs) => {
                let entries: HashMap<String, Value> = kvs
                    .iter()
                    .map(|(key, val)| (key.clone(), Value::from_json(val)))
                    .collect();
                Value::Map(entries)
            }
        }
    }

    /// Convert this `Value` back into a `serde_json::Value`. Numbers that
    /// have no JSON float representation are replaced with 0.
    pub fn to_json(&self) -> serde_json::Value {
        match self {
            Value::Null => serde_json::Value::Null,
            Value::Boolean(b) => serde_json::Value::Bool(*b),
            Value::Number(n) => {
                let number = serde_json::Number::from_f64(*n)
                    .unwrap_or_else(|| serde_json::Number::from(0));
                serde_json::Value::Number(number)
            }
            Value::Str(s) => serde_json::Value::String(s.to_string()),
            Value::List(elements) => {
                let converted: Vec<serde_json::Value> =
                    elements.iter().map(Value::to_json).collect();
                serde_json::Value::Array(converted)
            }
            Value::Map(map) => {
                let result: serde_json::map::Map<String, serde_json::Value> = map
                    .iter()
                    .map(|(key, val)| (key.clone(), val.to_json()))
                    .collect();
                serde_json::Value::Object(result)
            }
        }
    }
}

View file

@ -289,6 +289,6 @@ fn test_concurrent_ops() {
"#,
)
.unwrap();
let actual = doc.state().unwrap();
let actual = doc.state().to_json();
assert_eq!(expected, actual);
}