refactor: save ydoc state in the database, so it can be restored easier

By storing the ydoc state in the database we can reconnect lost clients easier
and enable offline editing because we continue using the crdt data that has been
used by the client before the connection loss.

Signed-off-by: Tilman Vatteroth <git@tilmanvatteroth.de>
This commit is contained in:
Tilman Vatteroth 2023-03-24 10:26:49 +01:00
parent 4707540237
commit a826677225
26 changed files with 301 additions and 204 deletions

View file

@ -6,14 +6,13 @@
import { RealtimeDoc } from '../y-doc-sync/realtime-doc.js'
import { ConnectionState, MessageTransporter } from './message-transporter.js'
import { Message, MessageType } from './message.js'
import { Doc, encodeStateAsUpdate } from 'yjs'
/**
* A mocked connection that doesn't send or receive any data and is instantly ready.
* The only exception is the note content state request that is answered with the given initial content.
*/
export class MockedBackendMessageTransporter extends MessageTransporter {
private readonly doc: Doc
private readonly doc: RealtimeDoc
private connected = true
@ -41,10 +40,10 @@ export class MockedBackendMessageTransporter extends MessageTransporter {
sendMessage<M extends MessageType>(content: Message<M>) {
if (content.type === MessageType.NOTE_CONTENT_STATE_REQUEST) {
setTimeout(() => {
const payload = Array.from(
encodeStateAsUpdate(this.doc, new Uint8Array(content.payload))
)
this.receiveMessage({ type: MessageType.NOTE_CONTENT_UPDATE, payload })
this.receiveMessage({
type: MessageType.NOTE_CONTENT_UPDATE,
payload: this.doc.encodeStateAsUpdate(content.payload)
})
}, 10)
}
}

View file

@ -6,11 +6,31 @@
import { RealtimeDoc } from './realtime-doc.js'
import { describe, expect, it } from '@jest/globals'
describe('websocket-doc', () => {
it('saves the initial content', () => {
describe('realtime doc', () => {
it('saves an initial text content correctly', () => {
const textContent = 'textContent'
const websocketDoc = new RealtimeDoc(textContent)
const realtimeDoc = new RealtimeDoc(textContent)
expect(realtimeDoc.getCurrentContent()).toBe(textContent)
})
expect(websocketDoc.getCurrentContent()).toBe(textContent)
it('will initialize an empty text if no initial content is given', () => {
const realtimeDoc = new RealtimeDoc()
expect(realtimeDoc.getCurrentContent()).toBe('')
})
it('restores a yjs state vector update correctly', () => {
const realtimeDoc = new RealtimeDoc(
'notTheVectorText',
[
1, 1, 221, 208, 165, 230, 3, 0, 4, 1, 15, 109, 97, 114, 107, 100, 111,
119, 110, 67, 111, 110, 116, 101, 110, 116, 32, 116, 101, 120, 116, 67,
111, 110, 116, 101, 110, 116, 70, 114, 111, 109, 83, 116, 97, 116, 101,
86, 101, 99, 116, 111, 114, 85, 112, 100, 97, 116, 101, 0
]
)
expect(realtimeDoc.getCurrentContent()).toBe(
'textContentFromStateVectorUpdate'
)
})
})

View file

@ -3,27 +3,52 @@
*
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Doc } from 'yjs'
import { Text as YText } from 'yjs'
import { EventEmitter2 } from 'eventemitter2'
import type { EventMap } from 'eventemitter2'
import {
applyUpdate,
Doc,
encodeStateAsUpdate,
encodeStateVector,
Text as YText
} from 'yjs'
const MARKDOWN_CONTENT_CHANNEL_NAME = 'markdownContent'
export interface RealtimeDocEvents extends EventMap {
update: (update: number[], origin: unknown) => void
}
/**
* This is the implementation of {@link Doc YDoc} which includes additional handlers for message sending and receiving.
*/
export class RealtimeDoc extends Doc {
export class RealtimeDoc extends EventEmitter2<RealtimeDocEvents> {
private doc: Doc = new Doc()
private readonly docUpdateListener: (
update: Uint8Array,
origin: unknown
) => void
/**
* Creates a new instance.
*
* The new instance is filled with the given initial content.
*
* @param initialContent - the initial content of the {@link Doc YDoc}
* @param initialTextContent the initial text content of the {@link Doc YDoc}
* @param initialYjsState the initial yjs state. If provided this will be used instead of the text content
*/
constructor(initialContent?: string) {
constructor(initialTextContent?: string, initialYjsState?: number[]) {
super()
if (initialContent) {
this.getMarkdownContentChannel().insert(0, initialContent)
if (initialYjsState) {
this.applyUpdate(initialYjsState, this)
} else if (initialTextContent) {
this.getMarkdownContentChannel().insert(0, initialTextContent)
}
this.docUpdateListener = (update, origin) => {
this.emit('update', Array.from(update), origin)
}
this.doc.on('update', this.docUpdateListener)
}
/**
@ -32,7 +57,7 @@ export class RealtimeDoc extends Doc {
* @return The markdown channel
*/
public getMarkdownContentChannel(): YText {
return this.getText(MARKDOWN_CONTENT_CHANNEL_NAME)
return this.doc.getText(MARKDOWN_CONTENT_CHANNEL_NAME)
}
/**
@ -45,4 +70,35 @@ export class RealtimeDoc extends Doc {
public getCurrentContent(): string {
return this.getMarkdownContentChannel().toString()
}
/**
* Encodes the current state of the doc as update so it can be applied to other y-docs.
*
* @param encodedTargetStateVector The current state vector of the other y-doc. If provided the update will contain only the differences.
*/
public encodeStateAsUpdate(encodedTargetStateVector?: number[]): number[] {
const update = encodedTargetStateVector
? new Uint8Array(encodedTargetStateVector)
: undefined
return Array.from(encodeStateAsUpdate(this.doc, update))
}
public destroy(): void {
this.doc.off('update', this.docUpdateListener)
this.doc.destroy()
}
/**
* Applies an update to the y-doc.
*
* @param payload The update to apply
* @param origin A reference that triggered the update
*/
public applyUpdate(payload: number[], origin: unknown): void {
applyUpdate(this.doc, new Uint8Array(payload), origin)
}
public encodeStateVector(): number[] {
return Array.from(encodeStateVector(this.doc))
}
}

View file

@ -68,10 +68,10 @@ describe('message transporter', () => {
console.debug('s>2 is connected')
)
docServer.on('update', (update: Uint8Array, origin: unknown) => {
docServer.on('update', (update: number[], origin: unknown) => {
const message: Message<MessageType.NOTE_CONTENT_UPDATE> = {
type: MessageType.NOTE_CONTENT_UPDATE,
payload: Array.from(update)
payload: update
}
if (origin !== transporterServerTo1) {
console.debug('YDoc on Server updated. Sending to Client 1')
@ -82,32 +82,35 @@ describe('message transporter', () => {
transporterServerTo2.sendMessage(message)
}
})
docClient1.on('update', (update: Uint8Array, origin: unknown) => {
docClient1.on('update', (update: number[], origin: unknown) => {
if (origin !== transporterClient1) {
console.debug('YDoc on client 1 updated. Sending to Server')
}
})
docClient2.on('update', (update: Uint8Array, origin: unknown) => {
docClient2.on('update', (update: number[], origin: unknown) => {
if (origin !== transporterClient2) {
console.debug('YDoc on client 2 updated. Sending to Server')
}
})
const yDocSyncAdapter1 = new YDocSyncClientAdapter(transporterClient1)
yDocSyncAdapter1.setYDoc(docClient1)
const yDocSyncAdapter2 = new YDocSyncClientAdapter(transporterClient2)
yDocSyncAdapter2.setYDoc(docClient2)
const yDocSyncAdapter1 = new YDocSyncClientAdapter(
transporterClient1,
docClient1
)
const yDocSyncAdapter2 = new YDocSyncClientAdapter(
transporterClient2,
docClient2
)
const yDocSyncAdapterServerTo1 = new YDocSyncServerAdapter(
transporterServerTo1
transporterServerTo1,
docServer
)
yDocSyncAdapterServerTo1.setYDoc(docServer)
const yDocSyncAdapterServerTo2 = new YDocSyncServerAdapter(
transporterServerTo2
transporterServerTo2,
docServer
)
yDocSyncAdapterServerTo2.setYDoc(docServer)
const waitForClient1Sync = new Promise<void>((resolve) => {
yDocSyncAdapter1.doAsSoonAsSynced(() => {

View file

@ -5,26 +5,37 @@
*/
import { MessageTransporter } from '../message-transporters/message-transporter.js'
import { Message, MessageType } from '../message-transporters/message.js'
import { RealtimeDoc } from './realtime-doc.js'
import { Listener } from 'eventemitter2'
import { EventEmitter2 } from 'eventemitter2'
import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs'
type EventMap = Record<'synced' | 'desynced', () => void>
/**
* Sends and processes messages that are used to first-synchronize and update a {@link Doc y-doc}.
* Sends and processes messages that are used to first-synchronize and update a {@link RealtimeDoc y-doc}.
*/
export abstract class YDocSyncAdapter {
public readonly eventEmitter = new EventEmitter2<EventMap>()
protected doc: Doc | undefined
private destroyYDocUpdateCallback: undefined | (() => void)
private destroyEventListenerCallback: undefined | (() => void)
private readonly yDocUpdateListener: Listener
private readonly destroyEventListenerCallback: undefined | (() => void)
private synced = false
constructor(protected readonly messageTransporter: MessageTransporter) {
this.bindDocumentSyncMessageEvents()
constructor(
protected readonly messageTransporter: MessageTransporter,
protected readonly doc: RealtimeDoc
) {
this.yDocUpdateListener = doc.on(
'update',
(update, origin) => {
this.processDocUpdate(update, origin)
},
{
objectify: true
}
) as Listener
this.destroyEventListenerCallback = this.bindDocumentSyncMessageEvents()
}
/**
@ -51,39 +62,19 @@ export abstract class YDocSyncAdapter {
return this.synced
}
/**
* Sets the {@link Doc y-doc} that should be synchronized.
*
* @param doc the doc to synchronize.
*/
public setYDoc(doc: Doc | undefined): void {
this.doc = doc
this.destroyYDocUpdateCallback?.()
if (!doc) {
return
}
const yDocUpdateCallback = this.processDocUpdate.bind(this)
doc.on('update', yDocUpdateCallback)
this.destroyYDocUpdateCallback = () => doc.off('update', yDocUpdateCallback)
this.eventEmitter.emit('desynced')
}
public destroy(): void {
this.destroyYDocUpdateCallback?.()
this.yDocUpdateListener.off()
this.destroyEventListenerCallback?.()
}
protected bindDocumentSyncMessageEvents(): void {
protected bindDocumentSyncMessageEvents(): () => void {
const stateRequestListener = this.messageTransporter.on(
MessageType.NOTE_CONTENT_STATE_REQUEST,
(payload) => {
if (this.doc) {
this.messageTransporter.sendMessage({
type: MessageType.NOTE_CONTENT_UPDATE,
payload: Array.from(
encodeStateAsUpdate(this.doc, new Uint8Array(payload.payload))
)
payload: this.doc.encodeStateAsUpdate(payload.payload)
})
}
},
@ -95,35 +86,30 @@ export abstract class YDocSyncAdapter {
() => {
this.synced = false
this.eventEmitter.emit('desynced')
this.destroy()
},
{ objectify: true }
) as Listener
const noteContentUpdateListener = this.messageTransporter.on(
MessageType.NOTE_CONTENT_UPDATE,
(payload) => {
if (this.doc) {
applyUpdate(this.doc, new Uint8Array(payload.payload), this)
}
},
(payload) => this.doc.applyUpdate(payload.payload, this),
{ objectify: true }
) as Listener
this.destroyEventListenerCallback = () => {
return () => {
stateRequestListener.off()
disconnectedListener.off()
noteContentUpdateListener.off()
}
}
private processDocUpdate(update: Uint8Array, origin: unknown): void {
private processDocUpdate(update: number[], origin: unknown): void {
if (!this.isSynced() || origin === this) {
return
}
const message: Message<MessageType.NOTE_CONTENT_UPDATE> = {
type: MessageType.NOTE_CONTENT_UPDATE,
payload: Array.from(update)
payload: update
}
this.messageTransporter.sendMessage(message)
@ -141,7 +127,7 @@ export abstract class YDocSyncAdapter {
if (this.doc) {
this.messageTransporter.sendMessage({
type: MessageType.NOTE_CONTENT_STATE_REQUEST,
payload: Array.from(encodeStateVector(this.doc))
payload: this.doc.encodeStateVector()
})
}
}

View file

@ -5,13 +5,23 @@
*/
import { MessageType } from '../message-transporters/message.js'
import { YDocSyncAdapter } from './y-doc-sync-adapter.js'
import { Listener } from 'eventemitter2'
export class YDocSyncClientAdapter extends YDocSyncAdapter {
protected bindDocumentSyncMessageEvents() {
super.bindDocumentSyncMessageEvents()
const destroyCallback = super.bindDocumentSyncMessageEvents()
this.messageTransporter.on(MessageType.NOTE_CONTENT_UPDATE, () => {
this.markAsSynced()
})
const noteContentUpdateListener = this.messageTransporter.on(
MessageType.NOTE_CONTENT_UPDATE,
() => {
this.markAsSynced()
},
{ objectify: true }
) as Listener
return () => {
destroyCallback()
noteContentUpdateListener.off()
}
}
}

View file

@ -4,11 +4,15 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { MessageTransporter } from '../message-transporters/message-transporter.js'
import { RealtimeDoc } from './realtime-doc.js'
import { YDocSyncAdapter } from './y-doc-sync-adapter.js'
export class YDocSyncServerAdapter extends YDocSyncAdapter {
constructor(readonly messageTransporter: MessageTransporter) {
super(messageTransporter)
constructor(
readonly messageTransporter: MessageTransporter,
readonly doc: RealtimeDoc
) {
super(messageTransporter, doc)
this.markAsSynced()
}
}