Skip to content

Commit 609338c

Browse files
committed
Define server plugin interface, move Agent queries code into a built-in plugin
1 parent 766f727 commit 609338c

File tree

3 files changed

+338
-151
lines changed

3 files changed

+338
-151
lines changed

lib/agent.js

Lines changed: 46 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ var ShareDBError = require('./error');
66

77
var ERROR_CODE = ShareDBError.CODES;
88

9+
/** @typedef {import('./backend')} Backend */
10+
/** @typedef {import('./backend').ServerPlugin<unknown, unknown, unknown>} ServerPlugin */
11+
912
/**
1013
* Agent deserializes the wire protocol messages received from the stream and
1114
* calls the corresponding functions on its Agent. It uses the return values
@@ -16,6 +19,7 @@ var ERROR_CODE = ShareDBError.CODES;
1619
* @param {Duplex} stream connection to a client
1720
*/
1821
function Agent(backend, stream) {
22+
/** @type {Backend} */
1923
this.backend = backend;
2024
this.stream = stream;
2125

@@ -32,8 +36,24 @@ function Agent(backend, stream) {
3236
// map of collection -> id -> stream
3337
this.subscribedDocs = {};
3438

35-
// Map from queryId -> emitter
36-
this.subscribedQueries = {};
39+
/**
40+
* Map of action name (`a` field in requests) to plugin
41+
* @type {{ [action: string]: ServerPlugin }}
42+
*/
43+
var actionToPlugin = this.actionToPlugin = {};
44+
// Map of plugin name -> plugin's agent state
45+
this.pluginStates = {};
46+
for (var i = 0; i < backend.plugins.length; i++) {
47+
var plugin = backend.plugins[i];
48+
for (var action in plugin.requestHandlers) {
49+
if (actionToPlugin[action]) {
50+
throw new Error('Action ' + action + ' in plugin ' + plugin.name +
51+
' conflicts with plugin ' + actionToPlugin[action].name);
52+
}
53+
actionToPlugin[action] = plugin;
54+
}
55+
this.pluginStates[plugin.name] = plugin.createAgentState();
56+
}
3757

3858
// Track which documents are subscribed to presence by the client. This is a
3959
// map of channel -> stream
@@ -167,43 +187,6 @@ Agent.prototype._subscribeToPresenceStream = function(channel, stream) {
167187
});
168188
};
169189

170-
Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) {
171-
var previous = this.subscribedQueries[queryId];
172-
if (previous) previous.destroy();
173-
this.subscribedQueries[queryId] = emitter;
174-
175-
var agent = this;
176-
emitter.onExtra = function(extra) {
177-
agent.send({a: 'q', id: queryId, extra: extra});
178-
};
179-
180-
emitter.onDiff = function(diff) {
181-
for (var i = 0; i < diff.length; i++) {
182-
var item = diff[i];
183-
if (item.type === 'insert') {
184-
item.values = getResultsData(item.values);
185-
}
186-
}
187-
// Consider stripping the collection out of the data we send here
188-
// if it matches the query's collection.
189-
agent.send({a: 'q', id: queryId, diff: diff});
190-
};
191-
192-
emitter.onError = function(err) {
193-
// Log then silently ignore errors in a subscription stream, since these
194-
// may not be the client's fault, and they were not the result of a
195-
// direct request by the client
196-
logger.error('Query subscription stream error', collection, query, err);
197-
};
198-
199-
emitter.onOp = function(op) {
200-
var id = op.d;
201-
agent._onOp(collection, id, op);
202-
};
203-
204-
emitter._open();
205-
};
206-
207190
Agent.prototype._onOp = function(collection, id, op) {
208191
if (this._isOwnOp(collection, op)) return;
209192

@@ -379,19 +362,22 @@ Agent.prototype._checkRequest = function(request) {
379362
// Handle an incoming message from the client
380363
Agent.prototype._handleMessage = function(request, callback) {
381364
try {
365+
var plugin = this.actionToPlugin[request.a];
366+
382367
var errMessage = this._checkRequest(request);
383368
if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage));
369+
if (plugin) {
370+
try {
371+
plugin.checkRequest(request);
372+
} catch (err) {
373+
return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, err.message));
374+
}
375+
}
384376

385377
switch (request.a) {
386378
case 'hs':
387379
if (request.id) this.src = request.id;
388380
return callback(null, this._initMessage('hs'));
389-
case 'qf':
390-
return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback);
391-
case 'qs':
392-
return this._querySubscribe(request.id, request.c, request.q, getQueryOptions(request), callback);
393-
case 'qu':
394-
return this._queryUnsubscribe(request.id, callback);
395381
case 'bf':
396382
return this._fetchBulk(request.c, request.b, callback);
397383
case 'bs':
@@ -435,109 +421,23 @@ Agent.prototype._handleMessage = function(request, callback) {
435421
case 'pu':
436422
return this._unsubscribePresence(request.ch, request.seq, callback);
437423
default:
438-
callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message'));
439-
}
440-
} catch (err) {
441-
callback(err);
442-
}
443-
};
444-
function getQueryOptions(request) {
445-
var results = request.r;
446-
var ids;
447-
var fetch;
448-
var fetchOps;
449-
if (results) {
450-
ids = [];
451-
for (var i = 0; i < results.length; i++) {
452-
var result = results[i];
453-
var id = result[0];
454-
var version = result[1];
455-
ids.push(id);
456-
if (version == null) {
457-
if (fetch) {
458-
fetch.push(id);
424+
if (plugin) {
425+
return plugin.requestHandlers[request.a](request, {
426+
agent: this,
427+
backend: this.backend,
428+
agentState: this.pluginStates[plugin.name]
429+
}, callback);
459430
} else {
460-
fetch = [id];
431+
callback(
432+
new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')
433+
);
461434
}
462-
} else {
463-
if (!fetchOps) fetchOps = {};
464-
fetchOps[id] = version;
465-
}
466435
}
436+
} catch (err) {
437+
callback(err);
467438
}
468-
var options = request.o || {};
469-
options.ids = ids;
470-
options.fetch = fetch;
471-
options.fetchOps = fetchOps;
472-
return options;
473-
}
474-
475-
Agent.prototype._queryFetch = function(queryId, collection, query, options, callback) {
476-
// Fetch the results of a query once
477-
this.backend.queryFetch(this, collection, query, options, function(err, results, extra) {
478-
if (err) return callback(err);
479-
var message = {
480-
data: getResultsData(results),
481-
extra: extra
482-
};
483-
callback(null, message);
484-
});
485-
};
486-
487-
Agent.prototype._querySubscribe = function(queryId, collection, query, options, callback) {
488-
// Subscribe to a query. The client is sent the query results and its
489-
// notified whenever there's a change
490-
var agent = this;
491-
var wait = 1;
492-
var message;
493-
function finish(err) {
494-
if (err) return callback(err);
495-
if (--wait) return;
496-
callback(null, message);
497-
}
498-
if (options.fetch) {
499-
wait++;
500-
this.backend.fetchBulk(this, collection, options.fetch, function(err, snapshotMap) {
501-
if (err) return finish(err);
502-
message = getMapResult(snapshotMap);
503-
finish();
504-
});
505-
}
506-
if (options.fetchOps) {
507-
wait++;
508-
this._fetchBulkOps(collection, options.fetchOps, finish);
509-
}
510-
this.backend.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) {
511-
if (err) return finish(err);
512-
if (this.closed) return emitter.destroy();
513-
514-
agent._subscribeToQuery(emitter, queryId, collection, query);
515-
// No results are returned when ids are passed in as an option. Instead,
516-
// want to re-poll the entire query once we've established listeners to
517-
// emit any diff in results
518-
if (!results) {
519-
emitter.queryPoll(finish);
520-
return;
521-
}
522-
message = {
523-
data: getResultsData(results),
524-
extra: extra
525-
};
526-
finish();
527-
});
528439
};
529440

530-
function getResultsData(results) {
531-
var items = [];
532-
for (var i = 0; i < results.length; i++) {
533-
var result = results[i];
534-
var item = getSnapshotData(result);
535-
item.d = result.id;
536-
items.push(item);
537-
}
538-
return items;
539-
}
540-
541441
function getMapResult(snapshotMap) {
542442
var data = {};
543443
for (var id in snapshotMap) {
@@ -553,6 +453,8 @@ function getMapResult(snapshotMap) {
553453
}
554454
return {data: data};
555455
}
456+
// Exported for use in core plugins
457+
Agent._getMapResult = getMapResult;
556458

557459
function getSnapshotData(snapshot) {
558460
var data = {
@@ -564,15 +466,8 @@ function getSnapshotData(snapshot) {
564466
}
565467
return data;
566468
}
567-
568-
Agent.prototype._queryUnsubscribe = function(queryId, callback) {
569-
var emitter = this.subscribedQueries[queryId];
570-
if (emitter) {
571-
emitter.destroy();
572-
delete this.subscribedQueries[queryId];
573-
}
574-
process.nextTick(callback);
575-
};
469+
// Exported for use in core plugins
470+
Agent._getSnapshotData = getSnapshotData;
576471

577472
Agent.prototype._fetch = function(collection, id, version, callback) {
578473
if (version == null) {

lib/backend.js

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ var MemoryPubSub = require('./pubsub/memory');
88
var ot = require('./ot');
99
var projections = require('./projections');
1010
var QueryEmitter = require('./query-emitter');
11+
var QueryServerPlugin = require('./query-server-plugin').Plugin;
1112
var ShareDBError = require('./error');
1213
var Snapshot = require('./snapshot');
1314
var StreamSocket = require('./stream-socket');
@@ -37,6 +38,10 @@ function Backend(options) {
3738
// Map from event name to a list of middleware
3839
this.middleware = {};
3940

41+
/** @type {Array<ServerPlugin<unknown, unknown, unknown>>} */
42+
this.plugins = [];
43+
this._registerServerPlugin(new QueryServerPlugin());
44+
4045
// The number of open agents for monitoring and testing memory leaks
4146
this.agentsCount = 0;
4247
this.remoteAgentsCount = 0;
@@ -82,6 +87,64 @@ Backend.prototype.SNAPSHOT_TYPES = {
8287
byTimestamp: 'byTimestamp'
8388
};
8489

90+
/**
91+
* @param {ServerPlugin<unknown, unknown, unknown>} plugin
92+
*/
93+
Backend.prototype._registerServerPlugin = function(plugin) {
94+
this.plugins.push(plugin);
95+
};
96+
97+
/**
98+
* @typedef {object} ServerPlugin
99+
*
100+
* @property {string} name Unique plugin name, usually based on the plugin's package name
101+
* @property {{[action: string]: RequestHandler<TReq>}} requestHandlers
102+
* @property {(callback: (error?: Error) => void) => void} close Function called when
103+
* `Backend#close` is called. The close function should shut down database connections and then
104+
* call the callback.
105+
* @property {() => S} createAgentState Function to create an object that contains custom
106+
* per-agent state that the plugin needs
107+
* @property {(agentState: S) => void} destroyAgentState Function to tear down the plugin's state
108+
* for an agent
109+
* @property {(request: unknown) => request is TReq} checkRequest Function to check the format of
110+
* an incoming message from a client, throwing an error if the message is invalid
111+
*
112+
* @template TReq - Type for request data from a client
113+
* @template TResp - Type for response data sent back to a client
114+
* @template S - Type for custom per-agent state that the plugin needs to keep
115+
*/
116+
117+
/**
118+
* @callback RequestHandler
119+
*
120+
* Function that handles an incoming message from a client
121+
*
122+
* @param {TReq} request Request message from a client
123+
* @param {RequestHandlerContext<S>} context
124+
* @param {(err?: Error | null, reply?: TResp) => void} callback Callback to be called with the
125+
* reply message
126+
* @returns {void}
127+
*
128+
* @template TReq - Type for request data from a client
129+
* @template TResp - Type for response data sent back to a client
130+
* @template S - Type for custom per-agent state that the plugin needs to keep
131+
*/
132+
133+
/**
134+
* @typedef {object} RequestHandlerContext
135+
*
136+
* @property {import('./agent')} agent
137+
* @property {import('./backend')} backend
138+
* @property {S} agentState
139+
*
140+
* @template S
141+
*/
142+
143+
/**
144+
* Closes the backend and all its database connections.
145+
*
146+
* @param {(error?: Error) => void} callback
147+
*/
85148
Backend.prototype.close = function(callback) {
86149
var wait = 4;
87150
var backend = this;

0 commit comments

Comments
 (0)