From 4ccfdfa538d2fd7e9ee63e937435a7f6fc622c12 Mon Sep 17 00:00:00 2001
From: Wu Cheng-Han <jacky_cute0808@hotmail.com>
Date: Mon, 7 Nov 2016 21:30:40 +0800
Subject: [PATCH] Add workers for notes to leverage CPU intensive work loading

---
 lib/realtime.js                  | 161 ++++++++++++++-----------------
 lib/workers/noteRevisionSaver.js |  19 ++++
 lib/workers/noteUpdater.js       | 101 +++++++++++++++++++
 3 files changed, 191 insertions(+), 90 deletions(-)
 create mode 100644 lib/workers/noteRevisionSaver.js
 create mode 100644 lib/workers/noteUpdater.js

diff --git a/lib/realtime.js b/lib/realtime.js
index b50e05b29..5d769e7d9 100644
--- a/lib/realtime.js
+++ b/lib/realtime.js
@@ -9,6 +9,7 @@ var randomcolor = require("randomcolor");
 var Chance = require('chance'),
     chance = new Chance();
 var moment = require('moment');
+var childProcess = require('child_process');
 
 //core
 var config = require("./config.js");
@@ -19,6 +20,9 @@ var models = require("./models");
 //ot
 var ot = require("./ot/index.js");
 
+// workers
+var noteUpdater = require("./workers/noteUpdater");
+
 //public
 var realtime = {
     io: null,
@@ -79,97 +83,62 @@ function emitCheck(note) {
 var users = {};
 var notes = {};
 //update when the note is dirty
+var updaterIsBusy = false;
 var updater = setInterval(function () {
-    async.each(Object.keys(notes), function (key, callback) {
+    if (updaterIsBusy) return;
+    var _notes = {};
+    Object.keys(notes).forEach(function (key) {
         var note = notes[key];
-        if (note.server.isDirty) {
-            if (config.debug) logger.info("updater found dirty note: " + key);
-            updateNote(note, function(err, _note) {
-                // handle when note already been clean up
-                if (!notes[key] || !notes[key].server) return callback(null, null);
-                if (!_note) {
-                    realtime.io.to(note.id).emit('info', {
-                        code: 404
-                    });
-                    logger.error('note not found: ', note.id);
-                }
-                if (err || !_note) {
-                    for (var i = 0, l = note.socks.length; i < l; i++) {
-                        var sock = note.socks[i];
-                        if (typeof sock !== 'undefined' && sock) {
-                            setTimeout(function () {
-                                sock.disconnect(true);
-                            }, 0);
-                        }
+        if (!note.server || !note.server.isDirty) return;
+        _notes[key] = {
+            id: note.id,
+            lastchangeuser: note.lastchangeuser,
+            authorship: note.authorship,
+            document: note.server.document
+        };
+        note.server.isDirty = false;
+    });
+    if (Object.keys(_notes).length <= 0) return;
+    updaterIsBusy = true;
+    var worker = childProcess.fork("./lib/workers/noteUpdater.js");
+    if (config.debug) logger.info('note updater worker process started');
+    worker.send({
+        msg: 'update note',
+        notes: _notes
+    });
+    worker.on('message', function (data) {
+        if (!data || !data.msg || !data.note) return;
+        var note = notes[data.note.id];
+        if (!note) return;
+        switch(data.msg) {
+            case 'error':
+                for (var i = 0, l = note.socks.length; i < l; i++) {
+                    var sock = note.socks[i];
+                    if (typeof sock !== 'undefined' && sock) {
+                        setTimeout(function () {
+                            sock.disconnect(true);
+                        }, 0);
                     }
-                    return callback(err, null);
                 }
-                note.server.isDirty = false;
-                note.updatetime = moment(_note.lastchangeAt).valueOf();
+                break;
+            case 'note not found':
+                realtime.io.to(note.id).emit('info', {
+                    code: 404
+                });
+                break;
+            case 'check':
+                note.lastchangeuserprofile = data.note.lastchangeuserprofile;
+                note.updatetime = data.note.updatetime;
+                saverSleep = false;
                 emitCheck(note);
-                return callback(null, null);
-            });
-        } else {
-            return callback(null, null);
+                break;
         }
-    }, function (err) {
-        if (err) return logger.error('updater error', err);
+    });
+    worker.on('close', function (code) {
+        updaterIsBusy = false;
+        if (config.debug) logger.info('note updater worker process exited with code ' + code);
     });
 }, 1000);
-function updateNote(note, callback) {
-    models.Note.findOne({
-        where: {
-            id: note.id
-        }
-    }).then(function (_note) {
-        if (!_note) return callback(null, null);
-        if (note.lastchangeuser) {
-            if (_note.lastchangeuserId != note.lastchangeuser) {
-                models.User.findOne({
-                    where: {
-                        id: note.lastchangeuser
-                    }
-                }).then(function (user) {
-                    if (!user) return callback(null, null);
-                    note.lastchangeuserprofile = models.User.parseProfile(user.profile);
-                    return finishUpdateNote(note, _note, callback);
-                }).catch(function (err) {
-                    logger.error(err);
-                    return callback(err, null);
-                });
-            } else {
-                return finishUpdateNote(note, _note, callback);
-            }
-        } else {
-            note.lastchangeuserprofile = null;
-            return finishUpdateNote(note, _note, callback);
-        }
-    }).catch(function (err) {
-        logger.error(err);
-        return callback(err, null);
-    });
-}
-function finishUpdateNote(note, _note, callback) {
-    if (!note || !note.server) return callback(null, null);
-    var body = note.server.document;
-    var title = note.title = models.Note.parseNoteTitle(body);
-    title = LZString.compressToBase64(title);
-    body = LZString.compressToBase64(body);
-    var values = {
-        title: title,
-        content: body,
-        authorship: LZString.compressToBase64(JSON.stringify(note.authorship)),
-        lastchangeuserId: note.lastchangeuser,
-        lastchangeAt: Date.now()
-    };
-    _note.update(values).then(function (_note) {
-        saverSleep = false;
-        return callback(null, _note);
-    }).catch(function (err) {
-        logger.error(err);
-        return callback(err, null);
-    });
-}
 //clean when user not in any rooms or user not in connected list
 var cleaner = setInterval(function () {
     async.each(Object.keys(users), function (key, callback) {
@@ -192,16 +161,28 @@ var cleaner = setInterval(function () {
     });
 }, 60000);
 var saverSleep = false;
+var saverIsBusy = false;
 // save note revision in interval
 var saver = setInterval(function () {
-    if (saverSleep) return;
-    models.Revision.saveAllNotesRevision(function (err, notes) {
-        if (err) return logger.error('revision saver failed: ' + err);
-        if (notes && notes.length <= 0) {
-            saverSleep = true;
-            return;
+    if (saverSleep || saverIsBusy) return;
+    saverIsBusy = true;
+    var worker = childProcess.fork("./lib/workers/noteRevisionSaver.js");
+    if (config.debug) logger.info('note revision saver worker process started');
+    worker.send({
+        msg: 'save note revision'
+    });
+    worker.on('message', function (data) {
+        if (!data || !data.msg) return;
+        switch(data.msg) {
+            case 'empty':
+                saverSleep = true;
+                break;
         }
     });
+    worker.on('close', function (code) {
+        saverIsBusy = false;
+        if (config.debug) logger.info('note revision saver worker process exited with code ' + code);
+    });
 }, 60000 * 5);
 
 function getStatus(callback) {
@@ -543,7 +524,7 @@ function disconnect(socket) {
         // remove note in notes if no user inside
         if (Object.keys(note.users).length <= 0) {
             if (note.server.isDirty) {
-                updateNote(note, function (err, _note) {
+                noteUpdater.updateNote(note, function (err, _note) {
                     if (err) return logger.error('disconnect note failed: ' + err);
                     // clear server before delete to avoid memory leaks
                     note.server.document = "";
diff --git a/lib/workers/noteRevisionSaver.js b/lib/workers/noteRevisionSaver.js
new file mode 100644
index 000000000..b6b117a31
--- /dev/null
+++ b/lib/workers/noteRevisionSaver.js
@@ -0,0 +1,19 @@
+// core
+var logger = require("../logger.js");
+var models = require("../models");
+
+process.on('message', function (data) {
+    if (!data || !data.msg || data.msg !== 'save note revision') return process.exit();
+    models.Revision.saveAllNotesRevision(function (err, notes) {
+        if (err) {
+            logger.error('note revision saver failed: ' + err);
+            return process.exit();
+        }
+        if (notes && notes.length <= 0) {
+            process.send({
+                msg: 'empty'
+            });
+        }
+        process.exit();
+    });
+});
\ No newline at end of file
diff --git a/lib/workers/noteUpdater.js b/lib/workers/noteUpdater.js
new file mode 100644
index 000000000..3fc4b1ebf
--- /dev/null
+++ b/lib/workers/noteUpdater.js
@@ -0,0 +1,101 @@
+// external modules
+var async = require('async');
+var moment = require('moment');
+var LZString = require('lz-string');
+
+// core
+var config = require("../config.js");
+var logger = require("../logger.js");
+var models = require("../models");
+
+process.on('message', function (data) {
+    if (!data || !data.msg || data.msg !== 'update note' || !data.notes) return process.exit();
+    var notes = data.notes;
+    async.each(Object.keys(notes), function (key, callback) {
+        var note = notes[key];
+        if (config.debug) logger.info("note updater found dirty note: " + key);
+        updateNote(note, function(err, _note) {
+            if (!_note) {
+                process.send({
+                    msg: 'note not found',
+                    note: note
+                });
+                logger.error('note not found: ', note.id);
+            }
+            if (err || !_note) {
+                process.send({
+                    msg: 'error',
+                    note: note
+                });
+                return callback(err, null);
+            }
+            note.updatetime = moment(_note.lastchangeAt).valueOf();
+            process.send({
+                msg: 'check',
+                note: note
+            });
+            return callback(null, null);
+        });
+    }, function (err) {
+        if (err) logger.error('note updater error', err);
+        process.exit();
+    });
+});
+
+function updateNote(note, callback) {
+    models.Note.findOne({
+        where: {
+            id: note.id
+        }
+    }).then(function (_note) {
+        if (!_note) return callback(null, null);
+        if (note.lastchangeuser) {
+            if (_note.lastchangeuserId != note.lastchangeuser) {
+                models.User.findOne({
+                    where: {
+                        id: note.lastchangeuser
+                    }
+                }).then(function (user) {
+                    if (!user) return callback(null, null);
+                    note.lastchangeuserprofile = models.User.parseProfile(user.profile);
+                    return finishUpdateNote(note, _note, callback);
+                }).catch(function (err) {
+                    logger.error(err);
+                    return callback(err, null);
+                });
+            } else {
+                return finishUpdateNote(note, _note, callback);
+            }
+        } else {
+            note.lastchangeuserprofile = null;
+            return finishUpdateNote(note, _note, callback);
+        }
+    }).catch(function (err) {
+        logger.error(err);
+        return callback(err, null);
+    });
+}
+
+function finishUpdateNote(note, _note, callback) {
+    var body = note.document;
+    var title = note.title = models.Note.parseNoteTitle(body);
+    title = LZString.compressToBase64(title);
+    body = LZString.compressToBase64(body);
+    var values = {
+        title: title,
+        content: body,
+        authorship: LZString.compressToBase64(JSON.stringify(note.authorship)),
+        lastchangeuserId: note.lastchangeuser,
+        lastchangeAt: Date.now()
+    };
+    _note.update(values).then(function (_note) {
+        return callback(null, _note);
+    }).catch(function (err) {
+        logger.error(err);
+        return callback(err, null);
+    });
+}
+
+module.exports = {
+    updateNote: updateNote
+};
\ No newline at end of file