refactor: reimplement realtime-communication

This commit refactors a lot of things that are not easy to separate.
It replaces the binary protocol of y-protocols with json.
It introduces event based message processing.
It implements our own code mirror plugins for synchronisation of content and remote cursors

Signed-off-by: Tilman Vatteroth <git@tilmanvatteroth.de>
This commit is contained in:
Tilman Vatteroth 2023-03-22 20:21:40 +01:00
parent 67cf1432b2
commit 3a06f84af1
110 changed files with 3920 additions and 2201 deletions

View file

@ -0,0 +1,102 @@
/*
* SPDX-FileCopyrightText: 2022 The HedgeDoc developers (see AUTHORS file)
*
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Message, MessagePayloads, MessageType } from './message.js'
import { EventEmitter2, Listener } from 'eventemitter2'
export type MessageEvents = MessageType | 'connected' | 'disconnected'
type MessageEventPayloadMap = {
[E in MessageEvents]: E extends keyof MessagePayloads
? (message: Message<E>) => void
: () => void
}
export enum ConnectionState {
DISCONNECT,
CONNECTING,
CONNECTED
}
/**
* Base class for event based message communication.
*/
export abstract class MessageTransporter extends EventEmitter2<MessageEventPayloadMap> {
private readyMessageReceived = false
public abstract sendMessage<M extends MessageType>(content: Message<M>): void
protected receiveMessage<L extends MessageType>(message: Message<L>): void {
if (message.type === MessageType.READY) {
this.readyMessageReceived = true
}
this.emit(message.type, message)
}
public sendReady(): void {
this.sendMessage({
type: MessageType.READY
})
}
public abstract disconnect(): void
public abstract getConnectionState(): ConnectionState
protected onConnected(): void {
this.emit('connected')
}
protected onDisconnecting(): void {
this.readyMessageReceived = false
this.emit('disconnected')
}
/**
* Indicates if the message transporter is connected and can send/receive messages.
*/
public isConnected(): boolean {
return this.getConnectionState() === ConnectionState.CONNECTED
}
/**
* Indicates if the message transporter has receives a {@link MessageType.READY ready message} yet.
*/
public isReady(): boolean {
return this.readyMessageReceived
}
/**
* Executes the given callback whenever the message transporter receives a ready message.
* If the messenger has already received a ready message then the callback will be executed immediately.
*
* @param callback The callback to execute when ready
* @return The event listener that waits for ready messages
*/
public doAsSoonAsReady(callback: () => void): Listener {
if (this.readyMessageReceived) {
callback()
}
return this.on(MessageType.READY, callback, {
objectify: true
}) as Listener
}
/**
* Executes the given callback whenever the message transporter has established a connection.
* If the messenger is already connected then the callback will be executed immediately.
*
* @param callback The callback to execute when connected
* @return The event listener that waits for connection events
*/
public doAsSoonAsConnected(callback: () => void): Listener {
if (this.isConnected()) {
callback()
}
return this.on('connected', callback, {
objectify: true
}) as Listener
}
}

View file

@ -0,0 +1,36 @@
/*
* SPDX-FileCopyrightText: 2022 The HedgeDoc developers (see AUTHORS file)
*
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { RealtimeUser, RemoteCursor } from './realtime-user.js'
export enum MessageType {
NOTE_CONTENT_STATE_REQUEST = 'NOTE_CONTENT_STATE_REQUEST',
NOTE_CONTENT_UPDATE = 'NOTE_CONTENT_UPDATE',
PING = 'PING',
PONG = 'PONG',
METADATA_UPDATED = 'METADATA_UPDATED',
DOCUMENT_DELETED = 'DOCUMENT_DELETED',
SERVER_VERSION_UPDATED = 'SERVER_VERSION_UPDATED',
REALTIME_USER_STATE_SET = 'REALTIME_USER_STATE_SET',
REALTIME_USER_SINGLE_UPDATE = 'REALTIME_USER_SINGLE_UPDATE',
REALTIME_USER_STATE_REQUEST = 'REALTIME_USER_STATE_REQUEST',
READY = 'READY'
}
export interface MessagePayloads {
[MessageType.NOTE_CONTENT_STATE_REQUEST]: number[]
[MessageType.NOTE_CONTENT_UPDATE]: number[]
[MessageType.REALTIME_USER_STATE_SET]: RealtimeUser[]
[MessageType.REALTIME_USER_SINGLE_UPDATE]: RemoteCursor
}
export type Message<T extends MessageType> = T extends keyof MessagePayloads
? {
type: T
payload: MessagePayloads[T]
}
: {
type: T
}

View file

@ -0,0 +1,57 @@
/*
* SPDX-FileCopyrightText: 2023 The HedgeDoc developers (see AUTHORS file)
*
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { RealtimeDoc } from '../y-doc-sync/realtime-doc.js'
import { ConnectionState, MessageTransporter } from './message-transporter.js'
import { Message, MessageType } from './message.js'
import { Doc, encodeStateAsUpdate } from 'yjs'
/**
* A mocked connection that doesn't send or receive any data and is instantly ready.
* The only exception is the note content state request that is answered with the given initial content.
*/
export class MockedBackendMessageTransporter extends MessageTransporter {
private readonly doc: Doc
private connected = true
constructor(initialContent: string) {
super()
this.doc = new RealtimeDoc(initialContent)
this.onConnected()
}
disconnect(): void {
if (!this.connected) {
return
}
this.connected = false
this.onDisconnecting()
}
sendReady() {
this.receiveMessage({
type: MessageType.READY
})
}
sendMessage<M extends MessageType>(content: Message<M>) {
if (content.type === MessageType.NOTE_CONTENT_STATE_REQUEST) {
setTimeout(() => {
const payload = Array.from(
encodeStateAsUpdate(this.doc, new Uint8Array(content.payload))
)
this.receiveMessage({ type: MessageType.NOTE_CONTENT_UPDATE, payload })
}, 10)
}
}
getConnectionState(): ConnectionState {
return this.connected
? ConnectionState.CONNECTED
: ConnectionState.DISCONNECT
}
}

View file

@ -0,0 +1,18 @@
/*
* SPDX-FileCopyrightText: 2023 The HedgeDoc developers (see AUTHORS file)
*
* SPDX-License-Identifier: AGPL-3.0-only
*/
export interface RealtimeUser {
displayName: string
username: string | null
active: boolean
styleIndex: number
cursor: RemoteCursor
}
export interface RemoteCursor {
from: number
to?: number
}

View file

@ -0,0 +1,97 @@
/*
* SPDX-FileCopyrightText: 2022 The HedgeDoc developers (see AUTHORS file)
*
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { ConnectionState, MessageTransporter } from './message-transporter.js'
import { Message, MessageType } from './message.js'
import WebSocket, { CloseEvent, ErrorEvent, MessageEvent } from 'isomorphic-ws'
export class WebsocketTransporter extends MessageTransporter {
private websocket: WebSocket | undefined
private messageCallback: undefined | ((event: MessageEvent) => void)
private errorCallback: undefined | ((event: ErrorEvent) => void)
private closeCallback: undefined | ((event: CloseEvent) => void)
constructor() {
super()
}
public setWebsocket(websocket: WebSocket) {
if (
websocket.readyState === WebSocket.CLOSED ||
websocket.readyState === WebSocket.CLOSING
) {
throw new Error('Websocket must be open')
}
this.undbindEventsFromPreviousWebsocket()
this.websocket = websocket
this.bindWebsocketEvents(websocket)
if (this.websocket.readyState === WebSocket.OPEN) {
this.onConnected()
} else {
this.websocket.addEventListener('open', this.onConnected.bind(this))
}
}
private undbindEventsFromPreviousWebsocket() {
if (this.websocket) {
if (this.messageCallback) {
this.websocket.removeEventListener('message', this.messageCallback)
}
if (this.errorCallback) {
this.websocket.removeEventListener('error', this.errorCallback)
}
if (this.closeCallback) {
this.websocket.removeEventListener('close', this.closeCallback)
}
}
}
private bindWebsocketEvents(websocket: WebSocket) {
this.messageCallback = this.processMessageEvent.bind(this)
this.errorCallback = this.disconnect.bind(this)
this.closeCallback = this.onDisconnecting.bind(this)
websocket.addEventListener('message', this.messageCallback)
websocket.addEventListener('error', this.errorCallback)
websocket.addEventListener('close', this.closeCallback)
}
private processMessageEvent(event: MessageEvent): void {
if (typeof event.data !== 'string') {
return
}
const message = JSON.parse(event.data) as Message<MessageType>
this.receiveMessage(message)
}
public disconnect(): void {
this.websocket?.close()
}
public sendMessage(content: Message<MessageType>): void {
if (this.websocket?.readyState !== WebSocket.OPEN) {
throw new Error("Can't send message over non-open socket")
}
try {
this.websocket.send(JSON.stringify(content))
} catch (error: unknown) {
this.disconnect()
throw error
}
}
public getConnectionState(): ConnectionState {
if (this.websocket?.readyState === WebSocket.OPEN) {
return ConnectionState.CONNECTED
} else if (this.websocket?.readyState === WebSocket.CONNECTING) {
return ConnectionState.CONNECTING
} else {
return ConnectionState.DISCONNECT
}
}
}