move things out of op_set that are not needed

This commit is contained in:
Orion Henry 2020-03-30 15:15:30 -07:00
parent 9eed955d18
commit fca6e1c09e
4 changed files with 111 additions and 116 deletions
automerge-backend-wasm/src
automerge-backend/src

View file

@ -90,7 +90,7 @@ impl State {
pub fn get_missing_changes(&self, clock: JsValue) -> Result<JsValue, JsValue> {
log!("get_missing_changes");
let c: Clock = js_to_rust(clock)?;
let changes = self.backend.get_missing_changes(c);
let changes = self.backend.get_missing_changes(&c);
rust_to_js(&changes)
}

View file

@ -1,5 +1,7 @@
use crate::actor_states::ActorStates;
use crate::error::AutomergeError;
use crate::op_set::{OpSet, Version};
use crate::patch::PendingDiff;
use crate::patch::{Diff, Patch};
use crate::protocol::{
DataType, ObjAlias, ObjType, ObjectID, OpType, Operation, ReqOpType, UndoOperation,
@ -7,11 +9,14 @@ use crate::protocol::{
use crate::time;
use crate::{ActorID, Change, ChangeRequest, ChangeRequestType, Clock, OpID};
use std::collections::HashMap;
use std::rc::Rc;
#[derive(Debug, PartialEq, Clone)]
pub struct Backend {
versions: Vec<Version>,
queue: Vec<Rc<Change>>,
op_set: OpSet,
states: ActorStates,
obj_alias: ObjAlias,
}
@ -26,7 +31,9 @@ impl Backend {
Backend {
versions,
op_set: OpSet::init(),
queue: Vec::new(),
obj_alias: ObjAlias::new(),
states: ActorStates::new(),
}
}
@ -35,7 +42,7 @@ impl Backend {
request: &ChangeRequest,
op_set: &OpSet,
start_op: u64,
) -> Result<Change, AutomergeError> {
) -> Result<Rc<Change>, AutomergeError> {
let time = time::unix_timestamp();
let actor_id = request.actor.clone();
let mut operations: Vec<Operation> = Vec::new();
@ -54,12 +61,7 @@ impl Backend {
}
let mut elemids = elemid_cache.entry(object_id.clone()).or_insert_with(|| {
op_set
.get_elem_ids(&object_id)
.unwrap_or_default()
.iter()
.cloned()
.collect()
op_set.get_elem_ids(&object_id).unwrap_or_default().to_vec()
});
let key = rop.resolve_key(&id, &mut elemids)?;
@ -95,7 +97,7 @@ impl Backend {
operations.push(op);
}
}
Ok(Change {
Ok(Rc::new(Change {
start_op,
message: request.message.clone(),
actor_id: request.actor.clone(),
@ -103,7 +105,7 @@ impl Backend {
deps: request.deps.clone().unwrap_or_default(),
time,
operations,
})
}))
}
fn make_patch(
@ -127,7 +129,11 @@ impl Backend {
})
}
fn undo(&mut self, request: &ChangeRequest, start_op: u64) -> Result<Change, AutomergeError> {
fn undo(
&mut self,
request: &ChangeRequest,
start_op: u64,
) -> Result<Rc<Change>, AutomergeError> {
let undo_pos = self.op_set.undo_pos;
if undo_pos < 1 || self.op_set.undo_stack.len() < undo_pos {
@ -155,7 +161,7 @@ impl Backend {
})
.collect();
let change = Change {
let change = Rc::new(Change {
actor_id: request.actor.clone(),
seq: request.seq,
start_op,
@ -163,7 +169,7 @@ impl Backend {
message: request.message.clone(),
time: time::unix_timestamp(),
operations,
};
});
self.op_set.undo_pos -= 1;
self.op_set.redo_stack.push(redo_ops);
@ -171,7 +177,11 @@ impl Backend {
Ok(change)
}
fn redo(&mut self, request: &ChangeRequest, start_op: u64) -> Result<Change, AutomergeError> {
fn redo(
&mut self,
request: &ChangeRequest,
start_op: u64,
) -> Result<Rc<Change>, AutomergeError> {
let mut redo_ops = self
.op_set
.redo_stack
@ -189,7 +199,7 @@ impl Backend {
})
.collect();
let change = Change {
let change = Rc::new(Change {
actor_id: request.actor.clone(),
seq: request.seq,
start_op,
@ -197,20 +207,22 @@ impl Backend {
message: request.message.clone(),
time: time::unix_timestamp(),
operations,
};
});
self.op_set.undo_pos += 1;
Ok(change)
}
pub fn load_changes(&mut self, changes: Vec<Change>) -> Result<(), AutomergeError> {
pub fn load_changes(&mut self, mut changes: Vec<Change>) -> Result<(), AutomergeError> {
let changes = changes.drain(0..).map(Rc::new).collect();
self.apply(changes, None, false, false)?;
Ok(())
}
pub fn apply_changes(&mut self, changes: Vec<Change>) -> Result<Patch, AutomergeError> {
pub fn apply_changes(&mut self, mut changes: Vec<Change>) -> Result<Patch, AutomergeError> {
self.versions.iter_mut().for_each(|v| v.local_only = false);
let changes = changes.drain(0..).map(Rc::new).collect();
self.apply(changes, None, false, true)
}
@ -223,7 +235,7 @@ impl Backend {
fn apply(
&mut self,
mut changes: Vec<Change>,
mut changes: Vec<Rc<Change>>,
request: Option<&ChangeRequest>,
undoable: bool,
incremental: bool,
@ -231,9 +243,7 @@ impl Backend {
let mut pending_diffs = Vec::new();
for change in changes.drain(0..) {
let diffs = self
.op_set
.add_change(change, request.is_some(), undoable)?;
let diffs = self.add_change(change, request.is_some(), undoable)?;
pending_diffs.extend(diffs);
}
@ -293,10 +303,63 @@ impl Backend {
// Ok(self.make_patch(diffs.unwrap(), Some(&tmp_request), true)?)
}
fn add_change(
&mut self,
change: Rc<Change>,
local: bool,
undoable: bool,
) -> Result<Vec<PendingDiff>, AutomergeError> {
if local {
self.apply_change(change, local, undoable)
} else {
self.queue.push(change);
self.apply_queued_ops()
}
}
fn apply_queued_ops(&mut self) -> Result<Vec<PendingDiff>, AutomergeError> {
let mut all_diffs = Vec::new();
while let Some(next_change) = self.pop_next_causally_ready_change() {
let diffs = self.apply_change(next_change, false, false)?;
all_diffs.extend(diffs)
}
Ok(all_diffs)
}
fn apply_change(
&mut self,
change: Rc<Change>,
local: bool,
undoable: bool,
) -> Result<Vec<PendingDiff>, AutomergeError> {
if let Some(all_deps) = self.states.add_change(&change)? {
// FIXME - move these to backend?
self.op_set.clock.set(&change.actor_id, change.seq);
self.op_set.deps.subtract(&all_deps);
self.op_set.deps.set(&change.actor_id, change.seq);
} else {
return Ok(Vec::new());
}
self.op_set.apply_change(change, local, undoable)
}
fn pop_next_causally_ready_change(&mut self) -> Option<Rc<Change>> {
let mut index = 0;
while index < self.queue.len() {
let change = self.queue.get(index).unwrap();
let deps = change.deps.with(&change.actor_id, change.seq - 1);
if deps <= self.op_set.clock {
return Some(self.queue.remove(index));
}
index += 1
}
None
}
fn finalize_version(
&mut self,
request_version: u64,
change: Change,
change: Rc<Change>,
) -> Result<(), AutomergeError> {
// remove all versions older than this one
// i wish i had drain filter
@ -314,7 +377,7 @@ impl Backend {
v.op_set = self.op_set.clone()
} else {
v.op_set = self.op_set.clone();
v.op_set.add_change(change.clone(), true, false)?;
v.op_set.apply_change(change.clone(), true, false)?;
}
}
@ -330,12 +393,7 @@ impl Backend {
}
pub fn history(&self) -> Vec<&Change> {
self.op_set
.states
.history
.iter()
.map(|rc| rc.as_ref())
.collect()
self.states.history.iter().map(|rc| rc.as_ref()).collect()
}
pub fn get_patch(&self) -> Result<Patch, AutomergeError> {
@ -343,26 +401,34 @@ impl Backend {
self.make_patch(Some(diffs), None, false)
}
/// Get changes which are in `other` but not in this backend
pub fn get_changes<'a>(&self, other: &'a Backend) -> Result<Vec<&'a Change>, AutomergeError> {
if self.clock().divergent(&other.clock()) {
return Err(AutomergeError::DivergedState(
"Cannot diff two states that have diverged".to_string(),
));
}
Ok(other.op_set.get_missing_changes(&self.op_set.clock))
Ok(other.get_missing_changes(&self.op_set.clock))
}
pub fn get_changes_for_actor_id(&self, actor_id: &ActorID) -> Vec<&Change> {
self.op_set.states.get(actor_id)
self.states.get(actor_id)
}
pub fn get_missing_changes(&self, clock: Clock) -> Vec<&Change> {
self.op_set.get_missing_changes(&clock)
pub fn get_missing_changes(&self, since: &Clock) -> Vec<&Change> {
self.states
.history
.iter()
.map(|rc| rc.as_ref())
.filter(|change| change.seq > since.get(&change.actor_id))
.collect()
}
pub fn get_missing_deps(&self) -> Clock {
self.op_set.get_missing_deps()
let mut clock = Clock::empty();
for change in self.queue.iter() {
clock.merge(&change.deps.with(&change.actor_id, change.seq - 1))
}
clock
}
pub fn get_elem_ids(&self, object_id: &ObjectID) -> Result<&[OpID], AutomergeError> {
@ -371,7 +437,7 @@ impl Backend {
pub fn merge(&mut self, remote: &Backend) -> Result<Patch, AutomergeError> {
let missing_changes = remote
.get_missing_changes(self.op_set.clock.clone())
.get_missing_changes(&self.op_set.clock)
.iter()
.cloned()
.cloned()

View file

@ -2,7 +2,7 @@ use crate::protocol::{Key, ObjectID, OpID, OpRequest};
use std::error::Error;
use std::fmt;
#[derive(Debug,PartialEq)]
#[derive(Debug, PartialEq)]
pub enum AutomergeError {
MissingObjectError(ObjectID),
MissingIndex(OpID),

View file

@ -6,7 +6,6 @@
//! document::state) the implementation fetches the root object ID's history
//! and then recursively walks through the tree of histories constructing the
//! state. Obviously this is not very efficient.
use crate::actor_states::ActorStates;
use crate::concurrent_operations::ConcurrentOperations;
use crate::error::AutomergeError;
use crate::object_store::ObjState;
@ -42,13 +41,13 @@ pub(crate) struct Version {
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct OpSet {
pub objs: HashMap<ObjectID, ObjState>,
queue: Vec<Change>,
//queue: Vec<Change>,
pub clock: Clock,
pub deps: Clock,
pub undo_pos: usize,
pub undo_stack: Vec<Vec<UndoOperation>>,
pub redo_stack: Vec<Vec<UndoOperation>>,
pub states: ActorStates,
//pub states: ActorStates,
pub max_op: u64,
}
@ -59,59 +58,17 @@ impl OpSet {
OpSet {
objs,
queue: Vec::new(),
//queue: Vec::new(),
clock: Clock::empty(),
deps: Clock::empty(),
undo_pos: 0,
undo_stack: Vec::new(),
redo_stack: Vec::new(),
states: ActorStates::new(),
//states: ActorStates::new(),
max_op: 0,
}
}
/// Adds a change to the internal queue of operations, then iteratively
/// applies all causally ready changes until there are none remaining
///
/// If `make_undoable` is true, the op set will store a set of operations
/// which can be used to undo this change.
pub(crate) fn add_change(
&mut self,
change: Change,
local: bool,
undoable: bool,
) -> Result<Vec<PendingDiff>, AutomergeError> {
if local {
self.apply_change(change, local, undoable)
} else {
self.queue.push(change);
self.apply_queued_ops()
}
}
fn apply_queued_ops(&mut self) -> Result<Vec<PendingDiff>, AutomergeError> {
let mut all_diffs = Vec::new();
while let Some(next_change) = self.pop_next_causally_ready_change() {
let diffs = self.apply_change(next_change, false, false)?;
all_diffs.extend(diffs)
}
Ok(all_diffs)
}
fn pop_next_causally_ready_change(&mut self) -> Option<Change> {
let mut index = 0;
while index < self.queue.len() {
let change = self.queue.get(index).unwrap();
let deps = change.deps.with(&change.actor_id, change.seq - 1);
if deps <= self.clock {
return Some(self.queue.remove(index));
}
index += 1
}
None
}
fn apply_ops(
&mut self,
change: &Rc<Change>,
@ -137,22 +94,12 @@ impl OpSet {
Ok((all_undo_ops, diffs))
}
fn apply_change(
pub(crate) fn apply_change(
&mut self,
change: Change,
change: Rc<Change>,
_local: bool,
undoable: bool,
) -> Result<Vec<PendingDiff>, AutomergeError> {
let change = Rc::new(change);
if let Some(all_deps) = self.states.add_change(&change)? {
self.clock.set(&change.actor_id, change.seq);
self.deps.subtract(&all_deps);
self.deps.set(&change.actor_id, change.seq);
} else {
return Ok(Vec::new());
}
let (undo_ops, diffs) = self.apply_ops(&change, undoable)?;
self.max_op = max(self.max_op, change.max_op());
@ -213,7 +160,7 @@ impl OpSet {
(false, true) => {
let id = op.operation_key().to_opid()?;
let index = object.get_index_for(&id)?;
object.seq.insert(index, id.clone());
object.seq.insert(index, id);
PendingDiff::SeqInsert(op.clone(), index)
}
(false, false) => PendingDiff::Noop,
@ -374,28 +321,10 @@ impl OpSet {
!self.redo_stack.is_empty()
}
/// Get all the changes we have that are not in `since`
pub fn get_missing_changes(&self, since: &Clock) -> Vec<&Change> {
self.states
.history
.iter()
.map(|rc| rc.as_ref())
.filter(|change| change.seq > since.get(&change.actor_id))
.collect()
}
pub fn get_elem_ids(&self, object_id: &ObjectID) -> Result<&[OpID], AutomergeError> {
self.get_obj(object_id).map(|o| o.seq.as_slice())
}
pub fn get_missing_deps(&self) -> Clock {
let mut clock = Clock::empty();
for change in self.queue.iter() {
clock.merge(&change.deps.with(&change.actor_id, change.seq - 1))
}
clock
}
pub fn get_pred(&self, object_id: &ObjectID, key: &Key, insert: bool) -> Vec<OpID> {
if insert {
Vec::new()