This commit is contained in:
Orion Henry 2020-03-23 11:19:59 -07:00
parent 2aaeb4788c
commit 2e80b5b3a2
11 changed files with 349 additions and 1298 deletions

View file

@ -6,7 +6,7 @@
"license": "MIT",
"main": "./index.js",
"scripts": {
"build": "rimraf pkg && wasm-pack build --target nodejs --out-name index",
"build": "rimraf pkg && wasm-pack build --target nodejs --out-name index --dev",
"release": "rimraf pkg && wasm-pack build --target nodejs --out-name index --release",
"mocha": "yarn build && mocha --bail --full-trace",
"test": "cargo test && wasm-pack test --node"

View file

@ -10,6 +10,8 @@ edition = "2018"
serde = { version = "^1.0", features=["derive"] }
serde_json = "^1.0"
uuid = { version = "^0.5.1", features=["v4"] }
wasm-bindgen = "^0.2"
js-sys = "^0.3"
[dependencies.web-sys]
version = "0.3"

View file

@ -85,7 +85,7 @@ impl ActorStates {
}
}
let deps = change.dependencies.with(&change.actor_id, change.seq - 1);
let deps = change.deps.with(&change.actor_id, change.seq - 1);
let all_deps = self.transitive_deps(&deps);
let actor_id = change.actor_id.clone();

File diff suppressed because it is too large Load diff

View file

@ -8,7 +8,9 @@ pub enum AutomergeError {
MissingObjectError(OpID),
InvalidObjectType(String),
InvalidOpID(String),
InvalidChangeRequest,
InvalidLinkTarget,
UnknownVersion(u64),
DuplicateChange(String),
NotImplemented(String),
InvalidChange(String),

View file

@ -16,6 +16,7 @@ mod op_set;
mod operation_with_metadata;
mod patch;
mod protocol;
mod time;
mod value;
pub use crate::protocol::{
@ -27,7 +28,7 @@ pub use backend::Backend;
pub use concurrent_operations::ConcurrentOperations;
pub use error::AutomergeError;
pub use object_store::ObjState;
pub use op_set::OpSet;
pub use op_set::{OpSet, Version};
pub use operation_with_metadata::OperationWithMetadata;
pub use patch::{
Conflict, Diff, Diff2, DiffAction, ElementValue, MapType, Patch, PendingDiff, SequenceType,

View file

@ -11,7 +11,7 @@ use crate::error::AutomergeError;
use crate::object_store::ObjState;
use crate::operation_with_metadata::OperationWithMetadata;
use crate::protocol::{Change, Clock, OpID, Operation};
use crate::{ActorID, Diff2, Key, ObjType, PendingDiff};
use crate::{ActorID, ChangeRequest, Diff2, Key, ObjType, PendingDiff};
use core::cmp::max;
use std::collections::HashMap;
use std::collections::HashSet;
@ -29,12 +29,21 @@ use std::collections::HashSet;
/// object store, starting with the root object ID and constructing the value
/// at each node by examining the concurrent operationsi which are active for
/// that node.
///
#[derive(Debug, PartialEq, Clone)]
pub struct Version {
pub version: u64,
pub local_only: bool,
pub op_set: OpSet,
}
#[derive(Debug, PartialEq, Clone)]
pub struct OpSet {
pub objs: HashMap<OpID, ObjState>,
queue: Vec<Change>,
pub clock: Clock,
undo_pos: usize,
pub deps: Clock,
pub undo_pos: usize,
pub undo_stack: Vec<Vec<Operation>>,
pub redo_stack: Vec<Vec<Operation>>,
pub states: ActorStates,
@ -50,6 +59,7 @@ impl OpSet {
objs,
queue: Vec::new(),
clock: Clock::empty(),
deps: Clock::empty(),
undo_pos: 0,
undo_stack: Vec::new(),
redo_stack: Vec::new(),
@ -58,112 +68,31 @@ impl OpSet {
}
}
pub fn do_redo(
&mut self,
actor_id: ActorID,
seq: u32,
message: Option<String>,
dependencies: Clock,
diffs: &mut Vec<PendingDiff>,
diff2: &mut Diff2,
) -> Result<(), AutomergeError> {
if let Some(redo_ops) = self.redo_stack.pop() {
let change = Change {
actor_id,
start_op: 0, // FIXME
time: 0, // FIXME
seq,
message,
dependencies,
operations: redo_ops,
};
self.undo_pos += 1;
self.add_change(change, false, diffs, diff2)
} else {
Err(AutomergeError::InvalidChange("no redo ops".to_string()))
}
}
pub fn do_undo(
&mut self,
actor_id: ActorID,
seq: u32,
message: Option<String>,
dependencies: Clock,
diffs: &mut Vec<PendingDiff>,
diff2: &mut Diff2,
) -> Result<(), AutomergeError> {
if let Some(undo_ops) = self.undo_stack.get(self.undo_pos - 1) {
let redo_ops = undo_ops
.iter()
.filter_map(|op| match &op {
Operation::Increment {
object_id: oid,
key,
value,
pred,
} => Some(vec![Operation::Increment {
object_id: oid.clone(),
key: key.clone(),
value: -value,
pred: pred.clone(),
}]),
Operation::Set { .. } | Operation::Link { .. } | Operation::Delete { .. } => {
panic!("not implemented")
}
/*
self
.concurrent_operations_for_field(object_id, key)
.map(|cops| {
if cops.active_op().is_some() {
cops.pure_operations()
} else {
vec![Operation::Delete {
object_id: object_id.clone(),
key: key.clone(),
pred: pred.clone(),
}]
}
}),
*/
_ => None,
})
.flatten()
.collect();
self.redo_stack.push(redo_ops);
let change = Change {
start_op: 0, // FIXME
time: 0, // FIXME
actor_id,
seq,
message,
dependencies,
operations: undo_ops.clone(),
};
self.undo_pos -= 1;
self.add_change(change, false, diffs, diff2)
} else {
Err(AutomergeError::InvalidChange(
"No undo ops to execute".to_string(),
))
}
}
/// Adds a change to the internal queue of operations, then iteratively
/// applies all causally ready changes until there are none remaining
///
/// If `make_undoable` is true, the op set will store a set of operations
/// which can be used to undo this change.
///
pub fn add_change(
&mut self,
change: Change,
make_undoable: bool,
local: bool,
undoable: bool,
diffs: &mut Vec<PendingDiff>,
diff2: &mut Diff2,
) -> Result<(), AutomergeError> {
self.queue.push(change);
if local {
self.apply_change(change, local, undoable, diffs)
} else {
self.queue.push(change);
self.apply_queued_ops(diffs)
}
}
fn apply_queued_ops(&mut self, diffs: &mut Vec<PendingDiff>) -> Result<(), AutomergeError> {
while let Some(next_change) = self.pop_next_causally_ready_change() {
self.apply_change(next_change, make_undoable, diffs, diff2)?;
self.apply_change(next_change, false, false, diffs)?;
}
Ok(())
}
@ -172,7 +101,7 @@ impl OpSet {
let mut index = 0;
while index < self.queue.len() {
let change = self.queue.get(index).unwrap();
let deps = change.dependencies.with(&change.actor_id, change.seq - 1);
let deps = change.deps.with(&change.actor_id, change.seq - 1);
if deps <= self.clock {
return Some(self.queue.remove(index));
}
@ -184,9 +113,9 @@ impl OpSet {
fn apply_change(
&mut self,
change: Change,
make_undoable: bool,
local: bool,
undoable: bool,
diffs: &mut Vec<PendingDiff>,
diff2: &mut Diff2,
) -> Result<(), AutomergeError> {
// This method is a little more complicated than it intuitively should
// be due to the bookkeeping required for undo. If we're asked to make
@ -208,7 +137,7 @@ impl OpSet {
return Ok(());
}
let all_undo_ops = Vec::new();
let mut all_undo_ops = Vec::new();
let mut new_object_ids: HashSet<OpID> = HashSet::new();
for (n, operation) in operations.iter().enumerate() {
@ -225,22 +154,20 @@ impl OpSet {
new_object_ids.insert(metaop.opid.clone());
}
let diff = self.apply_op(metaop, diff2)?;
let (diff, undo_ops) = self.apply_op(metaop.clone())?;
// FIXME - this should be Option<Vec<..>> but I couldnt get it to work
diffs.push(diff);
// If this object is not created in this change then we need to
// store the undo ops for it (if we're storing undo ops at all)
if make_undoable && !(new_object_ids.contains(operation.obj())) {
//all_undo_ops.extend(undo_ops);
if undoable && !(new_object_ids.contains(metaop.object_id())) {
all_undo_ops.extend(undo_ops);
}
}
self.max_op = max(self.max_op, start_op + num_ops - 1);
self.clock = self.clock.with(&actor_id, seq);
if make_undoable {
if undoable {
let (new_undo_stack_slice, _) = self.undo_stack.split_at(self.undo_pos);
let mut new_undo_stack: Vec<Vec<Operation>> = new_undo_stack_slice.to_vec();
new_undo_stack.push(all_undo_ops);
@ -261,12 +188,13 @@ impl OpSet {
pub fn apply_op(
&mut self,
op: OperationWithMetadata,
_diff2: &mut Diff2,
) -> Result<PendingDiff, AutomergeError> {
) -> Result<(PendingDiff, Vec<Operation>), AutomergeError> {
if let Some(obj_type) = op.make_type() {
self.objs.insert(op.opid.clone(), ObjState::new(obj_type));
}
let undo_ops = Vec::new();
let object_id = op.object_id();
let object = self.get_obj(&object_id)?;
@ -282,13 +210,13 @@ impl OpSet {
self.unlink(&op, &overwritten_ops)?;
Ok(PendingDiff::Seq(op.clone(), index))
Ok((PendingDiff::Seq(op.clone(), index), undo_ops))
} else {
let ops = object.props.entry(op.key().clone()).or_default();
let overwritten_ops = ops.incorporate_new_op(&op)?;
self.unlink(&op, &overwritten_ops)?;
Ok(PendingDiff::Map(op.clone()))
Ok((PendingDiff::Map(op.clone()), undo_ops))
}
}
@ -322,14 +250,6 @@ impl OpSet {
inbound.opid.clone(),
),
);
/*
let ops = self
.objs
.get(inbound.object_id())
.and_then(|parent| parent.props.get(inbound.key()))
.unwrap(); // FIXME
*/
//let tmp = (inbound.key(), ops, oid.clone());
}
Ok(path)
}
@ -377,18 +297,11 @@ impl OpSet {
Ok(diff2)
}
/*
fn get_list(&mut self, list_id: &OpID) -> Result<&mut ListState, AutomergeError> {
let list = self.get_obj(list_id)?;
match list {
ObjectState::Map { .. } => Err(AutomergeError::InvalidChange(format!(
"Insert operation received for object (object ID: {:?}",
list_id
))),
ObjectState::List(liststate) => Ok(liststate),
}
}
*/
pub fn get_ops(&self, object_id: &OpID, key: &Key) -> Option<Vec<OpID>> {
self.objs.get(object_id)
.and_then(|obj| obj.props.get(key))
.map(|con_ops| con_ops.iter().map(|op| op.opid.clone()).collect())
}
fn get_obj(&mut self, object_id: &OpID) -> Result<&mut ObjState, AutomergeError> {
let object = self
@ -398,6 +311,16 @@ impl OpSet {
Ok(object)
}
pub fn check_for_duplicate(&self, request: &ChangeRequest) -> Result<(), AutomergeError> {
if self.clock.get(&request.actor) >= request.seq {
return Err(AutomergeError::DuplicateChange(format!(
"Change request has already been applied {}:{}",
request.actor.0, request.seq
)));
}
Ok(())
}
pub fn can_undo(&self) -> bool {
self.undo_pos > 0
}
@ -406,19 +329,6 @@ impl OpSet {
!self.redo_stack.is_empty()
}
/*
pub fn concurrent_operations_for_field(
&self,
object_id: &OpID,
key: &Key,
) -> Result<&[OperationWithMetadata], AutomergeError> {
Ok(self
.objs
.get(object_id)
.and_then(|state| state.props.get(&key))
.ok_or_else(|| AutomergeError::MissingObjectError(object_id.clone()))?)
}
*/
/// Get all the changes we have that are not in `since`
pub fn get_missing_changes(&self, since: &Clock) -> Vec<&Change> {
@ -434,47 +344,9 @@ impl OpSet {
// TODO: there's a lot of internal copying going on in here for something kinda simple
self.queue.iter().fold(Clock::empty(), |clock, change| {
clock
.union(&change.dependencies)
.union(&change.deps)
.with(&change.actor_id, change.seq - 1)
})
}
}
/*
pub fn list_ops_in_order<'a, S: BuildHasher>(
operations_by_elemid: &'a HashMap<ElementID, ConcurrentOperations, S>,
following: &HashMap<ElementID, Vec<ElementID>, S>,
) -> Result<Vec<(ElementID, &'a ConcurrentOperations)>, AutomergeError> {
// First we construct a vector of operations to process in order based
// on the insertion orders of the operations we've received
let mut ops_in_order: Vec<(ElementID, &ConcurrentOperations)> = Vec::new();
// start with everything that was inserted after _head
let mut to_process: Vec<ElementID> = following
.get(&ElementID::Head)
.map(|heads| {
let mut sorted = heads.to_vec();
sorted.sort();
sorted
})
.unwrap_or_else(Vec::new);
// for each element ID, add the operation to the ops_in_order list,
// then find all the following element IDs, sort them and add them to
// the list of element IDs still to process.
while let Some(next_element_id) = to_process.pop() {
let ops = operations_by_elemid.get(&next_element_id).ok_or_else(|| {
AutomergeError::InvalidChange(format!(
"Missing element ID {:?} when interpreting list ops",
next_element_id
))
})?;
ops_in_order.push((next_element_id.clone(), ops));
if let Some(followers) = following.get(&next_element_id) {
let mut sorted = followers.to_vec();
sorted.sort();
to_process.extend(sorted);
}
}
Ok(ops_in_order)
}
*/

View file

@ -13,7 +13,6 @@ pub struct OperationWithMetadata {
pub opid: OpID,
pub seq: u32,
pub actor_id: ActorID,
//pub start_op: u64,
pub operation: Operation,
}
@ -44,10 +43,6 @@ impl Hash for OperationWithMetadata {
}
impl OperationWithMetadata {
// pub fn opid(&self) -> OpID {
// OpID::ID(self.start_op, self.actor_id.0.clone())
// }
pub fn make_type(&self) -> Option<ObjType> {
match self.operation {
Operation::MakeMap { .. } => Some(ObjType::Map),
@ -185,26 +180,5 @@ impl OperationWithMetadata {
_ => false,
}
}
/*
pub fn is_concurrent(&self, ops: &[OperationWithMetadata]) -> bool {
!ops.iter()
.map(|op| op.opid)
.eq(self.pred().iter().cloned())
}
*/
}
/*
/// Note, we can't implement Ord because the Operation contains floating point
/// elements
impl PartialOrd for OperationWithMetadata {
fn partial_cmp(&self, other: &OperationWithMetadata) -> Option<Ordering> {
if self.actor_id == other.actor_id {
Some(self.seq.cmp(&other.seq))
} else {
Some(self.actor_id.cmp(&other.actor_id))
}
}
}
*/

View file

@ -362,6 +362,6 @@ pub struct Patch {
pub can_redo: bool,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub clock: Option<Clock>,
pub version: u32,
pub version: u64,
pub diffs: Diff2,
}

View file

@ -165,6 +165,12 @@ impl Clock {
result
}
pub fn without(&self, actor_id: &ActorID) -> Clock {
let mut result = self.clone();
result.0.remove(actor_id);
result
}
pub fn merge(&mut self, other: &Clock) {
other.into_iter().for_each(|(actor_id, seq)| {
self.set(actor_id, max(*seq, self.get(actor_id)));
@ -473,11 +479,10 @@ pub struct Change {
pub seq: u32,
#[serde(rename = "startOp")]
pub start_op: u64,
pub time: u64,
pub time: u128,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(rename = "deps")]
pub dependencies: Clock,
pub deps: Clock,
}
/*
@ -493,18 +498,23 @@ pub struct Moment {
pub struct ChangeRequest {
pub actor: ActorID,
pub seq: u32,
pub version: u32,
pub version: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(default = "_true")]
pub undoable: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub undoable: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub dependencies: Option<Clock>,
pub deps: Option<Clock>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ops: Option<Vec<OpRequest>>,
pub request_type: ChangeRequestType,
}
// :-/
fn _true() -> bool {
true
}
#[derive(Deserialize, PartialEq, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum ChangeRequestType {

View file

@ -0,0 +1,22 @@
#![allow(dead_code, unused_imports)]
use wasm_bindgen::prelude::*;
pub use std::time::*;
#[cfg(not(target_arch = "wasm32"))]
pub fn unix_timestamp() -> u128 {
std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.map(|d| d.as_millis()).unwrap_or(0)
}
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = Date, js_name = now)]
fn date_now() -> f64;
}
#[cfg(target_arch = "wasm32")]
pub fn unix_timestamp() -> u128 {
date_now() as u128
}