automerge/rust/automerge/src/sync.rs
Alex Good 0ab6a770d8 wasm: improve error messages
The error messages produced by various conversions in `automerge-wasm`
were quite uninformative - often consisting of just returning the
offending value with no description of the problem. The logic of these
error messages was often hard to trace due to the use of `JsValue` to
represent both error conditions and valid values - evidenced by most of
the public functions of `automerge-wasm` having return types of
`Result<JsValue, JsValue>`. Change these return types to mention
specific errors, thus enlisting the compiler's help in ensuring that
specific error messages are emitted.
2022-12-02 14:42:55 +00:00

802 lines
27 KiB
Rust

use itertools::Itertools;
use serde::ser::SerializeMap;
use std::collections::{HashMap, HashSet};
use crate::{
storage::{parse, Change as StoredChange, ReadChangeOpError},
Automerge, AutomergeError, Change, ChangeHash, OpObserver,
};
mod bloom;
mod state;
pub use bloom::{BloomFilter, DecodeError as DecodeBloomError};
pub use state::DecodeError as DecodeStateError;
pub use state::{Have, State};
/// Type tag written as the first byte of an encoded sync message and checked when parsing one.
const MESSAGE_TYPE_SYNC: u8 = 0x42; // first byte of a sync message, for identification
impl Automerge {
pub fn generate_sync_message(&self, sync_state: &mut State) -> Option<Message> {
let our_heads = self.get_heads();
let our_need = self.get_missing_deps(sync_state.their_heads.as_ref().unwrap_or(&vec![]));
let their_heads_set = if let Some(ref heads) = sync_state.their_heads {
heads.iter().collect::<HashSet<_>>()
} else {
HashSet::new()
};
let our_have = if our_need.iter().all(|hash| their_heads_set.contains(hash)) {
vec![self.make_bloom_filter(sync_state.shared_heads.clone())]
} else {
Vec::new()
};
if let Some(ref their_have) = sync_state.their_have {
if let Some(first_have) = their_have.first().as_ref() {
if !first_have
.last_sync
.iter()
.all(|hash| self.get_change_by_hash(hash).is_some())
{
let reset_msg = Message {
heads: our_heads,
need: Vec::new(),
have: vec![Have::default()],
changes: Vec::new(),
};
return Some(reset_msg);
}
}
}
let changes_to_send = if let (Some(their_have), Some(their_need)) = (
sync_state.their_have.as_ref(),
sync_state.their_need.as_ref(),
) {
self.get_changes_to_send(their_have, their_need)
.expect("Should have only used hashes that are in the document")
} else {
Vec::new()
};
let heads_unchanged = sync_state.last_sent_heads == our_heads;
let heads_equal = if let Some(their_heads) = sync_state.their_heads.as_ref() {
their_heads == &our_heads
} else {
false
};
// deduplicate the changes to send with those we have already sent and clone it now
let changes_to_send = changes_to_send
.into_iter()
.filter_map(|change| {
if !sync_state.sent_hashes.contains(&change.hash()) {
Some(change.clone())
} else {
None
}
})
.collect::<Vec<_>>();
if heads_unchanged {
if heads_equal && changes_to_send.is_empty() {
return None;
}
if sync_state.in_flight {
return None;
}
}
sync_state.last_sent_heads = our_heads.clone();
sync_state
.sent_hashes
.extend(changes_to_send.iter().map(|c| c.hash()));
let sync_message = Message {
heads: our_heads,
have: our_have,
need: our_need,
changes: changes_to_send,
};
sync_state.in_flight = true;
Some(sync_message)
}
/// Apply a sync `message` received from a peer and update `sync_state` accordingly.
///
/// Delegates to `receive_sync_message_with`, passing no observer.
pub fn receive_sync_message(
    &mut self,
    sync_state: &mut State,
    message: Message,
) -> Result<(), AutomergeError> {
    let no_observer: Option<&mut ()> = None;
    self.receive_sync_message_with(sync_state, message, no_observer)
}
pub fn receive_sync_message_with<Obs: OpObserver>(
&mut self,
sync_state: &mut State,
message: Message,
op_observer: Option<&mut Obs>,
) -> Result<(), AutomergeError> {
let before_heads = self.get_heads();
let Message {
heads: message_heads,
changes: message_changes,
need: message_need,
have: message_have,
} = message;
let changes_is_empty = message_changes.is_empty();
if !changes_is_empty {
self.apply_changes_with(message_changes, op_observer)?;
sync_state.shared_heads = advance_heads(
&before_heads.iter().collect(),
&self.get_heads().into_iter().collect(),
&sync_state.shared_heads,
);
}
// trim down the sent hashes to those that we know they haven't seen
self.filter_changes(&message_heads, &mut sync_state.sent_hashes)?;
if changes_is_empty && message_heads == before_heads {
sync_state.last_sent_heads = message_heads.clone();
}
if sync_state.sent_hashes.is_empty() {
sync_state.in_flight = false;
}
let known_heads = message_heads
.iter()
.filter(|head| self.get_change_by_hash(head).is_some())
.collect::<Vec<_>>();
if known_heads.len() == message_heads.len() {
sync_state.shared_heads = message_heads.clone();
sync_state.in_flight = false;
// If the remote peer has lost all its data, reset our state to perform a full resync
if message_heads.is_empty() {
sync_state.last_sent_heads = Default::default();
sync_state.sent_hashes = Default::default();
}
} else {
sync_state.shared_heads = sync_state
.shared_heads
.iter()
.chain(known_heads)
.copied()
.unique()
.sorted()
.collect::<Vec<_>>();
}
sync_state.their_have = Some(message_have);
sync_state.their_heads = Some(message_heads);
sync_state.their_need = Some(message_need);
Ok(())
}
/// Build a `Have` for the given `last_sync` heads.
///
/// The Bloom filter is filled with the hash of every change returned by
/// `get_changes(&last_sync)`.
///
/// # Panics
///
/// Panics if `get_changes` fails for `last_sync`.
fn make_bloom_filter(&self, last_sync: Vec<ChangeHash>) -> Have {
    let bloom = {
        let changes_since_sync = self
            .get_changes(&last_sync)
            .expect("Should have only used hashes that are in the document");
        BloomFilter::from_hashes(changes_since_sync.iter().map(|change| change.hash()))
    };
    Have { last_sync, bloom }
}
/// Select the changes to send to a peer, given the peer's `have` summaries and the
/// hashes it explicitly asked for in `need`.
///
/// * With an empty `have`, the result is simply every requested change we hold, in
///   `need` order.
/// * Otherwise the candidates are the changes returned by `get_changes` for the union
///   of all `last_sync` hashes. A candidate is sent if none of the peer's Bloom filters
///   reports it, or if it (transitively) depends on a candidate that is being sent.
///   The result lists the requested changes that were not already selected, followed
///   by the selected candidates in the order `get_changes` returned them.
///
/// # Errors
///
/// Propagates any error returned by `get_changes`.
fn get_changes_to_send(
    &self,
    have: &[Have],
    need: &[ChangeHash],
) -> Result<Vec<&Change>, AutomergeError> {
    if have.is_empty() {
        return Ok(need
            .iter()
            .filter_map(|hash| self.get_change_by_hash(hash))
            .collect());
    }

    let mut last_sync_hashes = HashSet::new();
    let mut bloom_filters = Vec::with_capacity(have.len());
    for Have { last_sync, bloom } in have {
        last_sync_hashes.extend(last_sync);
        bloom_filters.push(bloom);
    }
    let last_sync_hashes = last_sync_hashes.into_iter().copied().collect::<Vec<_>>();

    let changes = self.get_changes(&last_sync_hashes)?;

    // For each change hash, the candidates that list it as a dependency.
    let mut dependents: HashMap<ChangeHash, Vec<ChangeHash>> = HashMap::new();
    let mut hashes_to_send = HashSet::new();
    for change in &changes {
        let hash = change.hash();
        for dep in change.deps() {
            dependents.entry(*dep).or_default().push(hash);
        }
        // Not reported by any of the peer's filters, so it has to be sent.
        if !bloom_filters
            .iter()
            .any(|bloom| bloom.contains_hash(&hash))
        {
            hashes_to_send.insert(hash);
        }
    }

    // A Bloom filter can report a change the peer does not actually have. Anything that
    // depends on a change we are sending is sent as well, so follow the dependents
    // transitively.
    let mut stack = hashes_to_send.iter().copied().collect::<Vec<_>>();
    while let Some(hash) = stack.pop() {
        if let Some(deps) = dependents.get(&hash) {
            for dep in deps {
                if hashes_to_send.insert(*dep) {
                    stack.push(*dep);
                }
            }
        }
    }

    // Explicitly requested changes first (skipping ones already selected above) ...
    let mut changes_to_send: Vec<&Change> = need
        .iter()
        .filter(|hash| !hashes_to_send.contains(*hash))
        .filter_map(|hash| self.get_change_by_hash(hash))
        .collect();
    // ... then the selected candidates, in the order `get_changes` produced them.
    changes_to_send.extend(
        changes
            .into_iter()
            .filter(|change| hashes_to_send.contains(&change.hash())),
    );
    Ok(changes_to_send)
}
}
/// Errors that can occur while decoding a sync [`Message`].
#[derive(Debug, thiserror::Error)]
pub enum ReadMessageError {
/// The first byte of the input was not one of the expected message type tags.
#[error("expected {expected_one_of:?} but found {found}")]
WrongType { expected_one_of: Vec<u8>, found: u8 },
/// A lower-level parse error, carried as its rendered text.
#[error("{0}")]
Parse(String),
/// An error converted from a [`ReadChangeOpError`].
#[error(transparent)]
ReadChangeOps(#[from] ReadChangeOpError),
/// Produced when the parser reports `ParseError::Incomplete`.
#[error("not enough input")]
NotEnoughInput,
}
impl From<parse::leb128::Error> for ReadMessageError {
fn from(e: parse::leb128::Error) -> Self {
ReadMessageError::Parse(e.to_string())
}
}
/// Wraps a Bloom filter parse error as `ReadMessageError::Parse`, keeping only its text.
impl From<bloom::ParseError> for ReadMessageError {
    fn from(err: bloom::ParseError) -> Self {
        Self::Parse(err.to_string())
    }
}
/// Wraps a change parse error as `ReadMessageError::Parse`, prefixing its text with
/// a note that it came from parsing changes.
impl From<crate::storage::change::ParseError> for ReadMessageError {
    fn from(err: crate::storage::change::ParseError) -> Self {
        let description = format!("error parsing changes: {}", err);
        Self::Parse(description)
    }
}
/// Lifts a `ReadMessageError` into the parser's error type as `ParseError::Error`.
impl From<ReadMessageError> for parse::ParseError<ReadMessageError> {
    fn from(err: ReadMessageError) -> Self {
        Self::Error(err)
    }
}
impl From<parse::ParseError<ReadMessageError>> for ReadMessageError {
fn from(p: parse::ParseError<ReadMessageError>) -> Self {
match p {
parse::ParseError::Error(e) => e,
parse::ParseError::Incomplete(..) => Self::NotEnoughInput,
}
}
}
/// A sync message exchanged between two peers.
///
/// Produced by `Automerge::generate_sync_message` and consumed by
/// `Automerge::receive_sync_message`.
#[derive(Clone, Debug, PartialEq)]
pub struct Message {
/// The heads of the sender.
pub heads: Vec<ChangeHash>,
/// The hashes of any changes that are being explicitly requested from the recipient.
pub need: Vec<ChangeHash>,
/// A summary of the changes that the sender already has.
pub have: Vec<Have>,
/// The changes for the recipient to apply.
pub changes: Vec<Change>,
}
impl serde::Serialize for Message {
    /// Serializes the message as a four-entry map with the keys `heads`, `need`, `have`
    /// and `changes`. Each change is converted to a `crate::ExpandedChange` first.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        const ENTRY_COUNT: usize = 4;
        let mut map = serializer.serialize_map(Some(ENTRY_COUNT))?;
        map.serialize_entry("heads", &self.heads)?;
        map.serialize_entry("need", &self.need)?;
        map.serialize_entry("have", &self.have)?;

        let expanded_changes = self
            .changes
            .iter()
            .map(crate::ExpandedChange::from)
            .collect::<Vec<_>>();
        map.serialize_entry("changes", &expanded_changes)?;

        map.end()
    }
}
/// Parse a single `Have`: a length-prefixed list of change hashes (`last_sync`)
/// followed by a length-prefixed byte string from which the Bloom filter is parsed.
fn parse_have(input: parse::Input<'_>) -> parse::ParseResult<'_, Have, ReadMessageError> {
    let (rest, last_sync) = parse::length_prefixed(parse::change_hash)(input)?;
    let (rest, bloom_bytes) = parse::length_prefixed_bytes(rest)?;
    // The filter is parsed from its own byte string; whatever that sub-parse leaves over
    // is ignored.
    let bloom_input = parse::Input::new(bloom_bytes);
    let (_, bloom) = BloomFilter::parse(bloom_input).map_err(|err| err.lift())?;
    Ok((rest, Have { last_sync, bloom }))
}
impl Message {
/// Decode a sync message from `input`.
///
/// Any input left over after the message has been parsed is discarded.
///
/// # Errors
///
/// Returns the parser's `ReadMessageError`, or `ReadMessageError::NotEnoughInput` if
/// the parser reports an incomplete input.
pub fn decode(input: &[u8]) -> Result<Self, ReadMessageError> {
    Self::parse(parse::Input::new(input))
        .map(|(_remaining, message)| message)
        .map_err(ReadMessageError::from)
}
pub(crate) fn parse(input: parse::Input<'_>) -> parse::ParseResult<'_, Self, ReadMessageError> {
let (i, message_type) = parse::take1(input)?;
if message_type != MESSAGE_TYPE_SYNC {
return Err(parse::ParseError::Error(ReadMessageError::WrongType {
expected_one_of: vec![MESSAGE_TYPE_SYNC],
found: message_type,
}));
}
let (i, heads) = parse::length_prefixed(parse::change_hash)(i)?;
let (i, need) = parse::length_prefixed(parse::change_hash)(i)?;
let (i, have) = parse::length_prefixed(parse_have)(i)?;
let change_parser = |i| {
let (i, bytes) = parse::length_prefixed_bytes(i)?;
let (_, change) =
StoredChange::parse(parse::Input::new(bytes)).map_err(|e| e.lift())?;
Ok((i, change))
};
let (i, stored_changes) = parse::length_prefixed(change_parser)(i)?;
let changes_len = stored_changes.len();
let changes: Vec<Change> = stored_changes
.into_iter()
.try_fold::<_, _, Result<_, ReadMessageError>>(
Vec::with_capacity(changes_len),
|mut acc, stored| {
let change = Change::new_from_unverified(stored.into_owned(), None)
.map_err(ReadMessageError::ReadChangeOps)?;
acc.push(change);
Ok(acc)
},
)?;
Ok((
i,
Message {
heads,
need,
have,
changes,
},
))
}
/// Encode this message into its binary form.
///
/// The output starts with the `MESSAGE_TYPE_SYNC` tag byte, followed by the heads, the
/// needed hashes, the `Have` entries (each as its `last_sync` hashes plus a
/// length-prefixed Bloom filter) and finally each change as a length-prefixed byte
/// string. Lengths and counts are written as unsigned LEB128.
///
/// In debug builds, `encode_hashes` asserts that each hash list is sorted.
pub fn encode(mut self) -> Vec<u8> {
    let mut buf = vec![MESSAGE_TYPE_SYNC];
    encode_hashes(&mut buf, &self.heads);
    encode_hashes(&mut buf, &self.need);
    encode_many(&mut buf, self.have.iter(), |buf, h| {
        encode_hashes(buf, &h.last_sync);
        // Serialize the filter once and reuse the bytes for both the length prefix and
        // the payload, instead of calling `to_bytes` twice.
        let bloom_bytes = h.bloom.to_bytes();
        leb128::write::unsigned(buf, bloom_bytes.len() as u64).unwrap();
        buf.extend(bloom_bytes);
    });
    encode_many(&mut buf, self.changes.iter_mut(), |buf, change| {
        let raw = change.raw_bytes();
        leb128::write::unsigned(buf, raw.len() as u64).unwrap();
        buf.extend(raw.as_ref())
    });
    buf
}
}
/// Write the number of items in `data` to `out` as an unsigned LEB128 integer, then call
/// `f` once per item, in order, so it can append that item's encoding to `out`.
fn encode_many<'a, I, It, F>(out: &mut Vec<u8>, data: I, f: F)
where
    I: Iterator<Item = It> + ExactSizeIterator + 'a,
    F: Fn(&mut Vec<u8>, It),
{
    let count = data.len() as u64;
    leb128::write::unsigned(out, count).unwrap();
    data.for_each(|item| f(out, item));
}
/// Append `hashes` to `buf` as a count followed by the bytes of each hash.
///
/// Debug builds assert that `hashes` is sorted in non-decreasing order.
fn encode_hashes(buf: &mut Vec<u8>, hashes: &[ChangeHash]) {
    debug_assert!(
        hashes
            .iter()
            .zip(hashes.iter().skip(1))
            .all(|(earlier, later)| earlier <= later),
        "hashes were not sorted"
    );
    encode_many(buf, hashes.iter(), |out, hash| {
        out.extend(hash.as_bytes());
    })
}
/// Compute the new shared heads after our own heads moved from `my_old_heads` to
/// `my_new_heads`.
///
/// The result contains every head in `my_new_heads` that was not in `my_old_heads`,
/// plus every entry of `our_old_shared_heads` that is still one of `my_new_heads`.
/// It is deduplicated and sorted.
fn advance_heads(
    my_old_heads: &HashSet<&ChangeHash>,
    my_new_heads: &HashSet<ChangeHash>,
    our_old_shared_heads: &[ChangeHash],
) -> Vec<ChangeHash> {
    let newly_added = my_new_heads
        .iter()
        .filter(|head| !my_old_heads.contains(head));
    let still_current = our_old_shared_heads
        .iter()
        .filter(|head| my_new_heads.contains(head));

    let mut advanced_heads: Vec<ChangeHash> = newly_added.chain(still_current).copied().collect();
    // Sorting first places equal hashes next to each other so `dedup` removes them all.
    advanced_heads.sort();
    advanced_heads.dedup();
    advanced_heads
}
#[cfg(test)]
mod tests {
use super::*;
use crate::change::gen::gen_change;
use crate::storage::parse::Input;
use crate::transaction::Transactable;
use crate::types::gen::gen_hash;
use crate::ActorId;
use proptest::prelude::*;
// Strategy producing a `BloomFilter` built from between 0 and 9 generated hashes.
prop_compose! {
fn gen_bloom()(hashes in gen_sorted_hashes(0..10)) -> BloomFilter {
BloomFilter::from_hashes(hashes.into_iter())
}
}
// Strategy producing a `Have` from a generated Bloom filter and a sorted list of
// between 0 and 9 generated `last_sync` hashes.
prop_compose! {
fn gen_have()(bloom in gen_bloom(), last_sync in gen_sorted_hashes(0..10)) -> Have {
Have {
bloom,
last_sync,
}
}
}
/// Strategy producing a sorted vector of generated hashes whose length lies in `size`.
fn gen_sorted_hashes(size: std::ops::Range<usize>) -> impl Strategy<Value = Vec<ChangeHash>> {
    let unsorted = proptest::collection::vec(gen_hash(), size);
    unsorted.prop_map(|mut hashes| {
        hashes.sort();
        hashes
    })
}
// Strategy producing an arbitrary `Message`. `heads` and `need` are sorted, matching the
// debug assertion in `encode_hashes`; every list has between 0 and 9 entries.
prop_compose! {
fn gen_sync_message()(
heads in gen_sorted_hashes(0..10),
need in gen_sorted_hashes(0..10),
have in proptest::collection::vec(gen_have(), 0..10),
changes in proptest::collection::vec(gen_change(), 0..10),
) -> Message {
Message {
heads,
need,
have,
changes,
}
}
}
/// A message with no heads, needs, haves or changes must encode to bytes that parse.
#[test]
fn encode_decode_empty_message() {
    let empty = Message {
        heads: Vec::new(),
        need: Vec::new(),
        have: Vec::new(),
        changes: Vec::new(),
    };
    let bytes = empty.encode();
    let parsed = Message::parse(Input::new(&bytes));
    parsed.unwrap();
}
// Round-trip property: encoding any generated message and parsing the result consumes
// all of the input and yields a message equal to the original.
proptest! {
#[test]
fn encode_decode_message(msg in gen_sync_message()) {
let encoded = msg.clone().encode();
let (i, decoded) = Message::parse(Input::new(&encoded)).unwrap();
assert!(i.is_empty());
assert_eq!(msg, decoded);
}
}
/// Generating a second message without receiving anything in between yields `None`.
#[test]
fn generate_sync_message_twice_does_nothing() {
    let mut doc = crate::AutoCommit::new();
    doc.put(crate::ROOT, "key", "value").unwrap();
    let mut sync_state = State::new();

    let first = doc.generate_sync_message(&mut sync_state);
    assert!(first.is_some());

    let second = doc.generate_sync_message(&mut sync_state);
    assert!(second.is_none());
}
/// Two empty documents: after the first peer's initial message, the second has
/// nothing to reply with.
#[test]
fn should_not_reply_if_we_have_no_data() {
    let mut doc1 = crate::AutoCommit::new();
    let mut doc2 = crate::AutoCommit::new();
    let mut s1 = State::new();
    let mut s2 = State::new();

    let initial = doc1
        .generate_sync_message(&mut s1)
        .expect("message was none");
    doc2.receive_sync_message(&mut s2, initial).unwrap();

    let reply = doc2.generate_sync_message(&mut s2);
    assert!(reply.is_none());
}
/// Two peers with five independent changes each exchange messages simultaneously
/// (both generate before either receives) and must converge; a later change on
/// doc1 must then advertise both earlier heads as `last_sync`.
#[test]
fn should_allow_simultaneous_messages_during_synchronisation() {
// create & synchronize two nodes
let mut doc1 = crate::AutoCommit::new().with_actor(ActorId::try_from("abc123").unwrap());
let mut doc2 = crate::AutoCommit::new().with_actor(ActorId::try_from("def456").unwrap());
let mut s1 = State::new();
let mut s2 = State::new();
for i in 0..5 {
doc1.put(&crate::ROOT, "x", i).unwrap();
doc1.commit();
doc2.put(&crate::ROOT, "y", i).unwrap();
doc2.commit();
}
// Remember each document's single head before any syncing happens.
let head1 = doc1.get_heads()[0];
let head2 = doc2.get_heads()[0];
//// both sides report what they have but have no shared peer state
let msg1to2 = doc1
.generate_sync_message(&mut s1)
.expect("initial sync from 1 to 2 was None");
let msg2to1 = doc2
.generate_sync_message(&mut s2)
.expect("initial sync message from 2 to 1 was None");
assert_eq!(msg1to2.changes.len(), 0);
assert_eq!(msg1to2.have[0].last_sync.len(), 0);
assert_eq!(msg2to1.changes.len(), 0);
assert_eq!(msg2to1.have[0].last_sync.len(), 0);
//// doc1 and doc2 receive that message and update sync state
doc1.receive_sync_message(&mut s1, msg2to1).unwrap();
doc2.receive_sync_message(&mut s2, msg1to2).unwrap();
//// now both reply with their local changes the other lacks
//// (standard warning that 1% of the time this will result in a "need" message)
let msg1to2 = doc1
.generate_sync_message(&mut s1)
.expect("first reply from 1 to 2 was None");
assert_eq!(msg1to2.changes.len(), 5);
let msg2to1 = doc2
.generate_sync_message(&mut s2)
.expect("first reply from 2 to 1 was None");
assert_eq!(msg2to1.changes.len(), 5);
//// both should now apply the changes
doc1.receive_sync_message(&mut s1, msg2to1).unwrap();
assert_eq!(doc1.get_missing_deps(&[]), Vec::new());
doc2.receive_sync_message(&mut s2, msg1to2).unwrap();
assert_eq!(doc2.get_missing_deps(&[]), Vec::new());
//// The response acknowledges the changes received and sends no further changes
let msg1to2 = doc1
.generate_sync_message(&mut s1)
.expect("second reply from 1 to 2 was None");
assert_eq!(msg1to2.changes.len(), 0);
let msg2to1 = doc2
.generate_sync_message(&mut s2)
.expect("second reply from 2 to 1 was None");
assert_eq!(msg2to1.changes.len(), 0);
//// After receiving acknowledgements, their shared heads should be equal
doc1.receive_sync_message(&mut s1, msg2to1).unwrap();
doc2.receive_sync_message(&mut s2, msg1to2).unwrap();
assert_eq!(s1.shared_heads, s2.shared_heads);
//// We're in sync, no more messages required
assert!(doc1.generate_sync_message(&mut s1).is_none());
assert!(doc2.generate_sync_message(&mut s2).is_none());
//// If we make one more change and start another sync then its lastSync should be updated
doc1.put(crate::ROOT, "x", 5).unwrap();
doc1.commit();
let msg1to2 = doc1
.generate_sync_message(&mut s1)
.expect("third reply from 1 to 2 was None");
// Compare as sorted lists so the assertion does not depend on head order.
let mut expected_heads = vec![head1, head2];
expected_heads.sort();
let mut actual_heads = msg1to2.have[0].last_sync.clone();
actual_heads.sort();
assert_eq!(actual_heads, expected_heads);
}
/// Syncing must still converge when one peer's new head is a false positive in a
/// Bloom filter built from the other peer's new head.
#[test]
fn should_handle_false_positive_head() {
// Scenario:                                                            ,-- n1
//        c0 <-- c1 <-- c2 <-- c3 <-- c4 <-- c5 <-- c6 <-- c7 <-- c8 <-- c9 <-+
//                                                                      `-- n2
// where n2 is a false positive in the Bloom filter containing {n1}.
// lastSync is c9.
let mut doc1 = crate::AutoCommit::new().with_actor(ActorId::try_from("abc123").unwrap());
let mut doc2 = crate::AutoCommit::new().with_actor(ActorId::try_from("def456").unwrap());
let mut s1 = State::new();
let mut s2 = State::new();
for i in 0..10 {
doc1.put(crate::ROOT, "x", i).unwrap();
doc1.commit();
}
sync(&mut doc1, &mut doc2, &mut s1, &mut s2);
// search for false positive; see comment above
// Both candidates are clones of doc1; the loop varies the written value until the
// second candidate's head is reported by a filter built from the first one's heads.
let mut i = 0;
let (mut doc1, mut doc2) = loop {
let mut doc1copy = doc1
.clone()
.with_actor(ActorId::try_from("01234567").unwrap());
let val1 = format!("{} @ n1", i);
doc1copy.put(crate::ROOT, "x", val1).unwrap();
doc1copy.commit();
let mut doc2copy = doc1
.clone()
.with_actor(ActorId::try_from("89abcdef").unwrap());
let val2 = format!("{} @ n2", i);
doc2copy.put(crate::ROOT, "x", val2).unwrap();
doc2copy.commit();
let n1_bloom = BloomFilter::from_hashes(doc1copy.get_heads().into_iter());
if n1_bloom.contains_hash(&doc2copy.get_heads()[0]) {
break (doc1copy, doc2copy);
}
i += 1;
};
// After syncing, both documents are expected to have the union of both sets of heads.
let mut all_heads = doc1.get_heads();
all_heads.extend(doc2.get_heads());
all_heads.sort();
// reset sync states
// (done by round-tripping each state through `encode` and `State::parse`)
let (_, mut s1) = State::parse(Input::new(s1.encode().as_slice())).unwrap();
let (_, mut s2) = State::parse(Input::new(s2.encode().as_slice())).unwrap();
sync(&mut doc1, &mut doc2, &mut s1, &mut s2);
assert_eq!(doc1.get_heads(), all_heads);
assert_eq!(doc2.get_heads(), all_heads);
}
/// Syncing must still converge when two consecutive changes on one peer are both
/// false positives in a Bloom filter built from the other peer's head.
#[test]
fn should_handle_chains_of_false_positives() {
//// Scenario:                         ,-- c5
////        c0 <-- c1 <-- c2 <-- c3 <-- c4 <-+
////                                   `-- n2c1 <-- n2c2 <-- n2c3
//// where n2c1 and n2c2 are both false positives in the Bloom filter containing {c5}.
//// lastSync is c4.
// NOTE(review): the loop below commits ten changes before the first sync, whereas the
// diagram shows five (c0..c4) — confirm which is intended.
let mut doc1 = crate::AutoCommit::new().with_actor(ActorId::try_from("abc123").unwrap());
let mut doc2 = crate::AutoCommit::new().with_actor(ActorId::try_from("def456").unwrap());
let mut s1 = State::new();
let mut s2 = State::new();
for i in 0..10 {
doc1.put(crate::ROOT, "x", i).unwrap();
doc1.commit();
}
sync(&mut doc1, &mut doc2, &mut s1, &mut s2);
doc1.put(crate::ROOT, "x", 5).unwrap();
doc1.commit();
// Filter built from doc1's heads after its extra change; used to find false positives.
let bloom = BloomFilter::from_hashes(doc1.get_heads().into_iter());
// search for false positive; see comment above
let mut i = 0;
let mut doc2 = loop {
let mut doc = doc2
.fork()
.with_actor(ActorId::try_from("89abcdef").unwrap());
doc.put(crate::ROOT, "x", format!("{} at 89abdef", i))
.unwrap();
doc.commit();
if bloom.contains_hash(&doc.get_heads()[0]) {
break doc;
}
i += 1;
};
// find another false positive building on the first
i = 0;
let mut doc2 = loop {
let mut doc = doc2
.fork()
.with_actor(ActorId::try_from("89abcdef").unwrap());
doc.put(crate::ROOT, "x", format!("{} again", i)).unwrap();
doc.commit();
if bloom.contains_hash(&doc.get_heads()[0]) {
break doc;
}
i += 1;
};
// One final change on top of the two false positives (n2c3 in the diagram).
doc2.put(crate::ROOT, "x", "final @ 89abcdef").unwrap();
let mut all_heads = doc1.get_heads();
all_heads.extend(doc2.get_heads());
all_heads.sort();
// Round-trip both sync states through `encode` and `State::parse` before the final sync.
let (_, mut s1) = State::parse(Input::new(s1.encode().as_slice())).unwrap();
let (_, mut s2) = State::parse(Input::new(s2.encode().as_slice())).unwrap();
sync(&mut doc1, &mut doc2, &mut s1, &mut s2);
assert_eq!(doc1.get_heads(), all_heads);
assert_eq!(doc2.get_heads(), all_heads);
}
/// Test helper: exchange sync messages between `a` and `b` until neither side has
/// anything left to send.
///
/// In each round both sides generate their message first and only then receive the
/// other's.
///
/// # Panics
///
/// Panics if the documents are still exchanging messages after more than `MAX_ITER`
/// rounds, or if receiving a message fails.
fn sync(
    a: &mut crate::AutoCommit,
    b: &mut crate::AutoCommit,
    a_sync_state: &mut State,
    b_sync_state: &mut State,
) {
    const MAX_ITER: usize = 10;
    let mut rounds = 0;
    loop {
        let a_to_b = a.generate_sync_message(a_sync_state);
        let b_to_a = b.generate_sync_message(b_sync_state);

        let quiescent = a_to_b.is_none() && b_to_a.is_none();
        if quiescent {
            break;
        }
        if rounds > MAX_ITER {
            panic!("failed to sync in {} iterations", MAX_ITER);
        }

        if let Some(msg) = a_to_b {
            b.receive_sync_message(b_sync_state, msg).unwrap();
        }
        if let Some(msg) = b_to_a {
            a.receive_sync_message(a_sync_state, msg).unwrap();
        }
        rounds += 1;
    }
}
}