92f3efd6e0
calls.
1024 lines
44 KiB
C
1024 lines
44 KiB
C
#include <setjmp.h>
|
|
#include <stdarg.h>
|
|
#include <stddef.h>
|
|
#include <stdint.h>
|
|
#include <stdlib.h>
|
|
|
|
/* third-party */
|
|
#include <cmocka.h>
|
|
|
|
/* local */
|
|
#include "automerge.h"
|
|
|
|
typedef struct {
|
|
AMresult* doc1_result;
|
|
AMdoc* doc1;
|
|
AMresult* doc2_result;
|
|
AMdoc* doc2;
|
|
AMresult* sync_state1_result;
|
|
AMsyncState* sync_state1;
|
|
AMresult* sync_state2_result;
|
|
AMsyncState* sync_state2;
|
|
} TestState;
|
|
|
|
static int setup(void** state) {
|
|
TestState* test_state = calloc(1, sizeof(TestState));
|
|
test_state->doc1_result = AMcreate();
|
|
test_state->doc1 = AMresultValue(test_state->doc1_result).doc;
|
|
test_state->doc2_result = AMcreate();
|
|
test_state->doc2 = AMresultValue(test_state->doc2_result).doc;
|
|
test_state->sync_state1_result = AMsyncStateInit();
|
|
test_state->sync_state1 = AMresultValue(test_state->sync_state1_result).sync_state;
|
|
test_state->sync_state2_result = AMsyncStateInit();
|
|
test_state->sync_state2 = AMresultValue(test_state->sync_state2_result).sync_state;
|
|
*state = test_state;
|
|
return 0;
|
|
}
|
|
|
|
static int teardown(void** state) {
|
|
TestState* test_state = *state;
|
|
AMfree(test_state->doc1_result);
|
|
AMfree(test_state->doc2_result);
|
|
AMfree(test_state->sync_state1_result);
|
|
AMfree(test_state->sync_state2_result);
|
|
free(test_state);
|
|
return 0;
|
|
}
|
|
|
|
static void sync(AMdoc* a,
|
|
AMdoc* b,
|
|
AMsyncState* a_sync_state,
|
|
AMsyncState* b_sync_state) {
|
|
static size_t const MAX_ITER = 10;
|
|
|
|
AMsyncMessage const* a2b_msg = NULL;
|
|
AMsyncMessage const* b2a_msg = NULL;
|
|
size_t iter = 0;
|
|
do {
|
|
AMresult* a2b_msg_result = AMgenerateSyncMessage(a, a_sync_state);
|
|
AMresult* b2a_msg_result = AMgenerateSyncMessage(b, b_sync_state);
|
|
AMvalue value = AMresultValue(a2b_msg_result);
|
|
switch (value.tag) {
|
|
case AM_VALUE_SYNC_MESSAGE: {
|
|
a2b_msg = value.sync_message;
|
|
AMfree(AMreceiveSyncMessage(b, b_sync_state, a2b_msg));
|
|
}
|
|
break;
|
|
case AM_VALUE_VOID: a2b_msg = NULL; break;
|
|
}
|
|
value = AMresultValue(b2a_msg_result);
|
|
switch (value.tag) {
|
|
case AM_VALUE_SYNC_MESSAGE: {
|
|
b2a_msg = value.sync_message;
|
|
AMfree(AMreceiveSyncMessage(a, a_sync_state, b2a_msg));
|
|
}
|
|
break;
|
|
case AM_VALUE_VOID: b2a_msg = NULL; break;
|
|
}
|
|
if (++iter > MAX_ITER) {
|
|
fail_msg("Did not synchronize within %d iterations. "
|
|
"Do you have a bug causing an infinite loop?", MAX_ITER);
|
|
}
|
|
} while(a2b_msg || b2a_msg);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, an empty local doc
|
|
* should send a sync message implying no local data.
|
|
*/
|
|
static void test_converged_empty_local_doc_reply_no_local_data(void **state) {
|
|
TestState* test_state = *state;
|
|
AMresult* sync_message_result = AMgenerateSyncMessage(
|
|
test_state->doc1, test_state->sync_state1
|
|
);
|
|
if (AMresultStatus(sync_message_result) != AM_STATUS_OK) {
|
|
fail_msg("%s", AMerrorMessage(sync_message_result));
|
|
}
|
|
assert_int_equal(AMresultSize(sync_message_result), 1);
|
|
AMvalue value = AMresultValue(sync_message_result);
|
|
assert_int_equal(value.tag, AM_VALUE_SYNC_MESSAGE);
|
|
AMsyncMessage const* sync_message = value.sync_message;
|
|
AMchangeHashes heads = AMsyncMessageHeads(sync_message);
|
|
assert_int_equal(AMchangeHashesSize(&heads), 0);
|
|
AMchangeHashes needs = AMsyncMessageNeeds(sync_message);
|
|
assert_int_equal(AMchangeHashesSize(&needs), 0);
|
|
AMsyncHaves haves = AMsyncMessageHaves(sync_message);
|
|
assert_int_equal(AMsyncHavesSize(&haves), 1);
|
|
AMsyncHave const* have0 = AMsyncHavesNext(&haves, 1);
|
|
AMchangeHashes last_sync = AMsyncHaveLastSync(have0);
|
|
assert_int_equal(AMchangeHashesSize(&last_sync), 0);
|
|
AMchanges changes = AMsyncMessageChanges(sync_message);
|
|
assert_int_equal(AMchangesSize(&changes), 0);
|
|
AMfree(sync_message_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, an empty local doc
|
|
* should not reply if we have no data as well.
|
|
*/
|
|
static void test_converged_empty_local_doc_no_reply(void **state) {
|
|
TestState* test_state = *state;
|
|
AMresult* sync_message1_result = AMgenerateSyncMessage(
|
|
test_state->doc1, test_state->sync_state1
|
|
);
|
|
if (AMresultStatus(sync_message1_result) != AM_STATUS_OK) {
|
|
fail_msg("%s", AMerrorMessage(sync_message1_result));
|
|
}
|
|
assert_int_equal(AMresultSize(sync_message1_result), 1);
|
|
AMvalue value = AMresultValue(sync_message1_result);
|
|
assert_int_equal(value.tag, AM_VALUE_SYNC_MESSAGE);
|
|
AMsyncMessage const* sync_message1 = value.sync_message;
|
|
AMresult* result = AMreceiveSyncMessage(
|
|
test_state->doc2, test_state->sync_state2, sync_message1
|
|
);
|
|
if (AMresultStatus(result) != AM_STATUS_OK) {
|
|
fail_msg("%s", AMerrorMessage(result));
|
|
}
|
|
assert_int_equal(AMresultSize(result), 0);
|
|
value = AMresultValue(result);
|
|
assert_int_equal(value.tag, AM_VALUE_VOID);
|
|
AMfree(result);
|
|
AMresult* sync_message2_result = AMgenerateSyncMessage(
|
|
test_state->doc2, test_state->sync_state2
|
|
);
|
|
if (AMresultStatus(sync_message2_result) != AM_STATUS_OK) {
|
|
fail_msg("%s", AMerrorMessage(sync_message2_result));
|
|
}
|
|
assert_int_equal(AMresultSize(sync_message2_result), 0);
|
|
value = AMresultValue(sync_message2_result);
|
|
assert_int_equal(value.tag, AM_VALUE_VOID);
|
|
AMfree(sync_message2_result);
|
|
AMfree(sync_message1_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, documents with data and
|
|
* repos with equal heads do not need a reply message.
|
|
*/
|
|
static void test_converged_equal_heads_no_reply(void **state) {
|
|
TestState* test_state = *state;
|
|
|
|
/* Make two nodes with the same changes. */
|
|
time_t const time = 0;
|
|
for (size_t index = 0; index != 10; ++index) {
|
|
AMfree(AMlistPutUint(test_state->doc1, AM_ROOT, index, true, index));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
AMresult* changes_result = AMgetChanges(test_state->doc1, NULL);
|
|
AMvalue value = AMresultValue(changes_result);
|
|
AMfree(AMapplyChanges(test_state->doc2, &value.changes));
|
|
AMfree(changes_result);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
|
|
/* Generate a naive sync message. */
|
|
AMresult* sync_message1_result = AMgenerateSyncMessage(
|
|
test_state->doc1,
|
|
test_state->sync_state1
|
|
);
|
|
AMsyncMessage const* sync_message1 = AMresultValue(sync_message1_result).sync_message;
|
|
AMchangeHashes last_sent_heads = AMsyncStateLastSentHeads(test_state->sync_state1);
|
|
AMresult* heads_result = AMgetHeads(test_state->doc1);
|
|
AMchangeHashes heads = AMresultValue(heads_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesCmp(&last_sent_heads, &heads), 0);
|
|
AMfree(heads_result);
|
|
|
|
/* Heads are equal so this message should be void. */
|
|
AMfree(AMreceiveSyncMessage(
|
|
test_state->doc2, test_state->sync_state2, sync_message1
|
|
));
|
|
AMfree(sync_message1_result);
|
|
AMresult* sync_message2_result = AMgenerateSyncMessage(
|
|
test_state->doc2, test_state->sync_state2
|
|
);
|
|
assert_int_equal(AMresultValue(sync_message2_result).tag, AM_VALUE_VOID);
|
|
AMfree(sync_message2_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, documents with data and
|
|
* the first node should offer all changes to the second node when
|
|
* starting from nothing.
|
|
*/
|
|
static void test_converged_offer_all_changes_from_nothing(void **state) {
|
|
TestState* test_state = *state;
|
|
|
|
/* Make changes for the first node that the second node should request. */
|
|
time_t const time = 0;
|
|
for (size_t index = 0; index != 10; ++index) {
|
|
AMfree(AMlistPutUint(test_state->doc1, AM_ROOT, index, true, index));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
|
|
assert_false(AMequal(test_state->doc1, test_state->doc2));
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, documents with data and
|
|
* it should sync peers where one has commits the other does not.
|
|
*/
|
|
static void test_converged_sync_peers_with_uneven_commits(void **state) {
|
|
TestState* test_state = *state;
|
|
|
|
/* Make changes for the first node that the second node should request. */
|
|
time_t const time = 0;
|
|
for (size_t index = 0; index != 10; ++index) {
|
|
AMfree(AMlistPutUint(test_state->doc1, AM_ROOT, index, true, index));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
|
|
assert_false(AMequal(test_state->doc1, test_state->doc2));
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, documents with data and
|
|
* it should work with prior sync state.
|
|
*/
|
|
static void test_converged_works_with_prior_sync_state(void **state) {
|
|
/* Create & synchronize two nodes. */
|
|
TestState* test_state = *state;
|
|
|
|
time_t const time = 0;
|
|
for (size_t value = 0; value != 5; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
|
|
/* Modify the first node further. */
|
|
for (size_t value = 5; value != 10; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
|
|
assert_false(AMequal(test_state->doc1, test_state->doc2));
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, documents with data and
|
|
* it should not generate messages once synced.
|
|
*/
|
|
static void test_converged_no_message_once_synced(void **state) {
|
|
/* Create & synchronize two nodes. */
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "abc123"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "def456"));
|
|
|
|
time_t const time = 0;
|
|
for (size_t value = 0; value != 5; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
AMfree(AMmapPutUint(test_state->doc2, AM_ROOT, "y", value));
|
|
AMcommit(test_state->doc2, NULL, &time);
|
|
}
|
|
|
|
/* The first node reports what it has. */
|
|
AMresult* message_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
AMsyncMessage const* message = AMresultValue(message_result).sync_message;
|
|
|
|
/* The second node receives that message and sends changes along with what
|
|
* it has. */
|
|
AMfree(AMreceiveSyncMessage(test_state->doc2,
|
|
test_state->sync_state2,
|
|
message));
|
|
AMfree(message_result);
|
|
message_result = AMgenerateSyncMessage(test_state->doc2,
|
|
test_state->sync_state2);
|
|
message = AMresultValue(message_result).sync_message;
|
|
AMchanges message_changes = AMsyncMessageChanges(message);
|
|
assert_int_equal(AMchangesSize(&message_changes), 5);
|
|
|
|
/* The first node receives the changes and replies with the changes it now
|
|
* knows that the second node needs. */
|
|
AMfree(AMreceiveSyncMessage(test_state->doc1,
|
|
test_state->sync_state1,
|
|
message));
|
|
AMfree(message_result);
|
|
message_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
message = AMresultValue(message_result).sync_message;
|
|
message_changes = AMsyncMessageChanges(message);
|
|
assert_int_equal(AMchangesSize(&message_changes), 5);
|
|
|
|
/* The second node applies the changes and sends confirmation ending the
|
|
* exchange. */
|
|
AMfree(AMreceiveSyncMessage(test_state->doc2,
|
|
test_state->sync_state2,
|
|
message));
|
|
AMfree(message_result);
|
|
message_result = AMgenerateSyncMessage(test_state->doc2,
|
|
test_state->sync_state2);
|
|
message = AMresultValue(message_result).sync_message;
|
|
|
|
/* The first node receives the message and has nothing more to say. */
|
|
AMfree(AMreceiveSyncMessage(test_state->doc1,
|
|
test_state->sync_state1,
|
|
message));
|
|
AMfree(message_result);
|
|
message_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
assert_int_equal(AMresultValue(message_result).tag, AM_VALUE_VOID);
|
|
AMfree(message_result);
|
|
|
|
/* The second node also has nothing left to say. */
|
|
message_result = AMgenerateSyncMessage(test_state->doc2,
|
|
test_state->sync_state2);
|
|
assert_int_equal(AMresultValue(message_result).tag, AM_VALUE_VOID);
|
|
AMfree(message_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, documents with data and
|
|
* it should allow simultaneous messages during synchronization.
|
|
*/
|
|
static void test_converged_allow_simultaneous_messages(void **state) {
|
|
/* Create & synchronize two nodes. */
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "abc123"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "def456"));
|
|
|
|
time_t const time = 0;
|
|
for (size_t value = 0; value != 5; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
AMfree(AMmapPutUint(test_state->doc2, AM_ROOT, "y", value));
|
|
AMcommit(test_state->doc2, NULL, &time);
|
|
}
|
|
AMresult* heads1_result = AMgetHeads(test_state->doc1);
|
|
AMchangeHashes heads1 = AMresultValue(heads1_result).change_hashes;
|
|
AMbyteSpan head1 = AMchangeHashesNext(&heads1, 1);
|
|
AMresult* heads2_result = AMgetHeads(test_state->doc2);
|
|
AMchangeHashes heads2 = AMresultValue(heads2_result).change_hashes;
|
|
AMbyteSpan head2 = AMchangeHashesNext(&heads2, 1);
|
|
|
|
/* Both sides report what they have but have no shared peer state. */
|
|
AMresult* msg1to2_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
AMsyncMessage const* msg1to2 = AMresultValue(msg1to2_result).sync_message;
|
|
AMresult* msg2to1_result = AMgenerateSyncMessage(test_state->doc2,
|
|
test_state->sync_state2);
|
|
AMsyncMessage const* msg2to1 = AMresultValue(msg2to1_result).sync_message;
|
|
AMchanges msg1to2_changes = AMsyncMessageChanges(msg1to2);
|
|
assert_int_equal(AMchangesSize(&msg1to2_changes), 0);
|
|
AMsyncHaves msg1to2_haves = AMsyncMessageHaves(msg1to2);
|
|
AMsyncHave const* msg1to2_have = AMsyncHavesNext(&msg1to2_haves, 1);
|
|
AMchangeHashes msg1to2_last_sync = AMsyncHaveLastSync(msg1to2_have);
|
|
assert_int_equal(AMchangeHashesSize(&msg1to2_last_sync), 0);
|
|
AMchanges msg2to1_changes = AMsyncMessageChanges(msg2to1);
|
|
assert_int_equal(AMchangesSize(&msg2to1_changes), 0);
|
|
AMsyncHaves msg2to1_haves = AMsyncMessageHaves(msg2to1);
|
|
AMsyncHave const* msg2to1_have = AMsyncHavesNext(&msg2to1_haves, 1);
|
|
AMchangeHashes msg2to1_last_sync = AMsyncHaveLastSync(msg2to1_have);
|
|
assert_int_equal(AMchangeHashesSize(&msg2to1_last_sync), 0);
|
|
|
|
/* Both nodes receive messages from each other and update their
|
|
* synchronization states. */
|
|
AMfree(AMreceiveSyncMessage(test_state->doc1,
|
|
test_state->sync_state1,
|
|
msg2to1));
|
|
AMfree(msg2to1_result);
|
|
AMfree(AMreceiveSyncMessage(test_state->doc2,
|
|
test_state->sync_state2,
|
|
msg1to2));
|
|
AMfree(msg1to2_result);
|
|
|
|
/* Now both reply with their local changes that the other lacks
|
|
* (standard warning that 1% of the time this will result in a "needs"
|
|
* message). */
|
|
msg1to2_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
msg1to2 = AMresultValue(msg1to2_result).sync_message;
|
|
msg1to2_changes = AMsyncMessageChanges(msg1to2);
|
|
assert_int_equal(AMchangesSize(&msg1to2_changes), 5);
|
|
msg2to1_result = AMgenerateSyncMessage(test_state->doc2,
|
|
test_state->sync_state2);
|
|
msg2to1 = AMresultValue(msg2to1_result).sync_message;
|
|
msg2to1_changes = AMsyncMessageChanges(msg2to1);
|
|
assert_int_equal(AMchangesSize(&msg2to1_changes), 5);
|
|
|
|
/* Both should now apply the changes. */
|
|
AMfree(AMreceiveSyncMessage(test_state->doc1,
|
|
test_state->sync_state1,
|
|
msg2to1));
|
|
AMfree(msg2to1_result);
|
|
AMresult* missing_deps_result = AMgetMissingDeps(test_state->doc1, NULL);
|
|
AMchangeHashes missing_deps = AMresultValue(missing_deps_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesSize(&missing_deps), 0);
|
|
AMfree(missing_deps_result);
|
|
AMresult* map_value_result = AMmapGet(test_state->doc1, AM_ROOT, "x");
|
|
assert_int_equal(AMresultValue(map_value_result).uint, 4);
|
|
AMfree(map_value_result);
|
|
map_value_result = AMmapGet(test_state->doc1, AM_ROOT, "y");
|
|
assert_int_equal(AMresultValue(map_value_result).uint, 4);
|
|
AMfree(map_value_result);
|
|
|
|
AMfree(AMreceiveSyncMessage(test_state->doc2,
|
|
test_state->sync_state2,
|
|
msg1to2));
|
|
AMfree(msg1to2_result);
|
|
missing_deps_result = AMgetMissingDeps(test_state->doc2, NULL);
|
|
missing_deps = AMresultValue(missing_deps_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesSize(&missing_deps), 0);
|
|
AMfree(missing_deps_result);
|
|
map_value_result = AMmapGet(test_state->doc2, AM_ROOT, "x");
|
|
assert_int_equal(AMresultValue(map_value_result).uint, 4);
|
|
AMfree(map_value_result);
|
|
map_value_result = AMmapGet(test_state->doc2, AM_ROOT, "y");
|
|
assert_int_equal(AMresultValue(map_value_result).uint, 4);
|
|
AMfree(map_value_result);
|
|
|
|
/* The response acknowledges that the changes were received and sends no
|
|
* further changes. */
|
|
msg1to2_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
msg1to2 = AMresultValue(msg1to2_result).sync_message;
|
|
msg1to2_changes = AMsyncMessageChanges(msg1to2);
|
|
assert_int_equal(AMchangesSize(&msg1to2_changes), 0);
|
|
msg2to1_result = AMgenerateSyncMessage(test_state->doc2,
|
|
test_state->sync_state2);
|
|
msg2to1 = AMresultValue(msg2to1_result).sync_message;
|
|
msg2to1_changes = AMsyncMessageChanges(msg2to1);
|
|
assert_int_equal(AMchangesSize(&msg2to1_changes), 0);
|
|
|
|
/* After receiving acknowledgements their shared heads should be equal. */
|
|
AMfree(AMreceiveSyncMessage(test_state->doc1,
|
|
test_state->sync_state1,
|
|
msg2to1));
|
|
AMfree(msg2to1_result);
|
|
AMfree(AMreceiveSyncMessage(test_state->doc2,
|
|
test_state->sync_state2,
|
|
msg1to2));
|
|
AMfree(msg1to2_result);
|
|
|
|
/* They're synchronized so no more messages are required. */
|
|
msg1to2_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
assert_int_equal(AMresultValue(msg1to2_result).tag, AM_VALUE_VOID);
|
|
AMfree(msg1to2_result);
|
|
msg2to1_result = AMgenerateSyncMessage(test_state->doc2,
|
|
test_state->sync_state2);
|
|
assert_int_equal(AMresultValue(msg2to1_result).tag, AM_VALUE_VOID);
|
|
AMfree(msg2to1_result);
|
|
|
|
/* If we make one more change and start synchronizing then its "last
|
|
* sync" property should be updated. */
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", 5));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
msg1to2_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
msg1to2 = AMresultValue(msg1to2_result).sync_message;
|
|
msg1to2_haves = AMsyncMessageHaves(msg1to2);
|
|
msg1to2_have = AMsyncHavesNext(&msg1to2_haves, 1);
|
|
msg1to2_last_sync = AMsyncHaveLastSync(msg1to2_have);
|
|
AMbyteSpan msg1to2_last_sync_next = AMchangeHashesNext(&msg1to2_last_sync, 1);
|
|
assert_int_equal(msg1to2_last_sync_next.count, head1.count);
|
|
assert_memory_equal(msg1to2_last_sync_next.src, head1.src, head1.count);
|
|
msg1to2_last_sync_next = AMchangeHashesNext(&msg1to2_last_sync, 1);
|
|
assert_int_equal(msg1to2_last_sync_next.count, head2.count);
|
|
assert_memory_equal(msg1to2_last_sync_next.src, head2.src, head2.count);
|
|
AMfree(heads1_result);
|
|
AMfree(heads2_result);
|
|
AMfree(msg1to2_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, documents with data and
|
|
* it should assume sent changes were received until we hear otherwise.
|
|
*/
|
|
static void test_converged_assume_sent_changes_were_received(void **state) {
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "01234567"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "89abcdef"));
|
|
|
|
AMresult* items_result = AMmapPutObject(test_state->doc1,
|
|
AM_ROOT,
|
|
"items",
|
|
AM_OBJ_TYPE_LIST);
|
|
AMobjId const* items = AMresultValue(items_result).obj_id;
|
|
time_t const time = 0;
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
|
|
AMfree(AMlistPutStr(test_state->doc1, items, 0, true, "x"));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
AMresult* message_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
AMsyncMessage const* message = AMresultValue(message_result).sync_message;
|
|
AMchanges message_changes = AMsyncMessageChanges(message);
|
|
assert_int_equal(AMchangesSize(&message_changes), 1);
|
|
AMfree(message_result);
|
|
|
|
AMfree(AMlistPutStr(test_state->doc1, items, 1, true, "y"));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
message_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
message = AMresultValue(message_result).sync_message;
|
|
message_changes = AMsyncMessageChanges(message);
|
|
assert_int_equal(AMchangesSize(&message_changes), 1);
|
|
AMfree(message_result);
|
|
|
|
AMfree(AMlistPutStr(test_state->doc1, items, 2, true, "z"));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
message_result = AMgenerateSyncMessage(test_state->doc1,
|
|
test_state->sync_state1);
|
|
message = AMresultValue(message_result).sync_message;
|
|
message_changes = AMsyncMessageChanges(message);
|
|
assert_int_equal(AMchangesSize(&message_changes), 1);
|
|
AMfree(message_result);
|
|
|
|
AMfree(items_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with docs already in sync, documents with data and
|
|
* it should work regardless of who initiates the exchange.
|
|
*/
|
|
static void test_converged_works_regardless_of_who_initiates(void **state) {
|
|
/* Create & synchronize two nodes. */
|
|
TestState* test_state = *state;
|
|
|
|
time_t const time = 0;
|
|
for (size_t value = 0; value != 5; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
|
|
/* Modify the first node further. */
|
|
for (size_t value = 5; value != 10; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
|
|
assert_false(AMequal(test_state->doc1, test_state->doc2));
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with diverged documents and it should work without
|
|
* prior sync state.
|
|
*/
|
|
static void test_diverged_works_without_prior_sync_state(void **state) {
|
|
/* Scenario:
|
|
* ,-- c10 <-- c11 <-- c12 <-- c13 <-- c14
|
|
* c0 <-- c1 <-- c2 <-- c3 <-- c4 <-- c5 <-- c6 <-- c7 <-- c8 <-- c9 <-+
|
|
* `-- c15 <-- c16 <-- c17
|
|
* lastSync is undefined. */
|
|
|
|
/* Create two peers both with divergent commits. */
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "01234567"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "89abcdef"));
|
|
time_t const time = 0;
|
|
for (size_t value = 0; value != 10; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
|
|
for (size_t value = 10; value != 15; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
for (size_t value = 15; value != 18; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc2, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc2, NULL, &time);
|
|
}
|
|
|
|
assert_false(AMequal(test_state->doc1, test_state->doc2));
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
AMresult* heads1_result = AMgetHeads(test_state->doc1);
|
|
AMchangeHashes heads1 = AMresultValue(heads1_result).change_hashes;
|
|
AMresult* heads2_result = AMgetHeads(test_state->doc2);
|
|
AMchangeHashes heads2 = AMresultValue(heads2_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesCmp(&heads1, &heads2), 0);
|
|
AMfree(heads2_result);
|
|
AMfree(heads1_result);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with diverged documents and it should work with
|
|
* prior sync state.
|
|
*/
|
|
static void test_diverged_works_with_prior_sync_state(void **state) {
|
|
/* Scenario:
|
|
* ,-- c10 <-- c11 <-- c12 <-- c13 <-- c14
|
|
* c0 <-- c1 <-- c2 <-- c3 <-- c4 <-- c5 <-- c6 <-- c7 <-- c8 <-- c9 <-+
|
|
* `-- c15 <-- c16 <-- c17
|
|
* lastSync is c9. */
|
|
|
|
/* Create two peers both with divergent commits. */
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "01234567"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "89abcdef"));
|
|
time_t const time = 0;
|
|
for (size_t value = 0; value != 10; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
|
|
for (size_t value = 10; value != 15; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
for (size_t value = 15; value != 18; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc2, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc2, NULL, &time);
|
|
}
|
|
AMresult* encoded_result = AMsyncStateEncode(test_state->sync_state1);
|
|
AMbyteSpan encoded = AMresultValue(encoded_result).bytes;
|
|
AMresult* sync_state1_result = AMsyncStateDecode(encoded.src, encoded.count);
|
|
AMfree(encoded_result);
|
|
AMsyncState* sync_state1 = AMresultValue(sync_state1_result).sync_state;
|
|
encoded_result = AMsyncStateEncode(test_state->sync_state2);
|
|
encoded = AMresultValue(encoded_result).bytes;
|
|
AMresult* sync_state2_result = AMsyncStateDecode(encoded.src, encoded.count);
|
|
AMfree(encoded_result);
|
|
AMsyncState* sync_state2 = AMresultValue(sync_state2_result).sync_state;
|
|
|
|
assert_false(AMequal(test_state->doc1, test_state->doc2));
|
|
sync(test_state->doc1, test_state->doc2, sync_state1, sync_state2);
|
|
AMfree(sync_state2_result);
|
|
AMfree(sync_state1_result);
|
|
AMresult* heads1_result = AMgetHeads(test_state->doc1);
|
|
AMchangeHashes heads1 = AMresultValue(heads1_result).change_hashes;
|
|
AMresult* heads2_result = AMgetHeads(test_state->doc2);
|
|
AMchangeHashes heads2 = AMresultValue(heads2_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesCmp(&heads1, &heads2), 0);
|
|
AMfree(heads2_result);
|
|
AMfree(heads1_result);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with diverged documents and it should ensure
|
|
* non-empty state after synchronization.
|
|
*/
|
|
static void test_diverged_ensure_not_empty_after_sync(void **state) {
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "01234567"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "89abcdef"));
|
|
|
|
time_t const time = 0;
|
|
for (size_t value = 0; value != 3; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
|
|
AMresult* heads1_result = AMgetHeads(test_state->doc1);
|
|
AMchangeHashes heads1 = AMresultValue(heads1_result).change_hashes;
|
|
AMchangeHashes shared_heads1 = AMsyncStateSharedHeads(test_state->sync_state1);
|
|
assert_int_equal(AMchangeHashesCmp(&shared_heads1, &heads1), 0);
|
|
AMchangeHashes shared_heads2 = AMsyncStateSharedHeads(test_state->sync_state2);
|
|
assert_int_equal(AMchangeHashesCmp(&shared_heads2, &heads1), 0);
|
|
AMfree(heads1_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with diverged documents and it should
|
|
* re-synchronize after one node crashed with data loss.
|
|
*/
|
|
static void test_diverged_resync_after_node_crash_with_data_loss(void **state) {
|
|
/* Scenario:
|
|
* (r) (n2) (n1)
|
|
* c0 <-- c1 <-- c2 <-- c3 <-- c4 <-- c5 <-- c6 <-- c7 <-- c8
|
|
* n2 has changes {c0, c1, c2}, n1's lastSync is c5, and n2's lastSync
|
|
* is c2.
|
|
* We want to successfully sync (n1) with (r), even though (n1) believes
|
|
* it's talking to (n2). */
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "01234567"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "89abcdef"));
|
|
|
|
/* n1 makes three changes which we synchronize to n2. */
|
|
time_t const time = 0;
|
|
for (size_t value = 0; value != 3; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
|
|
/* Save a copy of n2 as "r" to simulate recovering from a crash. */
|
|
AMresult* r_result = AMdup(test_state->doc2);
|
|
AMdoc* r = AMresultValue(r_result).doc;
|
|
AMresult* encoded_result = AMsyncStateEncode(test_state->sync_state2);
|
|
AMbyteSpan encoded = AMresultValue(encoded_result).bytes;
|
|
AMresult* sync_state_resultr = AMsyncStateDecode(encoded.src, encoded.count);
|
|
AMfree(encoded_result);
|
|
AMsyncState* sync_stater = AMresultValue(sync_state_resultr).sync_state;
|
|
/* Synchronize another few commits. */
|
|
for (size_t value = 3; value != 6; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
/* Everyone should be on the same page here. */
|
|
AMresult* heads1_result = AMgetHeads(test_state->doc1);
|
|
AMchangeHashes heads1 = AMresultValue(heads1_result).change_hashes;
|
|
AMresult* heads2_result = AMgetHeads(test_state->doc2);
|
|
AMchangeHashes heads2 = AMresultValue(heads2_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesCmp(&heads1, &heads2), 0);
|
|
AMfree(heads2_result);
|
|
AMfree(heads1_result);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
|
|
/* Now make a few more changes and then attempt to synchronize the
|
|
* fully-up-to-date n1 with with the confused r. */
|
|
for (size_t value = 6; value != 9; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
heads1_result = AMgetHeads(test_state->doc1);
|
|
heads1 = AMresultValue(heads1_result).change_hashes;
|
|
AMresult* heads_resultr = AMgetHeads(r);
|
|
AMchangeHashes headsr = AMresultValue(heads_resultr).change_hashes;
|
|
assert_int_not_equal(AMchangeHashesCmp(&heads1, &headsr), 0);
|
|
AMfree(heads_resultr);
|
|
AMfree(heads1_result);
|
|
assert_false(AMequal(test_state->doc1, r));
|
|
AMresult* map_value_result = AMmapGet(test_state->doc1, AM_ROOT, "x");
|
|
assert_int_equal(AMresultValue(map_value_result).uint, 8);
|
|
AMfree(map_value_result);
|
|
map_value_result = AMmapGet(r, AM_ROOT, "x");
|
|
assert_int_equal(AMresultValue(map_value_result).uint, 2);
|
|
AMfree(map_value_result);
|
|
sync(test_state->doc1,
|
|
r,
|
|
test_state->sync_state1,
|
|
sync_stater);
|
|
AMfree(sync_state_resultr);
|
|
heads1_result = AMgetHeads(test_state->doc1);
|
|
heads1 = AMresultValue(heads1_result).change_hashes;
|
|
heads_resultr = AMgetHeads(r);
|
|
headsr = AMresultValue(heads_resultr).change_hashes;
|
|
assert_int_equal(AMchangeHashesCmp(&heads1, &headsr), 0);
|
|
AMfree(heads_resultr);
|
|
AMfree(heads1_result);
|
|
assert_true(AMequal(test_state->doc1, r));
|
|
AMfree(r_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with diverged documents and it should resync after
|
|
* one node experiences data loss without disconnecting.
|
|
*/
|
|
static void test_diverged_resync_after_data_loss_without_disconnection(void **state) {
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "01234567"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "89abcdef"));
|
|
|
|
/* n1 makes three changes which we synchronize to n2. */
|
|
time_t const time = 0;
|
|
for (size_t value = 0; value != 3; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
}
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
|
|
AMresult* heads1_result = AMgetHeads(test_state->doc1);
|
|
AMchangeHashes heads1 = AMresultValue(heads1_result).change_hashes;
|
|
AMresult* heads2_result = AMgetHeads(test_state->doc2);
|
|
AMchangeHashes heads2 = AMresultValue(heads2_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesCmp(&heads1, &heads2), 0);
|
|
AMfree(heads2_result);
|
|
AMfree(heads1_result);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
|
|
AMresult* doc2_after_data_loss_result = AMcreate();
|
|
AMdoc* doc2_after_data_loss = AMresultValue(doc2_after_data_loss_result).doc;
|
|
AMfree(AMsetActorHex(doc2_after_data_loss, "89abcdef"));
|
|
|
|
/* "n2" now has no data, but n1 still thinks it does. Note we don't do
|
|
* decodeSyncState(encodeSyncState(s1)) in order to simulate data loss
|
|
* without disconnecting. */
|
|
AMresult* sync_state2_after_data_loss_result = AMsyncStateInit();
|
|
AMsyncState* sync_state2_after_data_loss = AMresultValue(sync_state2_after_data_loss_result).sync_state;
|
|
sync(test_state->doc1,
|
|
doc2_after_data_loss,
|
|
test_state->sync_state1,
|
|
sync_state2_after_data_loss);
|
|
heads1_result = AMgetHeads(test_state->doc1);
|
|
heads1 = AMresultValue(heads1_result).change_hashes;
|
|
heads2_result = AMgetHeads(doc2_after_data_loss);
|
|
heads2 = AMresultValue(heads2_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesCmp(&heads1, &heads2), 0);
|
|
AMfree(heads2_result);
|
|
AMfree(heads1_result);
|
|
assert_true(AMequal(test_state->doc1, doc2_after_data_loss));
|
|
AMfree(sync_state2_after_data_loss_result);
|
|
AMfree(doc2_after_data_loss_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with diverged documents and it should handle
|
|
* changes concurrent to the last sync heads.
|
|
*/
|
|
static void test_diverged_handles_concurrent_changes(void **state) {
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "01234567"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "89abcdef"));
|
|
AMresult* doc3_result = AMcreate();
|
|
AMdoc* doc3 = AMresultValue(doc3_result).doc;
|
|
AMfree(AMsetActorHex(doc3, "fedcba98"));
|
|
AMsyncState* sync_state12 = test_state->sync_state1;
|
|
AMsyncState* sync_state21 = test_state->sync_state2;
|
|
AMresult* sync_state23_result = AMsyncStateInit();
|
|
AMsyncState* sync_state23 = AMresultValue(sync_state23_result).sync_state;
|
|
AMresult* sync_state32_result = AMsyncStateInit();
|
|
AMsyncState* sync_state32 = AMresultValue(sync_state32_result).sync_state;
|
|
|
|
/* Change 1 is known to all three nodes. */
|
|
time_t const time = 0;
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", 1));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
sync(test_state->doc1, test_state->doc2, sync_state12, sync_state21);
|
|
sync(test_state->doc2, doc3, sync_state23, sync_state32);
|
|
|
|
/* Change 2 is known to n1 and n2. */
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", 2));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
sync(test_state->doc1, test_state->doc2, sync_state12, sync_state21);
|
|
|
|
/* Each of the three nodes makes one change (changes 3, 4, 5). */
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", 3));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
AMfree(AMmapPutUint(test_state->doc2, AM_ROOT, "x", 4));
|
|
AMcommit(test_state->doc2, NULL, &time);
|
|
AMfree(AMmapPutUint(doc3, AM_ROOT, "x", 5));
|
|
AMcommit(doc3, NULL, &time);
|
|
|
|
/* Apply n3's latest change to n2. */
|
|
AMresult* changes_result = AMgetLastLocalChange(doc3);
|
|
AMchanges changes = AMresultValue(changes_result).changes;
|
|
AMfree(AMapplyChanges(test_state->doc2, &changes));
|
|
AMfree(changes_result);
|
|
|
|
/* Now sync n1 and n2. n3's change is concurrent to n1 and n2's last sync
|
|
* heads. */
|
|
sync(test_state->doc1, test_state->doc2, sync_state12, sync_state21);
|
|
AMresult* heads1_result = AMgetHeads(test_state->doc1);
|
|
AMchangeHashes heads1 = AMresultValue(heads1_result).change_hashes;
|
|
AMresult* heads2_result = AMgetHeads(test_state->doc2);
|
|
AMchangeHashes heads2 = AMresultValue(heads2_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesCmp(&heads1, &heads2), 0);
|
|
AMfree(heads2_result);
|
|
AMfree(heads1_result);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
|
|
AMfree(sync_state32_result);
|
|
AMfree(sync_state23_result);
|
|
AMfree(doc3_result);
|
|
}
|
|
|
|
/**
|
|
* \brief Data sync protocol with diverged documents and it should handle
|
|
* histories with lots of branching and merging.
|
|
*/
|
|
static void test_diverged_handles_histories_of_branching_and_merging(void **state) {
|
|
TestState* test_state = *state;
|
|
AMfree(AMsetActorHex(test_state->doc1, "01234567"));
|
|
AMfree(AMsetActorHex(test_state->doc2, "89abcdef"));
|
|
AMresult* doc3_result = AMcreate();
|
|
AMdoc* doc3 = AMresultValue(doc3_result).doc;
|
|
AMfree(AMsetActorHex(doc3, "fedcba98"));
|
|
time_t const time = 0;
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "x", 0));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
AMresult* changes_result = AMgetLastLocalChange(test_state->doc1);
|
|
AMchanges changes = AMresultValue(changes_result).changes;
|
|
AMfree(AMapplyChanges(test_state->doc2, &changes));
|
|
AMfree(AMapplyChanges(doc3, &changes));
|
|
AMfree(changes_result);
|
|
AMfree(AMmapPutUint(doc3, AM_ROOT, "x", 1));
|
|
AMcommit(doc3, NULL, &time);
|
|
|
|
/* - n1c1 <------ n1c2 <------ n1c3 <-- etc. <-- n1c20 <------ n1c21
|
|
* / \/ \/ \/
|
|
* / /\ /\ /\
|
|
* c0 <---- n2c1 <------ n2c2 <------ n2c3 <-- etc. <-- n2c20 <------ n2c21
|
|
* \ /
|
|
* ---------------------------------------------- n3c1 <-----
|
|
*/
|
|
for (size_t value = 1; value != 20; ++value) {
|
|
AMfree(AMmapPutUint(test_state->doc1, AM_ROOT, "n1", value));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
AMfree(AMmapPutUint(test_state->doc2, AM_ROOT, "n2", value));
|
|
AMcommit(test_state->doc2, NULL, &time);
|
|
AMresult* changes1_result = AMgetLastLocalChange(test_state->doc1);
|
|
AMchanges changes1 = AMresultValue(changes1_result).changes;
|
|
AMresult* changes2_result = AMgetLastLocalChange(test_state->doc2);
|
|
AMchanges changes2 = AMresultValue(changes2_result).changes;
|
|
AMfree(AMapplyChanges(test_state->doc1, &changes2));
|
|
AMfree(changes2_result);
|
|
AMfree(AMapplyChanges(test_state->doc2, &changes1));
|
|
AMfree(changes1_result);
|
|
}
|
|
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
|
|
/* Having n3's last change concurrent to the last sync heads forces us into
|
|
* the slower code path. */
|
|
AMresult* changes3_result = AMgetLastLocalChange(doc3);
|
|
AMchanges changes3 = AMresultValue(changes3_result).changes;
|
|
AMfree(AMapplyChanges(test_state->doc2, &changes3));
|
|
AMfree(changes3_result);
|
|
AMfree(AMmapPutStr(test_state->doc1, AM_ROOT, "n1", "final"));
|
|
AMcommit(test_state->doc1, NULL, &time);
|
|
AMfree(AMmapPutStr(test_state->doc2, AM_ROOT, "n2", "final"));
|
|
AMcommit(test_state->doc2, NULL, &time);
|
|
|
|
sync(test_state->doc1,
|
|
test_state->doc2,
|
|
test_state->sync_state1,
|
|
test_state->sync_state2);
|
|
AMresult* heads1_result = AMgetHeads(test_state->doc1);
|
|
AMchangeHashes heads1 = AMresultValue(heads1_result).change_hashes;
|
|
AMresult* heads2_result = AMgetHeads(test_state->doc2);
|
|
AMchangeHashes heads2 = AMresultValue(heads2_result).change_hashes;
|
|
assert_int_equal(AMchangeHashesCmp(&heads1, &heads2), 0);
|
|
AMfree(heads2_result);
|
|
AMfree(heads1_result);
|
|
assert_true(AMequal(test_state->doc1, test_state->doc2));
|
|
|
|
AMfree(doc3_result);
|
|
}
|
|
|
|
int run_sync_tests(void) {
|
|
const struct CMUnitTest tests[] = {
|
|
cmocka_unit_test_setup_teardown(test_converged_empty_local_doc_reply_no_local_data, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_converged_empty_local_doc_no_reply, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_converged_equal_heads_no_reply, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_converged_offer_all_changes_from_nothing, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_converged_sync_peers_with_uneven_commits, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_converged_works_with_prior_sync_state, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_converged_no_message_once_synced, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_converged_allow_simultaneous_messages, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_converged_assume_sent_changes_were_received, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_converged_works_regardless_of_who_initiates, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_diverged_works_without_prior_sync_state, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_diverged_works_with_prior_sync_state, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_diverged_ensure_not_empty_after_sync, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_diverged_resync_after_node_crash_with_data_loss, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_diverged_resync_after_data_loss_without_disconnection, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_diverged_handles_concurrent_changes, setup, teardown),
|
|
cmocka_unit_test_setup_teardown(test_diverged_handles_histories_of_branching_and_merging, setup, teardown),
|
|
};
|
|
|
|
return cmocka_run_group_tests(tests, NULL, NULL);
|
|
}
|