Compare commits

...

2 commits
main ... redis

Author SHA1 Message Date
Orion Henry
0975c5614b add a POC redis sync class 2022-12-14 12:26:48 -06:00
Orion Henry
b7415d18eb adding a badmessage performance case 2022-12-09 18:01:57 -06:00
20 changed files with 514 additions and 0 deletions

View file

@ -6,6 +6,7 @@ members = [
"automerge-test",
"automerge-wasm",
"edit-trace",
"badmessage",
]
resolver = "2"

View file

@ -0,0 +1,2 @@
dist
examples

View file

@ -0,0 +1,11 @@
// ESLint configuration: parse with the TypeScript parser and apply the
// recommended core + @typescript-eslint rule sets to the whole package.
module.exports = {
  root: true, // do not look for config files in parent directories
  parser: '@typescript-eslint/parser',
  plugins: [
    '@typescript-eslint',
  ],
  extends: [
    'eslint:recommended',
    'plugin:@typescript-eslint/recommended',
  ],
};

View file

@ -0,0 +1,2 @@
/dist
node_modules

View file

@ -0,0 +1,36 @@
## Quick Example of Syncing Automerge Docs via Redis
The good:
1. store the document in redis as both a compacted document save and as individual changes
2. able to load the saved document, and then calculate which changes are missing in a single pass
3. uses redis (RPUSH) to get global change ordering
4. wake all clients on new changes with redis (PUBLISH, SUBSCRIBE)
5. elect (via SETNX) a single leader to saveIncremental() every 10 seconds
6. said node also does a full doc save when incremental bytes are 10x the base save
7. detects disconnection of the leader (via EXPIRE) and elects a new leader
8. detects when a document has not been created and elects a single node to create it
The bad:
1. redis can do binary data - i couldn't figure it out - i hex encode/decode everything
2. there's probably a few race conditions i need to find
3. my typescript is ass
4. not much in the way of error handling
5. should really be using applyPatches() but im not
6. the api is a little funny
How to use:
```
$ yarn
$ yarn build
```
Then run
```
$ node client.js $DOCID
```
In two or three different windows and watch each client editing the same document.

View file

@ -0,0 +1,43 @@
// Demo client: joins a shared document identified by DOCID, prints the doc
// on every change, and every 3 seconds randomly inserts into or deletes from
// the shared "tokens" list (and stamps itself as the last "winner").
const { RedisSync } = require('.')

// uniform random integer in [0, max)
function rand(max) {
  return Math.floor(Math.random() * max)
}

const clientId = `C${rand(1000)}`
const docId = process.argv[2]

// docId must be a reasonable-length string taken from the command line
if (typeof docId !== 'string' || docId.length < 2 || docId.length > 20) {
  console.log("usage: node client.js DOCID")
  process.exit(1)
}

console.log("clientID is", clientId)
console.log("docID is", docId)

// used only by the node elected to create a brand-new document
const init = (doc) => {
  doc.put("/", "title", "token tracker")
  doc.putObject("/", "tokens", [])
}

// called whenever the local copy of the document changes
const update = () => {
  console.log("DOC STATE", sync.toJS())
}

const sync = new RedisSync({ redis: "redis://", docId, clientId, init, update });
sync.connect()

// with probability 1/3 per tick, mutate the shared token list
const tweak = () => {
  if (rand(3) == 0) {
    sync.change((doc) => {
      const len = doc.length("/tokens")
      if (len + rand(10) > 20) {
        doc.delete("/tokens", rand(len))
      } else {
        doc.insert("/tokens", rand(len), rand(255))
      }
      doc.put("/", "winner", clientId)
    })
  }
}

setInterval(tweak, 3000)

View file

@ -0,0 +1,6 @@
{
"extends": "../tsconfig.json",
"compilerOptions": {
"outDir": "../dist/cjs"
}
}

View file

@ -0,0 +1,8 @@
{
"extends": "../tsconfig.json",
"compilerOptions": {
"target": "es2017",
"module": "esnext",
"outDir": "../dist/mjs"
}
}

View file

@ -0,0 +1,33 @@
{
"name": "redis-sync",
"version": "0.0.1",
"description": "",
"types": "./dist/index.d.ts",
"module": "./dist/mjs/index.js",
"main": "./dist/cjs/index.js",
"scripts": {
"lint": "eslint src",
"build": "tsc -p config/mjs.json && tsc -p config/cjs.json && tsc --emitDeclarationOnly",
"test": "ts-mocha test/*.ts",
"watch-docs": "typedoc src/index.ts --watch --readme typedoc-readme.md"
},
"author": "",
"license": "ISC",
"dependencies": {
"@automerge/automerge-wasm": "^0.1.19",
"redis": "^4.5.1"
},
"devDependencies": {
"@types/expect": "^24.3.0",
"@types/mocha": "^9.1.1",
"@types/uuid": "^8.3.4",
"@typescript-eslint/eslint-plugin": "^5.25.0",
"@typescript-eslint/parser": "^5.25.0",
"eslint": "^8.15.0",
"mocha": "^10.0.0",
"ts-mocha": "^10.0.0",
"ts-node": "^10.9.1",
"typedoc": "^0.23.16",
"typescript": "^4.6.4"
}
}

View file

@ -0,0 +1,210 @@
import { createClient } from 'redis';
import type { RedisClientType, RedisClientOptions } from 'redis'
import { Automerge, create, load } from "@automerge/automerge-wasm"
/** Constructor options for {@link RedisSync}. */
export type RedisSyncOptions<T> = {
  redis: string,    // redis connection URL, e.g. "redis://"
  docId: string,    // identifier shared by all peers syncing the same document
  clientId: string, // unique id for this peer (used for leader election / pub-sub)
  init: (doc: Automerge) => void,  // populates a brand-new document when none exists yet
  update?: (doc: any) => void,     // optional callback fired after every local doc change
}
/**
 * Syncs an Automerge document between processes using redis for both
 * persistence and transport:
 *
 *  - changes are RPUSHed onto a per-document list, giving a global order
 *  - peers are woken via PUBLISH/SUBSCRIBE when new changes land
 *  - a single leader (elected via SETNX, kept alive via EXPIRE) periodically
 *    compacts the change log into saved chunks, doing a full save once the
 *    incremental chunks outgrow the base save by 10x
 *
 * All binary data is hex encoded before being written to redis.
 */
export class RedisSync<T> {
  client: RedisClientType
  subscriber: RedisClientType
  doc: null | Automerge
  docId: string
  clientId: string
  cursor: number                 // index of the next unread entry in the redis change list
  leader: string | null          // clientId currently holding the leader key (null before first election)
  init: (doc: Automerge) => void // populates a brand-new document
  update?: (doc: any) => void    // invoked after every local doc mutation
  interval: ReturnType<typeof setInterval>
  save_bytes: number             // size in bytes of the last full save written
  incremental_save_bytes: number // incremental bytes accumulated since the last full save

  constructor(options: RedisSyncOptions<T>) {
    this.doc = null
    this.docId = options.docId
    this.client = createClient({ url: options.redis })
    this.clientId = options.clientId
    this.cursor = 0
    this.init = options.init
    this.update = options.update
    this.interval = setInterval(() => this.tick(), 10 * 1000) // force a save every 10 seconds
    this.leader = null
    this.save_bytes = 0
    this.incremental_save_bytes = 0
    this.client.on('error', (err) => console.log('Redis Client Error', err));
    // pub/sub requires a dedicated connection, separate from the command client
    this.subscriber = this.client.duplicate()
  }

  iAmLeader(): boolean {
    return this.leader == this.clientId
  }

  // redis key names derived from the document id
  get _saved_(): string { return `${this.docId}:saved` }
  get _saved_cursor_(): string { return `${this.docId}:saved:cursor` }
  get _changes_(): string { return `${this.docId}:changes` }
  get _leader_(): string { return `${this.docId}:leader` }
  get _notify_(): string { return `${this.docId}:notify` }

  /**
   * Apply a local mutation, push the resulting changes onto the global
   * change list, then wake remote peers and the local update callback.
   * @throws RangeError if connect() has not yet produced a document
   */
  async change(f: (doc: Automerge) => void) {
    if (this.doc === null) {
      throw new RangeError("cannot call change - doc not initialized")
    }
    const heads = this.doc.getHeads()
    f(this.doc)
    const changes = this.doc.getChanges(heads)
    if (changes.length > 0) {
      for (const change of changes) {
        const change_str = Buffer.from(change.buffer).toString("hex");
        await this.client.rPush(this._changes_, change_str)
      }
      await this.notify_peers()
      this.notify_local()
    }
  }

  /**
   * Periodic leader duty: persist outstanding changes. A full save happens
   * once incremental chunks outgrow the base save by 10x; otherwise an
   * incremental chunk is appended. Non-leaders do nothing here.
   */
  async tick() {
    const itsMe = await this.determineLeader()
    if (this.doc && itsMe) {
      if (this.incremental_save_bytes > this.save_bytes * 10) {
        console.log(" ::as leader - doing a full save")
        const save_all = this.doc.save()
        // BUG FIX: a full save resets the incremental history. Previously its
        // size was *added* to incremental_save_bytes, which kept the 10x
        // condition true and forced a full save on every subsequent tick.
        this.save_bytes = save_all.length
        this.incremental_save_bytes = 0
        const save_all_str = Buffer.from(save_all.buffer).toString("hex");
        await this.client.multi()
          .rPush(this._saved_, save_all_str)
          .set(this._saved_cursor_, this.cursor)
          .exec()
      } else {
        const next_chunk = this.doc.saveIncremental()
        if (next_chunk && next_chunk.length > 0) {
          console.log(" ::as leader - doing an incremental save")
          this.incremental_save_bytes += next_chunk.length
          const next_chunk_str = Buffer.from(next_chunk.buffer).toString("hex");
          await this.client.multi()
            .rPush(this._saved_, next_chunk_str)
            .set(this._saved_cursor_, this.cursor)
            .exec()
        }
      }
    }
  }

  /** Materialize the document root as a plain JS value, or null before load. */
  toJS(): T | null {
    if (this.doc) {
      return this.doc.materialize("/") as T
    } else {
      return null
    }
  }

  /**
   * Load the saved chunks (first chunk is a full save, the rest are
   * incremental) plus any trailing changes from redis. Leaves this.doc
   * null when no document exists yet.
   */
  async loadDocumentFromRedis() {
    console.log(" ::Loading document...");
    const saved = await this.client.lRange(this._saved_, 0, -1)
    const cursor = await this.client.get(this._saved_cursor_)
    if (Array.isArray(saved) && typeof cursor === 'string') {
      const first_chunk_str = saved.shift();
      if (first_chunk_str) {
        const first_chunk = Buffer.from(first_chunk_str, 'hex')
        const doc = load(first_chunk)
        this.save_bytes = first_chunk.length;
        for (const chunk_str of saved) {
          const chunk = Buffer.from(chunk_str, 'hex')
          doc.loadIncremental(chunk)
          this.incremental_save_bytes += chunk.length
        }
        // the saved cursor records how far into the change list the save covers
        this.cursor = parseInt(cursor, 10);
        await this.fastForward(doc)
        this.doc = doc
        this.notify_local()
        console.log(` ::done! cursor=${cursor}`);
        return
      }
    }
    console.log(" ::no document found in redis");
  }

  /** Apply every change past this.cursor from the global change list. */
  async fastForward(doc: Automerge) {
    if (doc !== null) {
      const changes = await this.client.lRange(this._changes_, this.cursor, -1)
      if (Array.isArray(changes)) {
        for (const change_str of changes) {
          const change = Buffer.from(change_str, 'hex')
          doc.loadIncremental(change)
          this.cursor += 1
          // the update callback fires once per applied change
          this.notify_local()
        }
      }
    }
  }

  /** Pub/sub handler: a peer (not us) pushed changes - catch up. */
  async handleMessage(message: string, channel: string) {
    if (message !== this.clientId && this.doc) {
      await this.fastForward(this.doc)
    }
  }

  /** Connect both redis clients, subscribe, load (or create) the document. */
  async connect() {
    await this.client.connect()
    await this.subscriber.connect()
    await this.subscriber.pSubscribe(this._notify_, (m, c) => this.handleMessage(m, c))
    await this.loadDocumentFromRedis()
    const itsMe = await this.determineLeader()
    // only the leader creates a missing document, so exactly one node does it
    if (itsMe && this.doc === null) {
      await this.resetDocumentState()
    }
  }

  /** Create a fresh document via init() and replace all redis state for it. */
  async resetDocumentState() {
    console.log(" ::resetting document state")
    const doc = create()
    this.init(doc)
    const saved = doc.save()
    if (!saved || saved.length === 0) {
      throw new RangeError("initialized document is blank");
    }
    this.doc = doc;
    this.save_bytes = saved.length;
    this.incremental_save_bytes = 0;
    const saved_str = Buffer.from(saved.buffer).toString("hex");
    await this.client.del(this._changes_)
    await this.client.del(this._saved_)
    await this.client.rPush(this._saved_, saved_str)
    await this.client.set(this._saved_cursor_, 0)
    this.notify_local()
  }

  /**
   * Elect a leader: SETNX claims the key if free, then we read back whoever
   * holds it. The leader refreshes a 60s expiry so a dead leader is
   * automatically replaced.
   * @returns true when this client is the leader
   */
  async determineLeader(): Promise<boolean> {
    await this.client.setNX(this._leader_, this.clientId)
    this.leader = await this.client.get(this._leader_)
    const itsMe = this.leader == this.clientId
    if (itsMe) {
      // awaited so a failure surfaces here instead of as an unhandled rejection
      await this.client.expire(this._leader_, 60)
    }
    return itsMe
  }

  /** Wake remote peers; the payload is our clientId so we can ignore our own echo. */
  async notify_peers() {
    await this.client.publish(this._notify_, this.clientId)
  }

  /** Fire the local update callback (if any) with the materialized doc. */
  notify_local() {
    if (this.doc && this.update) {
      this.update(this.toJS())
    }
  }

  /** Flush any pending save, stop the timer, and close both connections. */
  async disconnect() {
    await this.tick() // save any data
    clearInterval(this.interval)
    await this.client.disconnect();
    await this.subscriber.disconnect();
  }
}

View file

@ -0,0 +1,23 @@
import * as assert from 'assert'
import { RedisSync } from "../src"
import { Automerge, create } from "@automerge/automerge-wasm"
// Smoke test: requires a redis server reachable at "redis://".
describe('redis-sync', () => {
  describe('basics', () => {
    it('should be able to connect and disconnect', async () => {
      // (removed an unused `create()` doc local - the init callback below
      // receives its own doc from RedisSync)
      const sync = new RedisSync({
        redis: "redis://",
        docId: "DOC124",
        clientId: "client01",
        init: (doc) => {
          console.log("DOC", doc);
          doc.put("/", "hello", "world")
        }
      });
      await sync.connect();
      // give the pub/sub machinery a moment to settle
      await new Promise(resolve => setTimeout(resolve, 1000))
      // NOTE(review): disconnect is disabled, so the redis connections stay
      // open and the test process will not exit cleanly - confirm intent
      //await sync.disconnect();
    })
  })
})

View file

@ -0,0 +1,22 @@
{
"compilerOptions": {
"target": "es2016",
"sourceMap": false,
"declaration": true,
"resolveJsonModule": true,
"module": "commonjs",
"moduleResolution": "node",
"noImplicitAny": false,
"allowSyntheticDefaultImports": true,
"forceConsistentCasingInFileNames": true,
"strict": true,
"noFallthroughCasesInSwitch": true,
"skipLibCheck": false,
"outDir": "./dist"
},
"include": [ "src/**/*" ],
"exclude": [
"./dist/**/*",
"./node_modules"
]
}

View file

@ -0,0 +1,3 @@
{
"extends": "tslint:recommended"
}

6
rust/badmessage/.gitignore vendored Normal file
View file

@ -0,0 +1,6 @@
/target
Cargo.lock
node_modules
yarn.lock
flamegraph.svg
/prof

View file

@ -0,0 +1,18 @@
[package]
name = "badmessage"
version = "0.1.0"
edition = "2021"
license = "MIT"
[dependencies]
automerge = { path = "../automerge" }
criterion = "0.3.5"
json = "0.12.4"
rand = "^0.8"
[[bin]]
name = "badmessage"
doc = false
bench = false

View file

@ -0,0 +1,26 @@
// Benchmark the JS automerge API against the pathological sync message in
// the "badmessage" file: time receiveSyncMessage, save, load and
// loadIncremental. (Header previously copy-pasted from edit-trace and
// described the wrong workload.)
const Automerge = require('../../javascript')
const fs = require('fs');
const start = new Date()
let contents = fs.readFileSync("badmessage");
let doc = Automerge.init();
let state = Automerge.initSyncState();
[doc,state] = Automerge.receiveSyncMessage(doc, state, contents);
console.log(`doc.receiveSyncMessage in ${new Date() - start} ms`)
let t_time = new Date()
let saved = Automerge.save(doc);
console.log(`doc.save in ${new Date() - t_time} ms`)
t_time = new Date()
Automerge.load(saved)
console.log(`doc.load in ${new Date() - t_time} ms`)
t_time = new Date()
let doc2 = Automerge.init()
doc2 = Automerge.loadIncremental(doc2,saved)
console.log(`doc.loadIncremental in ${new Date() - t_time} ms`)

View file

@ -0,0 +1,26 @@
// Benchmark the wasm automerge API against the pathological sync message in
// the "badmessage" file: time receiveSyncMessage, save, load and
// loadIncremental (mirrors automerge-js.js for comparison).
const Automerge = require('../automerge-wasm')
const fs = require('fs');
let contents = fs.readFileSync("badmessage");
let doc = Automerge.create();
let state = Automerge.initSyncState();
let t_time = new Date()
doc.receiveSyncMessage(state,contents);
console.log(`doc.receiveSyncMessage in ${new Date() - t_time} ms`)
t_time = new Date()
let saved = doc.save()
console.log(`doc.save in ${new Date() - t_time} ms`)
t_time = new Date()
Automerge.load(saved)
console.log(`doc.load in ${new Date() - t_time} ms`)
t_time = new Date()
let doc2 = Automerge.create()
doc2.loadIncremental(saved)
console.log(`doc.loadIncremental in ${new Date() - t_time} ms`)

BIN
rust/badmessage/badmessage Executable file

Binary file not shown.

View file

@ -0,0 +1,13 @@
{
"name": "badmessage",
"version": "1.0.0",
"main": "wasm-text.js",
"license": "MIT",
"scripts": {
"wasm": "0x -D prof automerge-wasm.js",
"js": "0x -D prof automerge-js.js"
},
"devDependencies": {
"0x": "^5.4.1"
}
}

View file

@ -0,0 +1,25 @@
use automerge::sync;
use automerge::{Automerge, AutomergeError};
use std::fs;
use std::time::Instant;
/// Benchmark how long automerge takes to decode/receive, save, load and
/// incrementally load the pathological sync message in the `badmessage` file.
fn main() -> Result<(), AutomergeError> {
    let contents = fs::read("badmessage").expect("cant read badmessage file");
    let mut doc = Automerge::new();
    let mut state = sync::State::new();

    let now = Instant::now();
    // decode and receive happen at the same time in wasm so lets keep it apples to apples
    let message = sync::Message::decode(contents.as_slice()).expect("cant decode message");
    doc.receive_sync_message(&mut state, message).unwrap();
    println!("decode/receive in {} ms", now.elapsed().as_millis());

    let now = Instant::now();
    let saved = doc.save();
    println!("save in {} ms", now.elapsed().as_millis());

    let now = Instant::now();
    let _ = Automerge::load(&saved).unwrap();
    println!("load in {} ms", now.elapsed().as_millis());

    // BUG FIX: restart the timer here - previously the load_incremental time
    // reused the timer started before `load`, so its reported duration
    // included the full-load step as well.
    let now = Instant::now();
    let mut doc2 = Automerge::new();
    doc2.load_incremental(saved.as_slice()).unwrap();
    println!("load_incremental in {} ms", now.elapsed().as_millis());
    Ok(())
}