diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8e603fc..48b3030 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -6,7 +6,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - node: [14.x, 16.x, 17.x] + node: [16.x, 18.x] name: Node ${{ matrix.node }} steps: - uses: actions/checkout@v1 diff --git a/.gitignore b/.gitignore index 6704566..01c7ee1 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,6 @@ dist # TernJS port file .tern-port + +# Not necessary for libs +package-lock.json \ No newline at end of file diff --git a/index.js b/index.js index ccacec3..2514d51 100644 --- a/index.js +++ b/index.js @@ -1 +1,163 @@ 'use strict' +const fp = require('fastify-plugin') + +const { Errors } = require('./lib/index') + +module.exports = fp( + function fastifyRacePlugin (fastify, globalOpts, next) { + const controllers = new WeakMap() + let error + + if (globalOpts != null && typeof globalOpts !== 'object') { + return next(new Errors.BAD_PARAMS('object', typeof globalOpts)) + } + + globalOpts = Object.assign( + {}, + { handleOnError: true, onRequestClosed: null }, + globalOpts + ) + + if (typeof globalOpts.handleOnError !== 'boolean') { + error = new Errors.BAD_PARAMS('boolean', typeof globalOpts.handleOnError) + } else if ( + globalOpts.onRequestClosed != null && + typeof globalOpts.onRequestClosed !== 'function' + ) { + error = new Errors.BAD_PARAMS( + 'function', + typeof globalOpts.onRequestClosed + ) + } + + fastify.decorateRequest('race', race) + fastify.addHook('onResponse', fastifyRacingCleaner) + + return next(error) + + function fastifyRacingCleaner (request, _reply, done) { + if (controllers.has(request)) { + const { controller, cbs } = controllers.get(request) + + if (controller.signal.aborted === false) { + for (const cb of cbs) { + controller.signal.removeEventListener('abort', cb, { + once: true + }) + } + } + + controllers.delete(request) + } + + done() + } + + function race (opts = globalOpts) { + const { raw, id: reqId } = this + const handleError = typeof opts === 'function' ? true : opts.handleOnError + const cb = typeof opts === 'function' ? opts : opts.onRequestClosed + + if (controllers.has(this)) { + const { controller: ctrl, cbs } = controllers.get(this) + + if (ctrl.signal.aborted === true) { + throw new Errors.ALREADY_ABORTED(reqId) + } + + if (raw.socket.destroyed === true) { + throw new Errors.SOCKET_CLOSED(reqId) + } + + if (cb != null) { + ctrl.signal.addEventListener('abort', cb, { + once: true + }) + + controllers.set(this, { controller: ctrl, cbs: cbs.concat(cb) }) + } + + return ctrl.signal + } else { + // eslint-disable-next-line no-undef + const controller = new AbortController() + + if (cb != null) { + controller.signal.addEventListener('abort', cb, { + once: true + }) + } + + if (cb == null) controller.signal.then = theneable.bind(this) + + if (raw.socket.destroyed) { + throw new Errors.SOCKET_CLOSED(reqId) + } else { + raw.once( + 'close', + function () { + if (controllers.has(this)) { + const { controller: ctrl } = controllers.get(this) + if (ctrl.signal.aborted === false) controller.abort() + } + }.bind(this) + ) + + if (handleError === true || cb != null) { + raw.once( + 'error', + function (err) { + if (controllers.has(this)) { + const { controller: ctrl } = controllers.get(this) + if (ctrl.signal.aborted === false) controller.abort(err) + } + }.bind(this) + ) + } + } + + controllers.set(this, { controller, cbs: cb != null ? [cb] : [] }) + + return controller.signal + } + + function theneable (resolve, reject) { + const { controller, cbs } = controllers.get(this) + + if (raw.socket.destroyed === true) { + return reject(Errors.SOCKET_CLOSED(this.id)) + } + + if (controller.signal.aborted === true) { + return reject(Errors.ALREADY_ABORTED(this.id)) + } + + try { + controller.signal.addEventListener('abort', theneableHandler, { + once: true + }) + + controllers.set(this, { + controller, + cbs: cbs.concat(theneableHandler) + }) + } catch (err) { + reject(err) + } + + function theneableHandler (evt) { + const event = { + type: evt.type, + reason: controller.signal?.reason + } + + resolve(event) + } + } + } + }, + { + fastify: '>=3.24.1', + name: 'fastify-racing' + } +) diff --git a/lib/index.js b/lib/index.js index e69de29..318c41d 100644 --- a/lib/index.js +++ b/lib/index.js @@ -0,0 +1,18 @@ +const Errors = require('@fastify/error') + +module.exports = { + Errors: { + BAD_PARAMS: Errors( + 'FST_PLUGIN_RACE_BAD_PARAM', + 'Invalid param, expected %s but received %s' + ), + ALREADY_ABORTED: Errors( + 'FST_PLUGIN_RACE_ALREADY_ABORTED', + "Request with ID '%s' already aborted" + ), + SOCKET_CLOSED: Errors( + 'FST_PLUGIN_RACE_SOCKET_CLOSED', + "Socket for request with ID '%s' already closed" + ) + } +} diff --git a/package.json b/package.json index 29ad2ab..fc9ec7e 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { - "name": "", + "name": "fastify-racing", "version": "0.0.0", - "description": "", + "description": "Cancel any running operation at the right time on your request handler", "main": "index.js", "types": "index.d.ts", "scripts": { @@ -13,27 +13,35 @@ "lint:ci": "standard", "typescript": "tsd" }, + "engines": { + "node": ">=16.0.0" + }, "keywords": [], "repository": { "type": "git", - "url": "git+https://github.com/metcoder95/.git" + "url": "git+https://github.com/metcoder95/fastify-racing.git" }, - "readme": "https://github.com/metcoder95//blob/main/README.md", + "readme": "https://github.com/metcoder95/fastify-racing/blob/main/README.md", "bugs": { - "url": "https://github.com/metcoder95//issues" + "url": "https://github.com/metcoder95/fastify-racing/issues" }, "author": "metcoder95 ", "license": "MIT", "devDependencies": { - "@types/node": "^14.17.6", + "@types/node": "^14.18.13", + "fastify": "^3.28.0", "husky": "^7.0.2", + "nodemon": "^2.0.15", "snazzy": "^9.0.0", "standard": "^16.0.3", "tap": "^15.0.10", "tsd": "^0.17.0", - "typescript": "^4.4" + "typescript": "^4.4", + "undici": "^5.0.0" }, "dependencies": { + "@fastify/error": "^2.0.0", + "fastify-plugin": "^3.0.1" }, "tsd": { "directory": "test" diff --git a/test/index.test-d.ts b/test/index.test-d.ts index e69de29..69e5d9d 100644 --- a/test/index.test-d.ts +++ b/test/index.test-d.ts @@ -0,0 +1,26 @@ +/// +import { FastifyPluginCallback } from 'fastify'; +import { FastifyError } from '@fastify/error'; + + +interface AbortEvent { + type: 'abort' | string; + reason?: FastifyError | Error +} + +interface FastifyRacing { + handleError?: boolean; + onRequestClosed?: (evt: AbortEvent) => void; +} + +declare module 'fastify' { + interface FastifyInstance { + race(cb: FastifyRacing['onRequestClosed']): void + race(opts: FastifyRacing): Promise + race(): Promise + } +} + +declare const FastifyRacing: FastifyPluginCallback; + +export default FastifyRacing; \ No newline at end of file diff --git a/test/index.test.js b/test/index.test.js index ccacec3..e50df5e 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -1 +1,324 @@ 'use strict' +const { promisify } = require('util') + +const tap = require('tap') +const fastify = require('fastify') +const { request } = require('undici') + +const plugin = require('../.') +const { Errors } = require('../lib') + +const sleep = promisify(setTimeout) + +tap.plan(2) + +tap.test('fastify-racing#decoration', subtest => { + subtest.plan(4) + + subtest.test('Should decorate the request properly', async t => { + t.plan(3) + + const app = fastify() + app.register(plugin) + + app.get('/', (req, reply) => { + t.ok(req.race, 'should decorate request object') + t.equal(typeof req.race, 'function', 'should be a function') + + return 'hello' + }) + + const response = await app.inject({ + method: 'GET', + path: '/' + }) + + t.equal(response.body, 'hello') + }) + + subtest.test('Should throw if invalid Global opts', async t => { + t.plan(3) + + const app = fastify() + try { + await app.register(plugin, 'invalid').ready() + } catch (err) { + t.ok(err, 'should throw') + t.equal(err.code, 'FST_PLUGIN_RACE_BAD_PARAM') + t.equal(err.message, 'Invalid param, expected object but received string') + } + }) + + subtest.test('Should throw if invalid Global opts.handleOnError', async t => { + t.plan(3) + + const app = fastify() + try { + await app.register(plugin, { handleOnError: 'invalid' }).ready() + } catch (err) { + t.ok(err, 'should throw') + t.equal(err.code, 'FST_PLUGIN_RACE_BAD_PARAM') + t.equal( + err.message, + 'Invalid param, expected boolean but received string' + ) + } + }) + + subtest.test( + 'Should throw if invalid Global opts.onRequestClosed', + async t => { + t.plan(3) + + const app = fastify() + try { + await app.register(plugin, { onRequestClosed: 1 }).ready() + } catch (err) { + t.ok(err, 'should throw') + t.equal(err.code, 'FST_PLUGIN_RACE_BAD_PARAM') + t.equal( + err.message, + 'Invalid param, expected function but received number' + ) + } + } + ) +}) + +// TODO: find what's hanging the tests +// TODO: remove "only" once done +tap.test('fastify-racing#promise', { only: true }, subtest => { + subtest.plan(4) + + subtest.test('Should handle a request aborted', t => { + t.plan(3) + + const app = fastify() + // eslint-disable-next-line no-undef + const abtCtlr = new AbortController() + app.register(plugin) + + t.teardown(() => app.close()) + + app.get('/', async (req, _reply) => { + const signal = req.race() + const result = await Promise.race([signal, dummy(signal)]) + + t.equal(typeof result, 'object') + t.equal(result.type, 'abort') + + if (result.type === 'aborted') return '' + else return `${result}-world` + }) + + app + .ready() + .then(() => app.listen()) + .then(async () => { + request( + `http://localhost:${app.server.address().port}`, + { + method: 'GET', + path: '/', + signal: abtCtlr.signal + }, + err => { + t.ok(err) + } + ) + + // Allow a full event loop cycle + await sleep(5) + abtCtlr.abort() + }) + }) + + subtest.test( + 'Should be able to handle more than one race check within a request', + t => { + const app = fastify() + // eslint-disable-next-line no-undef + const abtCtlr = new AbortController() + let starter + + t.plan(10) + + app.register(plugin) + + app.get( + '/', + { + preHandler: [ + async (req, _reply) => { + starter = req.race() + const result = await Promise.race([starter, dummy(starter, 10)]) + t.equal(result, 'hello') + }, + async (req, _reply) => { + const second = req.race() + const result = await Promise.race([second, dummy(second, 10)]) + + t.equal(result, 'hello') + t.equal( + starter, + second, + 'Should use the same AbortController instance' + ) + }, + async (req, _reply) => { + const third = req.race() + const result = await Promise.race([third, dummy(third, 10)]) + t.equal(result, 'hello') + t.equal( + starter, + third, + 'Should use the same AbortController instance' + ) + } + ] + }, + async (req, _reply) => { + const final = req.race() + + const result = await Promise.race([final, dummy(final, 2000)]) + + t.ok(final.aborted) + t.equal(final, starter, 'Should reuse the initial controller') + + t.equal(typeof result, 'object') + t.equal(result.type, 'abort') + + return '' + } + ) + + t.teardown(() => app.close()) + + app + .ready() + .then(() => app.listen()) + .then(async () => { + request( + `http://localhost:${app.server.address().port}`, + { + method: 'GET', + path: '/', + signal: abtCtlr.signal + }, + err => { + t.ok(err) + } + ) + + // Allow a full event loop cycle + await sleep(500) + abtCtlr.abort() + }) + } + ) + + subtest.test( + 'Should reuse AbortController for the single request', + async t => { + let first + const app = fastify() + + t.plan(5) + + app.register(plugin) + + app.get( + '/', + { + preHandler: (req, _reply, done) => { + first = req.race() + + t.ok(first) + done() + } + }, + (req, _reply) => { + const second = req.race() + + t.notOk(second.aborted) + t.equal(second, first, 'Should reuse the initial controller') + + return 'Hello World' + } + ) + + t.teardown(() => app.close()) + + await app.listen() + + const response = await request( + `http://localhost:${app.server.address().port}`, + { + method: 'GET', + path: '/' + } + ) + + t.equal(response.statusCode, 200) + t.equal(await response.body.text(), 'Hello World') + t.end() + } + ) + + // TODO: Find how to close the socket after request finished + subtest.test( + 'Should throw on already closed request', + async t => { + let first + const app = fastify() + + t.plan(7) + + app.register(plugin) + + app.get( + '/', + { + onResponse: async (req, _reply, done) => { + req.raw.destroy() + + try { + first = await req.race() + } catch (err) { + t.ok(err) + t.ok(err instanceof Errors.SOCKET_CLOSED) + t.equal(err.code, 'FST_PLUGIN_RACE_SOCKET_CLOSED') + t.equal(err.statusCode, 500) + } + + t.notOk(first) + done() + } + }, + (req, _reply) => { + return 'Hello World' + } + ) + + t.teardown(() => app.close()) + + await app.listen() + + const response = await request( + `http://localhost:${app.server.address().port}`, + { + method: 'GET', + path: '/' + } + ) + + t.equal(response.statusCode, 200) + t.equal(await response.body.text(), 'Hello World') + } + ) + + async function dummy (signal, ms = 3000) { + await sleep(ms, null, { signal, ref: false }) + return 'hello' + } +})