Compare commits
2 commits
Author | SHA1 | Date | |
---|---|---|---|
|
0975c5614b | ||
|
b7415d18eb |
20 changed files with 514 additions and 0 deletions
|
@ -6,6 +6,7 @@ members = [
|
|||
"automerge-test",
|
||||
"automerge-wasm",
|
||||
"edit-trace",
|
||||
"badmessage",
|
||||
]
|
||||
resolver = "2"
|
||||
|
||||
|
|
2
rust/automerge-wasm/examples/redis-sync/.eslintignore
Normal file
2
rust/automerge-wasm/examples/redis-sync/.eslintignore
Normal file
|
@ -0,0 +1,2 @@
|
|||
dist
|
||||
examples
|
11
rust/automerge-wasm/examples/redis-sync/.eslintrc.cjs
Normal file
11
rust/automerge-wasm/examples/redis-sync/.eslintrc.cjs
Normal file
|
@ -0,0 +1,11 @@
|
|||
module.exports = {
|
||||
root: true,
|
||||
parser: '@typescript-eslint/parser',
|
||||
plugins: [
|
||||
'@typescript-eslint',
|
||||
],
|
||||
extends: [
|
||||
'eslint:recommended',
|
||||
'plugin:@typescript-eslint/recommended',
|
||||
],
|
||||
};
|
2
rust/automerge-wasm/examples/redis-sync/.gitignore
vendored
Normal file
2
rust/automerge-wasm/examples/redis-sync/.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
/dist
|
||||
node_modules
|
36
rust/automerge-wasm/examples/redis-sync/README.md
Normal file
36
rust/automerge-wasm/examples/redis-sync/README.md
Normal file
|
@ -0,0 +1,36 @@
|
|||
|
||||
## Quick Example of Syncing Automerge Docs via Redis
|
||||
|
||||
The good:
|
||||
1. store the document in redis as both a compacted document save and as individual changes
|
||||
2. able to load the the saved document, and then calculate which changes are missing in a single pass
|
||||
3. uses redis (RPUSH) to get global change ordering
|
||||
4. wake all clients on new changes with redis (PUBLISH, SUBSCRIBE)
|
||||
5. elect (via SETNX) a single leader to saveIncremental() every 10 seconds
|
||||
6. said node also does a full doc save when incremental bytes are 10x the base save
|
||||
7. detects disconnection of the leader (via EXPIRE) and elects a new leader
|
||||
8. detects when a document has not been created and elects a single node to create it
|
||||
|
||||
The bad:
|
||||
1. redis can do binary data - i couldnt figure it out - i hex encode/decode everything
|
||||
2. there's probably a few race conditions i need to find
|
||||
3. my typescript is ass
|
||||
4. not much in the way of error handling
|
||||
5. should really be using applyPatches() but im not
|
||||
6. the api is a little funny
|
||||
|
||||
How to use:
|
||||
|
||||
```
|
||||
$ yarn
|
||||
$ yarn build
|
||||
```
|
||||
|
||||
Then run
|
||||
|
||||
```
|
||||
$ node client.js $DOCID
|
||||
```
|
||||
|
||||
In two or three different windows and watch each client editing the same document.
|
||||
|
43
rust/automerge-wasm/examples/redis-sync/client.js
Normal file
43
rust/automerge-wasm/examples/redis-sync/client.js
Normal file
|
@ -0,0 +1,43 @@
|
|||
let { RedisSync } = require('.')
|
||||
let clientId = `C${ rand(1000) }`
|
||||
let docId = process.argv[2]
|
||||
|
||||
if (typeof docId !== 'string' || docId.length < 2 || docId.length > 20) {
|
||||
console.log("usage: node client.js DOCID")
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
console.log("clientID is", clientId)
|
||||
console.log("docID is", docId)
|
||||
|
||||
let init = (doc) => {
|
||||
doc.put("/","title", "token tracker")
|
||||
doc.putObject("/","tokens", [])
|
||||
}
|
||||
|
||||
let sync = new RedisSync({ redis: "redis://", docId, clientId, init, update });
|
||||
sync.connect()
|
||||
|
||||
function rand(max) {
|
||||
return Math.floor(Math.random() * max)
|
||||
}
|
||||
|
||||
function update() {
|
||||
console.log("DOC STATE", sync.toJS())
|
||||
}
|
||||
|
||||
function tweak() {
|
||||
if (rand(3) == 0) {
|
||||
sync.change((doc) => {
|
||||
let len = doc.length("/tokens")
|
||||
if (len + rand(10) > 20) {
|
||||
doc.delete("/tokens", rand(len))
|
||||
} else {
|
||||
doc.insert("/tokens", rand(len), rand(255))
|
||||
}
|
||||
doc.put("/", "winner", clientId)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
setInterval(tweak, 3000)
|
6
rust/automerge-wasm/examples/redis-sync/config/cjs.json
Normal file
6
rust/automerge-wasm/examples/redis-sync/config/cjs.json
Normal file
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"extends": "../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "../dist/cjs"
|
||||
}
|
||||
}
|
8
rust/automerge-wasm/examples/redis-sync/config/mjs.json
Normal file
8
rust/automerge-wasm/examples/redis-sync/config/mjs.json
Normal file
|
@ -0,0 +1,8 @@
|
|||
{
|
||||
"extends": "../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"target": "es2017",
|
||||
"module": "esnext",
|
||||
"outDir": "../dist/mjs"
|
||||
}
|
||||
}
|
33
rust/automerge-wasm/examples/redis-sync/package.json
Normal file
33
rust/automerge-wasm/examples/redis-sync/package.json
Normal file
|
@ -0,0 +1,33 @@
|
|||
{
|
||||
"name": "redis-sync",
|
||||
"version": "0.0.1",
|
||||
"description": "",
|
||||
"types": "./dist/index.d.ts",
|
||||
"module": "./dist/mjs/index.js",
|
||||
"main": "./dist/cjs/index.js",
|
||||
"scripts": {
|
||||
"lint": "eslint src",
|
||||
"build": "tsc -p config/mjs.json && tsc -p config/cjs.json && tsc --emitDeclarationOnly",
|
||||
"test": "ts-mocha test/*.ts",
|
||||
"watch-docs": "typedoc src/index.ts --watch --readme typedoc-readme.md"
|
||||
},
|
||||
"author": "",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@automerge/automerge-wasm": "^0.1.19",
|
||||
"redis": "^4.5.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/expect": "^24.3.0",
|
||||
"@types/mocha": "^9.1.1",
|
||||
"@types/uuid": "^8.3.4",
|
||||
"@typescript-eslint/eslint-plugin": "^5.25.0",
|
||||
"@typescript-eslint/parser": "^5.25.0",
|
||||
"eslint": "^8.15.0",
|
||||
"mocha": "^10.0.0",
|
||||
"ts-mocha": "^10.0.0",
|
||||
"ts-node": "^10.9.1",
|
||||
"typedoc": "^0.23.16",
|
||||
"typescript": "^4.6.4"
|
||||
}
|
||||
}
|
210
rust/automerge-wasm/examples/redis-sync/src/index.ts
Normal file
210
rust/automerge-wasm/examples/redis-sync/src/index.ts
Normal file
|
@ -0,0 +1,210 @@
|
|||
import { createClient } from 'redis';
|
||||
import type { RedisClientType, RedisClientOptions } from 'redis'
|
||||
import { Automerge, create, load } from "@automerge/automerge-wasm"
|
||||
|
||||
export type RedisSyncOptions<T> = {
|
||||
redis: string,
|
||||
docId: string,
|
||||
clientId: string,
|
||||
init: (doc: Automerge) => void,
|
||||
update?: (doc: any) => void,
|
||||
}
|
||||
|
||||
export class RedisSync<T> {
|
||||
client: RedisClientType
|
||||
subscriber: RedisClientType
|
||||
doc: null | Automerge
|
||||
docId: string
|
||||
clientId: string
|
||||
cursor: number
|
||||
leader: string | null
|
||||
init: (doc: Automerge) => void
|
||||
update?: (doc: any) => void
|
||||
interval: ReturnType<typeof setInterval>
|
||||
save_bytes: number
|
||||
incremental_save_bytes: number
|
||||
|
||||
constructor(options: RedisSyncOptions<T>) {
|
||||
this.doc = null
|
||||
this.docId = options.docId
|
||||
this.client = createClient({ url: options.redis })
|
||||
this.clientId = options.clientId
|
||||
this.cursor = 0
|
||||
this.init = options.init
|
||||
this.update = options.update
|
||||
this.interval = setInterval(() => this.tick(), 10 * 1000) // force a save every 10 seconds
|
||||
this.leader = null
|
||||
this.save_bytes = 0
|
||||
this.incremental_save_bytes = 0
|
||||
|
||||
this.client.on('error', (err) => console.log('Redis Client Error', err));
|
||||
this.subscriber = this.client.duplicate()
|
||||
}
|
||||
|
||||
iAmLeader() : boolean {
|
||||
return this.leader == this.clientId
|
||||
}
|
||||
|
||||
get _saved_() : string { return `${this.docId}:saved` }
|
||||
get _saved_cursor_() : string { return `${this.docId}:saved:cursor` }
|
||||
get _changes_() : string { return `${this.docId}:changes` }
|
||||
get _leader_() : string { return `${this.docId}:leader` }
|
||||
get _notify_() : string { return `${this.docId}:notify` }
|
||||
|
||||
async change(f: (doc: Automerge) => void) {
|
||||
if (this.doc === null) {
|
||||
throw new RangeError("cannot call change - doc not initalized")
|
||||
}
|
||||
let heads = this.doc.getHeads()
|
||||
f(this.doc)
|
||||
let changes = this.doc.getChanges(heads)
|
||||
if (changes.length > 0) {
|
||||
for (let i in changes) {
|
||||
let change_str = Buffer.from(changes[i].buffer).toString("hex");
|
||||
await this.client.rPush(this._changes_, change_str)
|
||||
}
|
||||
await this.notify_peers()
|
||||
this.notify_local()
|
||||
}
|
||||
}
|
||||
|
||||
async tick() {
|
||||
let itsMe = await this.determineLeader()
|
||||
if (this.doc && itsMe) {
|
||||
if (this.incremental_save_bytes > this.save_bytes * 10) {
|
||||
console.log(" ::as leader - doing a full save")
|
||||
let save_all = this.doc.save()
|
||||
this.incremental_save_bytes += save_all.length
|
||||
let save_all_str = Buffer.from(save_all.buffer).toString("hex");
|
||||
await this.client.multi()
|
||||
.rPush(this._saved_, save_all_str)
|
||||
.set(this._saved_cursor_,this.cursor)
|
||||
.exec()
|
||||
} else {
|
||||
let next_chunk = this.doc.saveIncremental()
|
||||
if (next_chunk && next_chunk.length > 0) {
|
||||
console.log(" ::as leader - doing an incremental save")
|
||||
this.incremental_save_bytes += next_chunk.length
|
||||
let next_chunk_str = Buffer.from(next_chunk.buffer).toString("hex");
|
||||
await this.client.multi()
|
||||
.rPush(this._saved_, next_chunk_str)
|
||||
.set(this._saved_cursor_,this.cursor)
|
||||
.exec()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
toJS() : T | null {
|
||||
if (this.doc) {
|
||||
return this.doc.materialize("/") as T
|
||||
} else {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
async loadDocumentFromRedis() {
|
||||
console.log(" ::Loading document...");
|
||||
let saved = await this.client.lRange(this._saved_,0,-1)
|
||||
let cursor = await this.client.get(this._saved_cursor_)
|
||||
if (Array.isArray(saved) && typeof cursor === 'string') {
|
||||
let first_chunk_str = saved.shift();
|
||||
if (first_chunk_str) {
|
||||
let first_chunk = Buffer.from(first_chunk_str,'hex')
|
||||
let doc = load(first_chunk)
|
||||
this.save_bytes = first_chunk.length;
|
||||
for (let i in saved) {
|
||||
let chunk = Buffer.from(saved[i],'hex')
|
||||
doc.loadIncremental(chunk)
|
||||
this.incremental_save_bytes += chunk.length
|
||||
}
|
||||
this.cursor = parseInt(cursor);
|
||||
await this.fastForward(doc)
|
||||
this.doc = doc
|
||||
this.notify_local()
|
||||
console.log(` ::done! cursor=${cursor}`);
|
||||
return
|
||||
}
|
||||
}
|
||||
console.log(" ::no document found in redis");
|
||||
}
|
||||
|
||||
async fastForward(doc: Automerge) {
|
||||
if (doc !== null) {
|
||||
const changes = await this.client.lRange(this._changes_,this.cursor,-1)
|
||||
if (Array.isArray(changes)) {
|
||||
for (let i in changes) {
|
||||
const change = Buffer.from(changes[i],'hex')
|
||||
doc.loadIncremental(change)
|
||||
this.cursor += 1
|
||||
this.notify_local()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async handleMessage(message:string, channel: string) {
|
||||
if (message !== this.clientId && this.doc) {
|
||||
await this.fastForward(this.doc)
|
||||
}
|
||||
}
|
||||
|
||||
async connect() {
|
||||
await this.client.connect()
|
||||
await this.subscriber.connect()
|
||||
await this.subscriber.pSubscribe(this._notify_, (m,c) => this.handleMessage(m,c))
|
||||
await this.loadDocumentFromRedis()
|
||||
|
||||
const itsMe = await this.determineLeader()
|
||||
|
||||
if (itsMe && this.doc === null) {
|
||||
await this.resetDocumentState()
|
||||
}
|
||||
}
|
||||
|
||||
async resetDocumentState() {
|
||||
console.log(" ::resetting document state")
|
||||
let doc = create()
|
||||
this.init(doc)
|
||||
let saved = doc.save()
|
||||
if (!saved || saved.length === 0) {
|
||||
throw new RangeError("initalized document is blank");
|
||||
}
|
||||
this.doc = doc;
|
||||
this.save_bytes = saved.length;
|
||||
this.incremental_save_bytes = 0;
|
||||
let saved_str = Buffer.from(saved.buffer).toString("hex");
|
||||
await this.client.del(this._changes_)
|
||||
await this.client.del(this._saved_)
|
||||
await this.client.rPush(this._saved_, saved_str)
|
||||
await this.client.set(this._saved_cursor_,0)
|
||||
this.notify_local()
|
||||
}
|
||||
|
||||
async determineLeader() : Promise<boolean> {
|
||||
await this.client.setNX(this._leader_, this.clientId)
|
||||
this.leader = await this.client.get(this._leader_)
|
||||
const itsMe = this.leader == this.clientId
|
||||
if (itsMe) {
|
||||
this.client.expire(this._leader_, 60)
|
||||
}
|
||||
return itsMe
|
||||
}
|
||||
|
||||
async notify_peers() {
|
||||
await this.client.publish(this._notify_, this.clientId)
|
||||
}
|
||||
|
||||
notify_local() {
|
||||
if (this.doc && this.update) {
|
||||
this.update(this.toJS())
|
||||
}
|
||||
}
|
||||
|
||||
async disconnect() {
|
||||
await this.tick() // save any data
|
||||
clearInterval(this.interval)
|
||||
await this.client.disconnect();
|
||||
await this.subscriber.disconnect();
|
||||
}
|
||||
}
|
23
rust/automerge-wasm/examples/redis-sync/test/test.ts
Normal file
23
rust/automerge-wasm/examples/redis-sync/test/test.ts
Normal file
|
@ -0,0 +1,23 @@
|
|||
import * as assert from 'assert'
|
||||
import { RedisSync } from "../src"
|
||||
import { Automerge, create } from "@automerge/automerge-wasm"
|
||||
|
||||
describe('redis-sync', () => {
|
||||
describe('basics', () => {
|
||||
it('should be able to connect and disconnect', async () => {
|
||||
let doc = create();
|
||||
let sync = new RedisSync({
|
||||
redis: "redis://",
|
||||
docId: "DOC124",
|
||||
clientId: "client01",
|
||||
init: (doc) => {
|
||||
console.log("DOC",doc);
|
||||
doc.put("/","hello","world")
|
||||
}
|
||||
});
|
||||
await sync.connect();
|
||||
await new Promise(resolve => setTimeout(resolve, 1000))
|
||||
//await sync.disconnect();
|
||||
})
|
||||
})
|
||||
})
|
22
rust/automerge-wasm/examples/redis-sync/tsconfig.json
Normal file
22
rust/automerge-wasm/examples/redis-sync/tsconfig.json
Normal file
|
@ -0,0 +1,22 @@
|
|||
{
|
||||
"compilerOptions": {
|
||||
"target": "es2016",
|
||||
"sourceMap": false,
|
||||
"declaration": true,
|
||||
"resolveJsonModule": true,
|
||||
"module": "commonjs",
|
||||
"moduleResolution": "node",
|
||||
"noImplicitAny": false,
|
||||
"allowSyntheticDefaultImports": true,
|
||||
"forceConsistentCasingInFileNames": true,
|
||||
"strict": true,
|
||||
"noFallthroughCasesInSwitch": true,
|
||||
"skipLibCheck": false,
|
||||
"outDir": "./dist"
|
||||
},
|
||||
"include": [ "src/**/*" ],
|
||||
"exclude": [
|
||||
"./dist/**/*",
|
||||
"./node_modules"
|
||||
]
|
||||
}
|
3
rust/automerge-wasm/examples/redis-sync/tslint.json
Normal file
3
rust/automerge-wasm/examples/redis-sync/tslint.json
Normal file
|
@ -0,0 +1,3 @@
|
|||
{
|
||||
"extends": "tslint:recommended"
|
||||
}
|
6
rust/badmessage/.gitignore
vendored
Normal file
6
rust/badmessage/.gitignore
vendored
Normal file
|
@ -0,0 +1,6 @@
|
|||
/target
|
||||
Cargo.lock
|
||||
node_modules
|
||||
yarn.lock
|
||||
flamegraph.svg
|
||||
/prof
|
18
rust/badmessage/Cargo.toml
Normal file
18
rust/badmessage/Cargo.toml
Normal file
|
@ -0,0 +1,18 @@
|
|||
[package]
|
||||
name = "badmessage"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "MIT"
|
||||
|
||||
[dependencies]
|
||||
automerge = { path = "../automerge" }
|
||||
criterion = "0.3.5"
|
||||
json = "0.12.4"
|
||||
rand = "^0.8"
|
||||
|
||||
|
||||
[[bin]]
|
||||
name = "badmessage"
|
||||
doc = false
|
||||
bench = false
|
||||
|
26
rust/badmessage/automerge-js.js
Normal file
26
rust/badmessage/automerge-js.js
Normal file
|
@ -0,0 +1,26 @@
|
|||
// Apply the paper editing trace to an Automerge.Text object, one char at a time
|
||||
const Automerge = require('../../javascript')
|
||||
|
||||
const fs = require('fs');
|
||||
|
||||
const start = new Date()
|
||||
|
||||
let contents = fs.readFileSync("badmessage");
|
||||
let doc = Automerge.init();
|
||||
let state = Automerge.initSyncState();
|
||||
[doc,state] = Automerge.receiveSyncMessage(doc, state, contents);
|
||||
|
||||
console.log(`doc.receiveSyncMessage in ${new Date() - start} ms`)
|
||||
|
||||
let t_time = new Date()
|
||||
let saved = Automerge.save(doc);
|
||||
console.log(`doc.save in ${new Date() - t_time} ms`)
|
||||
|
||||
t_time = new Date()
|
||||
Automerge.load(saved)
|
||||
console.log(`doc.load in ${new Date() - t_time} ms`)
|
||||
|
||||
t_time = new Date()
|
||||
let doc2 = Automerge.init()
|
||||
doc2 = Automerge.loadIncremental(doc2,saved)
|
||||
console.log(`doc.loadIncremental in ${new Date() - t_time} ms`)
|
26
rust/badmessage/automerge-wasm.js
Normal file
26
rust/badmessage/automerge-wasm.js
Normal file
|
@ -0,0 +1,26 @@
|
|||
const Automerge = require('../automerge-wasm')
|
||||
const fs = require('fs');
|
||||
|
||||
|
||||
let contents = fs.readFileSync("badmessage");
|
||||
let doc = Automerge.create();
|
||||
let state = Automerge.initSyncState();
|
||||
|
||||
let t_time = new Date()
|
||||
|
||||
doc.receiveSyncMessage(state,contents);
|
||||
|
||||
console.log(`doc.receiveSyncMessage in ${new Date() - t_time} ms`)
|
||||
|
||||
t_time = new Date()
|
||||
let saved = doc.save()
|
||||
console.log(`doc.save in ${new Date() - t_time} ms`)
|
||||
|
||||
t_time = new Date()
|
||||
Automerge.load(saved)
|
||||
console.log(`doc.load in ${new Date() - t_time} ms`)
|
||||
|
||||
t_time = new Date()
|
||||
let doc2 = Automerge.create()
|
||||
doc2.loadIncremental(saved)
|
||||
console.log(`doc.loadIncremental in ${new Date() - t_time} ms`)
|
BIN
rust/badmessage/badmessage
Executable file
BIN
rust/badmessage/badmessage
Executable file
Binary file not shown.
13
rust/badmessage/package.json
Normal file
13
rust/badmessage/package.json
Normal file
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"name": "badmessage",
|
||||
"version": "1.0.0",
|
||||
"main": "wasm-text.js",
|
||||
"license": "MIT",
|
||||
"scripts": {
|
||||
"wasm": "0x -D prof automerge-wasm.js",
|
||||
"js": "0x -D prof automerge-js.js"
|
||||
},
|
||||
"devDependencies": {
|
||||
"0x": "^5.4.1"
|
||||
}
|
||||
}
|
25
rust/badmessage/src/main.rs
Normal file
25
rust/badmessage/src/main.rs
Normal file
|
@ -0,0 +1,25 @@
|
|||
use automerge::sync;
|
||||
use automerge::{Automerge, AutomergeError};
|
||||
use std::fs;
|
||||
use std::time::Instant;
|
||||
|
||||
fn main() -> Result<(), AutomergeError> {
|
||||
let contents = fs::read("badmessage").expect("cant read badmessage file");
|
||||
let mut doc = Automerge::new();
|
||||
let mut state = sync::State::new();
|
||||
let now = Instant::now();
|
||||
// decode and receive happen at the same time in wasm so lets keep it apples to apples
|
||||
let message = sync::Message::decode(contents.as_slice()).expect("cant decode message");
|
||||
doc.receive_sync_message(&mut state, message).unwrap();
|
||||
println!("decode/receive in {} ms", now.elapsed().as_millis());
|
||||
let now = Instant::now();
|
||||
let saved = doc.save();
|
||||
println!("save in {} ms", now.elapsed().as_millis());
|
||||
let now = Instant::now();
|
||||
let _ = Automerge::load(&saved).unwrap();
|
||||
println!("load in {} ms", now.elapsed().as_millis());
|
||||
let mut doc2 = Automerge::new();
|
||||
doc2.load_incremental(saved.as_slice()).unwrap();
|
||||
println!("load_incremental in {} ms", now.elapsed().as_millis());
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in a new issue