Add external clock

This commit is contained in:
Andrew Jeffery 2023-03-07 17:13:32 +00:00
parent 5adca457cc
commit f951632f8c
6 changed files with 241 additions and 6 deletions

View file

@ -1,5 +1,6 @@
use std::ops::RangeBounds;
use crate::clock::ExClock;
use crate::exid::ExId;
use crate::op_observer::{BranchableObserver, OpObserver};
use crate::sync::SyncDoc;
@ -255,6 +256,16 @@ impl<Obs: Observation> AutoCommitWithObs<Obs> {
self.doc.get_last_local_change()
}
/// Get the vector clock for the given heads.
pub fn clock_for_heads(&self, heads: &[ChangeHash]) -> ExClock {
self.doc.clock_for_heads(heads)
}
/// Get the heads hashes for the given vector clock.
pub fn heads_for_clock(&self, clock: &ExClock) -> Result<Vec<ChangeHash>, AutomergeError> {
self.doc.heads_for_clock(clock)
}
pub fn get_changes(
&mut self,
have_deps: &[ChangeHash],

View file

@ -5,6 +5,7 @@ use std::num::NonZeroU64;
use std::ops::RangeBounds;
use crate::change_graph::ChangeGraph;
use crate::clock::ExClock;
use crate::columnar::Key as EncodedKey;
use crate::exid::ExId;
use crate::keys::Keys;
@ -868,6 +869,65 @@ impl Automerge {
.find(|c| c.actor_id() == self.get_actor());
}
/// Get the vector clock for the given heads.
pub fn clock_for_heads(&self, heads: &[ChangeHash]) -> ExClock {
let clock = self.change_graph.clock_for_heads(heads);
ExClock::from_internal_clock(clock, &self.ops.m.actors)
}
/// Get the heads hashes for the given vector clock.
pub fn heads_for_clock(&self, clock: &ExClock) -> Result<Vec<ChangeHash>, AutomergeError> {
// get the hash for the last change of each actor in the clock
let mut heads_and_clocks = Vec::new();
for (actor_id, data) in clock.iter() {
let actor_index = self.ops.m.actors.lookup(actor_id).unwrap();
if let Some(state) = self.states.get(&actor_index) {
if let Some(change_index) = state.get(data.seq as usize - 1) {
let change = &self.history[*change_index];
let hash = change.hash();
let clock = self.clock_at(&[hash]);
heads_and_clocks.push((hash, clock));
} else {
return Err(AutomergeError::FutureClock);
}
} else {
return Err(AutomergeError::FutureClock);
}
}
// minimize heads list to only those that are needed
// first sorting them to get the biggest clocks first
heads_and_clocks.sort_by(|left, right| match left.1.partial_cmp(&right.1) {
Some(ordering) => ordering.reverse(),
None => Ordering::Equal,
});
// then iterating through to filter out unneeded hashes
let mut final_heads = Vec::new();
let mut rolling_clock = Clock::default();
for (head, clock) in heads_and_clocks {
match rolling_clock.partial_cmp(&clock) {
Some(ordering) => match ordering {
Ordering::Less => {
final_heads.push(head);
rolling_clock.merge(clock);
}
Ordering::Equal => {
// ignore, already covered by the hashes
}
Ordering::Greater => {
// ignore, this hash is already covered
}
},
None => final_heads.push(head),
}
}
// and finally sorting them to make them compatible with the expected format
final_heads.sort_unstable();
Ok(final_heads)
}
fn clock_at(&self, heads: &[ChangeHash]) -> Clock {
self.change_graph.clock_for_heads(heads)
}

View file

@ -2,6 +2,7 @@ use itertools::Itertools;
use pretty_assertions::assert_eq;
use super::*;
use crate::clock::ClockData;
use crate::op_tree::B;
use crate::transaction::Transactable;
use crate::*;
@ -1552,3 +1553,61 @@ fn get_changes_heads_empty() {
let heads = doc.get_heads();
assert_eq!(doc.get_changes(&heads).unwrap(), Vec::<&Change>::new());
}
#[test]
fn clock_for() {
let mut doc1 = AutoCommit::new();
let actor1 = doc1.get_actor().clone();
let heads = doc1.get_heads();
let clock = ExClock::default();
assert_eq!(doc1.clock_for_heads(&heads), clock);
doc1.put(ROOT, "foo", "bar").unwrap();
let heads = doc1.get_heads();
let mut clock = ExClock::default();
clock.insert(actor1.clone(), ClockData { max_op: 1, seq: 1 });
assert_eq!(doc1.clock_for_heads(&heads), clock);
doc1.put(ROOT, "foo", "baz").unwrap();
doc1.put(ROOT, "zoo", "baz").unwrap();
let heads = doc1.get_heads();
let mut clock = ExClock::default();
clock.insert(actor1.clone(), ClockData { max_op: 3, seq: 2 });
assert_eq!(doc1.clock_for_heads(&heads), clock);
let mut doc2 = doc1.fork();
let actor2 = doc2.get_actor().clone();
let heads = doc2.get_heads();
let mut clock = ExClock::default();
clock.insert(actor1.clone(), ClockData { max_op: 3, seq: 2 });
assert_eq!(doc2.clock_for_heads(&heads), clock);
doc2.put(ROOT, "foo", "bar").unwrap();
let heads = doc2.get_heads();
let mut clock = ExClock::default();
clock.insert(actor1.clone(), ClockData { max_op: 3, seq: 2 });
clock.insert(actor2.clone(), ClockData { max_op: 4, seq: 1 });
assert_eq!(doc2.clock_for_heads(&heads), clock);
let mut doc3 = doc1.fork();
doc3.put(ROOT, "foo", "zoo").unwrap();
doc1.merge(&mut doc2).unwrap();
let heads = doc1.get_heads();
let mut clock = ExClock::default();
clock.insert(actor1, ClockData { max_op: 3, seq: 2 });
clock.insert(actor2, ClockData { max_op: 4, seq: 1 });
assert_eq!(doc1.clock_for_heads(&heads), clock);
let calculated_heads = doc1.heads_for_clock(&clock).unwrap();
assert_eq!(heads, calculated_heads);
doc1.merge(&mut doc3).unwrap();
let heads = doc1.get_heads();
let clock = doc1.clock_for_heads(&heads);
let calculated_heads = doc1.heads_for_clock(&clock).unwrap();
assert_eq!(heads, calculated_heads);
}

View file

@ -1,13 +1,110 @@
use crate::types::OpId;
use crate::{indexed_cache::IndexedCache, types::OpId, ActorId};
use fxhash::FxBuildHasher;
use std::{cmp::Ordering, collections::HashMap};
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub(crate) struct ClockData {
/// Vector clock mapping actor ids to the max op counter and sequence number of the changes created by that actor.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct ExClock(HashMap<ActorId, ClockData>);
impl ExClock {
pub(crate) fn from_internal_clock(clock: Clock, actors: &IndexedCache<ActorId>) -> Self {
let map = clock
.0
.into_iter()
.map(|(actor_index, data)| {
let actor_id = actors[actor_index].clone();
(actor_id, data)
})
.collect();
ExClock(map)
}
pub fn get(&self, actor: &ActorId) -> Option<&ClockData> {
self.0.get(actor)
}
pub fn insert(&mut self, actor: ActorId, data: ClockData) -> Option<ClockData> {
self.0.insert(actor, data)
}
pub fn remove(&mut self, actor: &ActorId) -> Option<ClockData> {
self.0.remove(actor)
}
pub fn iter(&self) -> impl Iterator<Item = (&ActorId, &ClockData)> {
self.0.iter()
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&ActorId, &mut ClockData)> {
self.0.iter_mut()
}
fn is_greater(&self, other: &Self) -> bool {
let mut has_greater = false;
let mut others_found = 0;
for (actor, data) in &self.0 {
if let Some(other_data) = other.0.get(actor) {
if data < other_data {
// may be concurrent or less
return false;
} else if data > other_data {
has_greater = true;
}
others_found += 1;
} else {
// other doesn't have this so effectively has a greater element
has_greater = true;
}
}
if has_greater {
// if they are equal then we have seen every key in the other clock and have at least
// one greater element so our clock is greater
//
// If they aren't the same then we haven't seen every key but have a greater element
// anyway so are concurrent
others_found == other.0.len()
} else {
// our clock doesn't have anything greater than the other clock so can't be greater but
// could still be concurrent
false
}
}
}
impl PartialOrd for ExClock {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
if self.0 == other.0 {
Some(Ordering::Equal)
} else if self.is_greater(other) {
Some(Ordering::Greater)
} else if other.is_greater(self) {
Some(Ordering::Less)
} else {
// concurrent
None
}
}
}
impl IntoIterator for ExClock {
type Item = (ActorId, ClockData);
type IntoIter = std::collections::hash_map::IntoIter<ActorId, ClockData>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClockData {
/// Maximum operation counter of the actor at the point in time.
pub(crate) max_op: u64,
pub max_op: u64,
/// Sequence number of the change from this actor.
pub(crate) seq: u64,
pub seq: u64,
}
// a clock for the same actor is ahead of another if it has a higher max_op
@ -58,6 +155,12 @@ impl Clock {
.or_insert(data);
}
pub(crate) fn merge(&mut self, other: Clock) {
for (index, data) in other.0 {
self.include(index, data)
}
}
pub(crate) fn covers(&self, id: &OpId) -> bool {
if let Some(data) = self.0.get(&id.actor()) {
data.max_op >= id.counter()

View file

@ -54,6 +54,8 @@ pub enum AutomergeError {
NonChangeCompressed,
#[error("id was not an object id")]
NotAnObject,
#[error("this clock refers to the future")]
FutureClock,
}
impl PartialEq for AutomergeError {

View file

@ -245,7 +245,7 @@ mod automerge;
mod autoserde;
mod change;
mod change_graph;
mod clock;
pub mod clock;
mod columnar;
mod convert;
mod error;