diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 938100cf..9989b96c 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -6,6 +6,7 @@ members = [ "automerge-test", "automerge-wasm", "edit-trace", + "badmessage", ] resolver = "2" diff --git a/rust/automerge-wasm/examples/redis-sync/.eslintignore b/rust/automerge-wasm/examples/redis-sync/.eslintignore new file mode 100644 index 00000000..4d6880d3 --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/.eslintignore @@ -0,0 +1,2 @@ +dist +examples diff --git a/rust/automerge-wasm/examples/redis-sync/.eslintrc.cjs b/rust/automerge-wasm/examples/redis-sync/.eslintrc.cjs new file mode 100644 index 00000000..80e08d55 --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/.eslintrc.cjs @@ -0,0 +1,11 @@ +module.exports = { + root: true, + parser: '@typescript-eslint/parser', + plugins: [ + '@typescript-eslint', + ], + extends: [ + 'eslint:recommended', + 'plugin:@typescript-eslint/recommended', + ], +}; diff --git a/rust/automerge-wasm/examples/redis-sync/.gitignore b/rust/automerge-wasm/examples/redis-sync/.gitignore new file mode 100644 index 00000000..793cb004 --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/.gitignore @@ -0,0 +1,2 @@ +/dist +node_modules diff --git a/rust/automerge-wasm/examples/redis-sync/README.md b/rust/automerge-wasm/examples/redis-sync/README.md new file mode 100644 index 00000000..d89cc00e --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/README.md @@ -0,0 +1,36 @@ + +## Quick Example of Syncing Automerge Docs via Redis + +The good: + 1. store the document in redis as both a compacted document save and as individual changes + 2. able to load the the saved document, and then calculate which changes are missing in a single pass + 3. uses redis (RPUSH) to get global change ordering + 4. wake all clients on new changes with redis (PUBLISH, SUBSCRIBE) + 5. elect (via SETNX) a single leader to saveIncremental() every 10 seconds + 6. said node also does a full doc save when incremental bytes are 10x the base save + 7. detects disconnection of the leader (via EXPIRE) and elects a new leader + 8. detects when a document has not been created and elects a single node to create it + +The bad: + 1. redis can do binary data - i couldnt figure it out - i hex encode/decode everything + 2. there's probably a few race conditions i need to find + 3. my typescript is ass + 4. not much in the way of error handling + 5. should really be using applyPatches() but im not + 6. the api is a little funny + +How to use: + +``` +$ yarn +$ yarn build +``` + +Then run + +``` +$ node client.js $DOCID +``` + +In two or three different windows and watch each client editing the same document. + diff --git a/rust/automerge-wasm/examples/redis-sync/client.js b/rust/automerge-wasm/examples/redis-sync/client.js new file mode 100644 index 00000000..33f4219b --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/client.js @@ -0,0 +1,43 @@ +let { RedisSync } = require('.') +let clientId = `C${ rand(1000) }` +let docId = process.argv[2] + +if (typeof docId !== 'string' || docId.length < 2 || docId.length > 20) { + console.log("usage: node client.js DOCID") + process.exit(1) +} + +console.log("clientID is", clientId) +console.log("docID is", docId) + +let init = (doc) => { + doc.put("/","title", "token tracker") + doc.putObject("/","tokens", []) +} + +let sync = new RedisSync({ redis: "redis://", docId, clientId, init, update }); +sync.connect() + +function rand(max) { + return Math.floor(Math.random() * max) +} + +function update() { + console.log("DOC STATE", sync.toJS()) +} + +function tweak() { + if (rand(3) == 0) { + sync.change((doc) => { + let len = doc.length("/tokens") + if (len + rand(10) > 20) { + doc.delete("/tokens", rand(len)) + } else { + doc.insert("/tokens", rand(len), rand(255)) + } + doc.put("/", "winner", clientId) + }) + } +} + +setInterval(tweak, 3000) diff --git a/rust/automerge-wasm/examples/redis-sync/config/cjs.json b/rust/automerge-wasm/examples/redis-sync/config/cjs.json new file mode 100644 index 00000000..d7f8c63f --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/config/cjs.json @@ -0,0 +1,6 @@ +{ + "extends": "../tsconfig.json", + "compilerOptions": { + "outDir": "../dist/cjs" + } +} diff --git a/rust/automerge-wasm/examples/redis-sync/config/mjs.json b/rust/automerge-wasm/examples/redis-sync/config/mjs.json new file mode 100644 index 00000000..d734434e --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/config/mjs.json @@ -0,0 +1,8 @@ +{ + "extends": "../tsconfig.json", + "compilerOptions": { + "target": "es2017", + "module": "esnext", + "outDir": "../dist/mjs" + } +} diff --git a/rust/automerge-wasm/examples/redis-sync/package.json b/rust/automerge-wasm/examples/redis-sync/package.json new file mode 100644 index 00000000..ee01e64a --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/package.json @@ -0,0 +1,33 @@ +{ + "name": "redis-sync", + "version": "0.0.1", + "description": "", + "types": "./dist/index.d.ts", + "module": "./dist/mjs/index.js", + "main": "./dist/cjs/index.js", + "scripts": { + "lint": "eslint src", + "build": "tsc -p config/mjs.json && tsc -p config/cjs.json && tsc --emitDeclarationOnly", + "test": "ts-mocha test/*.ts", + "watch-docs": "typedoc src/index.ts --watch --readme typedoc-readme.md" + }, + "author": "", + "license": "ISC", + "dependencies": { + "@automerge/automerge-wasm": "^0.1.19", + "redis": "^4.5.1" + }, + "devDependencies": { + "@types/expect": "^24.3.0", + "@types/mocha": "^9.1.1", + "@types/uuid": "^8.3.4", + "@typescript-eslint/eslint-plugin": "^5.25.0", + "@typescript-eslint/parser": "^5.25.0", + "eslint": "^8.15.0", + "mocha": "^10.0.0", + "ts-mocha": "^10.0.0", + "ts-node": "^10.9.1", + "typedoc": "^0.23.16", + "typescript": "^4.6.4" + } +} diff --git a/rust/automerge-wasm/examples/redis-sync/src/index.ts b/rust/automerge-wasm/examples/redis-sync/src/index.ts new file mode 100644 index 00000000..5f302f4b --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/src/index.ts @@ -0,0 +1,210 @@ +import { createClient } from 'redis'; +import type { RedisClientType, RedisClientOptions } from 'redis' +import { Automerge, create, load } from "@automerge/automerge-wasm" + +export type RedisSyncOptions = { + redis: string, + docId: string, + clientId: string, + init: (doc: Automerge) => void, + update?: (doc: any) => void, +} + +export class RedisSync { + client: RedisClientType + subscriber: RedisClientType + doc: null | Automerge + docId: string + clientId: string + cursor: number + leader: string | null + init: (doc: Automerge) => void + update?: (doc: any) => void + interval: ReturnType + save_bytes: number + incremental_save_bytes: number + + constructor(options: RedisSyncOptions) { + this.doc = null + this.docId = options.docId + this.client = createClient({ url: options.redis }) + this.clientId = options.clientId + this.cursor = 0 + this.init = options.init + this.update = options.update + this.interval = setInterval(() => this.tick(), 10 * 1000) // force a save every 10 seconds + this.leader = null + this.save_bytes = 0 + this.incremental_save_bytes = 0 + + this.client.on('error', (err) => console.log('Redis Client Error', err)); + this.subscriber = this.client.duplicate() + } + + iAmLeader() : boolean { + return this.leader == this.clientId + } + + get _saved_() : string { return `${this.docId}:saved` } + get _saved_cursor_() : string { return `${this.docId}:saved:cursor` } + get _changes_() : string { return `${this.docId}:changes` } + get _leader_() : string { return `${this.docId}:leader` } + get _notify_() : string { return `${this.docId}:notify` } + + async change(f: (doc: Automerge) => void) { + if (this.doc === null) { + throw new RangeError("cannot call change - doc not initalized") + } + let heads = this.doc.getHeads() + f(this.doc) + let changes = this.doc.getChanges(heads) + if (changes.length > 0) { + for (let i in changes) { + let change_str = Buffer.from(changes[i].buffer).toString("hex"); + await this.client.rPush(this._changes_, change_str) + } + await this.notify_peers() + this.notify_local() + } + } + + async tick() { + let itsMe = await this.determineLeader() + if (this.doc && itsMe) { + if (this.incremental_save_bytes > this.save_bytes * 10) { + console.log(" ::as leader - doing a full save") + let save_all = this.doc.save() + this.incremental_save_bytes += save_all.length + let save_all_str = Buffer.from(save_all.buffer).toString("hex"); + await this.client.multi() + .rPush(this._saved_, save_all_str) + .set(this._saved_cursor_,this.cursor) + .exec() + } else { + let next_chunk = this.doc.saveIncremental() + if (next_chunk && next_chunk.length > 0) { + console.log(" ::as leader - doing an incremental save") + this.incremental_save_bytes += next_chunk.length + let next_chunk_str = Buffer.from(next_chunk.buffer).toString("hex"); + await this.client.multi() + .rPush(this._saved_, next_chunk_str) + .set(this._saved_cursor_,this.cursor) + .exec() + } + } + } + } + + toJS() : T | null { + if (this.doc) { + return this.doc.materialize("/") as T + } else { + return null + } + } + + async loadDocumentFromRedis() { + console.log(" ::Loading document..."); + let saved = await this.client.lRange(this._saved_,0,-1) + let cursor = await this.client.get(this._saved_cursor_) + if (Array.isArray(saved) && typeof cursor === 'string') { + let first_chunk_str = saved.shift(); + if (first_chunk_str) { + let first_chunk = Buffer.from(first_chunk_str,'hex') + let doc = load(first_chunk) + this.save_bytes = first_chunk.length; + for (let i in saved) { + let chunk = Buffer.from(saved[i],'hex') + doc.loadIncremental(chunk) + this.incremental_save_bytes += chunk.length + } + this.cursor = parseInt(cursor); + await this.fastForward(doc) + this.doc = doc + this.notify_local() + console.log(` ::done! cursor=${cursor}`); + return + } + } + console.log(" ::no document found in redis"); + } + + async fastForward(doc: Automerge) { + if (doc !== null) { + const changes = await this.client.lRange(this._changes_,this.cursor,-1) + if (Array.isArray(changes)) { + for (let i in changes) { + const change = Buffer.from(changes[i],'hex') + doc.loadIncremental(change) + this.cursor += 1 + this.notify_local() + } + } + } + } + + async handleMessage(message:string, channel: string) { + if (message !== this.clientId && this.doc) { + await this.fastForward(this.doc) + } + } + + async connect() { + await this.client.connect() + await this.subscriber.connect() + await this.subscriber.pSubscribe(this._notify_, (m,c) => this.handleMessage(m,c)) + await this.loadDocumentFromRedis() + + const itsMe = await this.determineLeader() + + if (itsMe && this.doc === null) { + await this.resetDocumentState() + } + } + + async resetDocumentState() { + console.log(" ::resetting document state") + let doc = create() + this.init(doc) + let saved = doc.save() + if (!saved || saved.length === 0) { + throw new RangeError("initalized document is blank"); + } + this.doc = doc; + this.save_bytes = saved.length; + this.incremental_save_bytes = 0; + let saved_str = Buffer.from(saved.buffer).toString("hex"); + await this.client.del(this._changes_) + await this.client.del(this._saved_) + await this.client.rPush(this._saved_, saved_str) + await this.client.set(this._saved_cursor_,0) + this.notify_local() + } + + async determineLeader() : Promise { + await this.client.setNX(this._leader_, this.clientId) + this.leader = await this.client.get(this._leader_) + const itsMe = this.leader == this.clientId + if (itsMe) { + this.client.expire(this._leader_, 60) + } + return itsMe + } + + async notify_peers() { + await this.client.publish(this._notify_, this.clientId) + } + + notify_local() { + if (this.doc && this.update) { + this.update(this.toJS()) + } + } + + async disconnect() { + await this.tick() // save any data + clearInterval(this.interval) + await this.client.disconnect(); + await this.subscriber.disconnect(); + } +} diff --git a/rust/automerge-wasm/examples/redis-sync/test/test.ts b/rust/automerge-wasm/examples/redis-sync/test/test.ts new file mode 100644 index 00000000..f1162dbb --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/test/test.ts @@ -0,0 +1,23 @@ +import * as assert from 'assert' +import { RedisSync } from "../src" +import { Automerge, create } from "@automerge/automerge-wasm" + +describe('redis-sync', () => { + describe('basics', () => { + it('should be able to connect and disconnect', async () => { + let doc = create(); + let sync = new RedisSync({ + redis: "redis://", + docId: "DOC124", + clientId: "client01", + init: (doc) => { + console.log("DOC",doc); + doc.put("/","hello","world") + } + }); + await sync.connect(); + await new Promise(resolve => setTimeout(resolve, 1000)) + //await sync.disconnect(); + }) + }) +}) diff --git a/rust/automerge-wasm/examples/redis-sync/tsconfig.json b/rust/automerge-wasm/examples/redis-sync/tsconfig.json new file mode 100644 index 00000000..35520e22 --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "target": "es2016", + "sourceMap": false, + "declaration": true, + "resolveJsonModule": true, + "module": "commonjs", + "moduleResolution": "node", + "noImplicitAny": false, + "allowSyntheticDefaultImports": true, + "forceConsistentCasingInFileNames": true, + "strict": true, + "noFallthroughCasesInSwitch": true, + "skipLibCheck": false, + "outDir": "./dist" + }, + "include": [ "src/**/*" ], + "exclude": [ + "./dist/**/*", + "./node_modules" + ] +} diff --git a/rust/automerge-wasm/examples/redis-sync/tslint.json b/rust/automerge-wasm/examples/redis-sync/tslint.json new file mode 100644 index 00000000..f7bb7a71 --- /dev/null +++ b/rust/automerge-wasm/examples/redis-sync/tslint.json @@ -0,0 +1,3 @@ +{ + "extends": "tslint:recommended" +} diff --git a/rust/badmessage/.gitignore b/rust/badmessage/.gitignore new file mode 100644 index 00000000..55778aca --- /dev/null +++ b/rust/badmessage/.gitignore @@ -0,0 +1,6 @@ +/target +Cargo.lock +node_modules +yarn.lock +flamegraph.svg +/prof diff --git a/rust/badmessage/Cargo.toml b/rust/badmessage/Cargo.toml new file mode 100644 index 00000000..e6b93447 --- /dev/null +++ b/rust/badmessage/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "badmessage" +version = "0.1.0" +edition = "2021" +license = "MIT" + +[dependencies] +automerge = { path = "../automerge" } +criterion = "0.3.5" +json = "0.12.4" +rand = "^0.8" + + +[[bin]] +name = "badmessage" +doc = false +bench = false + diff --git a/rust/badmessage/automerge-js.js b/rust/badmessage/automerge-js.js new file mode 100644 index 00000000..80d19051 --- /dev/null +++ b/rust/badmessage/automerge-js.js @@ -0,0 +1,26 @@ +// Apply the paper editing trace to an Automerge.Text object, one char at a time +const Automerge = require('../../javascript') + +const fs = require('fs'); + +const start = new Date() + +let contents = fs.readFileSync("badmessage"); +let doc = Automerge.init(); +let state = Automerge.initSyncState(); +[doc,state] = Automerge.receiveSyncMessage(doc, state, contents); + +console.log(`doc.receiveSyncMessage in ${new Date() - start} ms`) + +let t_time = new Date() +let saved = Automerge.save(doc); +console.log(`doc.save in ${new Date() - t_time} ms`) + +t_time = new Date() +Automerge.load(saved) +console.log(`doc.load in ${new Date() - t_time} ms`) + +t_time = new Date() +let doc2 = Automerge.init() +doc2 = Automerge.loadIncremental(doc2,saved) +console.log(`doc.loadIncremental in ${new Date() - t_time} ms`) diff --git a/rust/badmessage/automerge-wasm.js b/rust/badmessage/automerge-wasm.js new file mode 100644 index 00000000..33ac46bc --- /dev/null +++ b/rust/badmessage/automerge-wasm.js @@ -0,0 +1,26 @@ +const Automerge = require('../automerge-wasm') +const fs = require('fs'); + + +let contents = fs.readFileSync("badmessage"); +let doc = Automerge.create(); +let state = Automerge.initSyncState(); + +let t_time = new Date() + +doc.receiveSyncMessage(state,contents); + +console.log(`doc.receiveSyncMessage in ${new Date() - t_time} ms`) + +t_time = new Date() +let saved = doc.save() +console.log(`doc.save in ${new Date() - t_time} ms`) + +t_time = new Date() +Automerge.load(saved) +console.log(`doc.load in ${new Date() - t_time} ms`) + +t_time = new Date() +let doc2 = Automerge.create() +doc2.loadIncremental(saved) +console.log(`doc.loadIncremental in ${new Date() - t_time} ms`) diff --git a/rust/badmessage/badmessage b/rust/badmessage/badmessage new file mode 100755 index 00000000..f2d5ff29 Binary files /dev/null and b/rust/badmessage/badmessage differ diff --git a/rust/badmessage/package.json b/rust/badmessage/package.json new file mode 100644 index 00000000..e8d2ad3f --- /dev/null +++ b/rust/badmessage/package.json @@ -0,0 +1,13 @@ +{ + "name": "badmessage", + "version": "1.0.0", + "main": "wasm-text.js", + "license": "MIT", + "scripts": { + "wasm": "0x -D prof automerge-wasm.js", + "js": "0x -D prof automerge-js.js" + }, + "devDependencies": { + "0x": "^5.4.1" + } +} diff --git a/rust/badmessage/src/main.rs b/rust/badmessage/src/main.rs new file mode 100644 index 00000000..a67005c7 --- /dev/null +++ b/rust/badmessage/src/main.rs @@ -0,0 +1,25 @@ +use automerge::sync; +use automerge::{Automerge, AutomergeError}; +use std::fs; +use std::time::Instant; + +fn main() -> Result<(), AutomergeError> { + let contents = fs::read("badmessage").expect("cant read badmessage file"); + let mut doc = Automerge::new(); + let mut state = sync::State::new(); + let now = Instant::now(); + // decode and receive happen at the same time in wasm so lets keep it apples to apples + let message = sync::Message::decode(contents.as_slice()).expect("cant decode message"); + doc.receive_sync_message(&mut state, message).unwrap(); + println!("decode/receive in {} ms", now.elapsed().as_millis()); + let now = Instant::now(); + let saved = doc.save(); + println!("save in {} ms", now.elapsed().as_millis()); + let now = Instant::now(); + let _ = Automerge::load(&saved).unwrap(); + println!("load in {} ms", now.elapsed().as_millis()); + let mut doc2 = Automerge::new(); + doc2.load_incremental(saved.as_slice()).unwrap(); + println!("load_incremental in {} ms", now.elapsed().as_millis()); + Ok(()) +}