diff --git a/aedes.js b/aedes.js index 75724e7c..383c821a 100644 --- a/aedes.js +++ b/aedes.js @@ -1,14 +1,11 @@ import EventEmitter from 'node:events' -import parallel from 'fastparallel' -import series from 'fastseries' -import { v4 as uuidv4 } from 'uuid' -import reusify from 'reusify' -import { pipeline } from 'stream' +import { randomUUID } from 'node:crypto' +import { promisify } from 'node:util' import Packet from 'aedes-packet' import memory from 'aedes-persistence' import mqemitter from 'mqemitter' import Client from './lib/client.js' -import { $SYS_PREFIX, bulk } from './lib/utils.js' +import { $SYS_PREFIX, batch, noop, runSeries } from './lib/utils.js' import pkg from './package.json' with { type: 'json' } const defaultOptions = { @@ -38,7 +35,7 @@ export class Aedes extends EventEmitter { opts = Object.assign({}, defaultOptions, opts) this.opts = opts - this.id = opts.id || uuidv4() + this.id = opts.id || randomUUID() // +1 when construct a new aedes-packet // internal track for last brokerCounter this.counter = 0 @@ -57,10 +54,6 @@ export class Aedes extends EventEmitter { return new Client(that, conn, req) } - this._parallel = parallel() - this._series = series() - this._enqueuers = reusify(DoEnqueues) - this.preConnect = opts.preConnect this.authenticate = opts.authenticate this.authorizePublish = opts.authorizePublish @@ -112,51 +105,45 @@ export class Aedes extends EventEmitter { }, noop) } - function deleteOldBrokers (broker) { - if (that.brokers[broker] + (3 * opts.heartbeatInterval) < Date.now()) { - delete that.brokers[broker] - } - } + async function _clearWills () { + const pAuthorizePublish = promisify(that.authorizePublish).bind(that) + const pPublish = promisify(that.publish).bind(that) + const batchSize = 16 // default highWatermark for Writable in ObjectMode - this._clearWillInterval = setInterval(function () { - Object.keys(that.brokers).forEach(deleteOldBrokers) + async function checkAndPublish (will) { + const notPublish = that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now() + if (notPublish) { + return + } + // randomize this, so that multiple brokers + // do not publish the same wills at the same time + const client = that.clients[will.clientId] || null + await pAuthorizePublish(client, will) + await pPublish(will) + await that.persistence.delWill({ + id: will.clientId, + brokerId: will.brokerId + }) + } - pipeline( - that.persistence.streamWill(that.brokers), - bulk(receiveWills), - function done (err) { - if (err) { - that.emit('error', err) - } + // delete old brokers + for (const broker in that.brokers) { + if (that.brokers[broker] + (3 * opts.heartbeatInterval) < Date.now()) { + delete that.brokers[broker] } - ) - }, opts.heartbeatInterval * 4) + } - function receiveWills (chunks, done) { - that._parallel(that, checkAndPublish, chunks, done) + const wills = that.persistence.streamWill(that.brokers) + for await (const promises of batch(wills, checkAndPublish, batchSize)) { + await Promise.all(promises) + } } - function checkAndPublish (will, done) { - const notPublish = that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now() - - if (notPublish) return done() - - // randomize this, so that multiple brokers - // do not publish the same wills at the same time - this.authorizePublish(that.clients[will.clientId] || null, will, function (err) { - if (err) { return doneWill() } - that.publish(will, doneWill) - - function doneWill (err) { - if (err) { return done(err) } - that.persistence.delWill({ - id: will.clientId, - brokerId: will.brokerId - }).then(will => done(undefined, will), done) - } + this._clearWillInterval = setInterval(() => { + _clearWills().catch(err => { + that.emit('error', err) }) - } - + }, opts.heartbeatInterval * 4) this.mq.on($SYS_PREFIX + '+/heartbeat', function storeBroker (packet, done) { that.brokers[packet.payload.toString()] = Date.now() done() @@ -211,7 +198,7 @@ export class Aedes extends EventEmitter { const p = new Packet(packet, this) const publishFuncs = p.qos > 0 ? publishFuncsQoS : publishFuncsSimple - this._series(new PublishState(this, client, packet), publishFuncs, p, done) + runSeries(new PublishState(this, client, packet), publishFuncs, p, done) } subscribe (topic, func, done) { @@ -266,11 +253,14 @@ export class Aedes extends EventEmitter { this.closed = true clearInterval(this._heartbeatInterval) clearInterval(this._clearWillInterval) - this._parallel(this, closeClient, Object.keys(this.clients), doneClose) - function doneClose () { + const promises = [] + for (const clientId of Object.keys(this.clients)) { + promises.push(closeClient(this.clients[clientId])) + } + Promise.all(promises).finally(() => { that.emit('closed') that.mq.close(cb) - } + }) } } @@ -289,7 +279,7 @@ function emitPacket (packet, done) { } function enqueueOffline (packet, done) { - const enqueuer = this.broker._enqueuers.get() + const enqueuer = new DoEnqueues() enqueuer.complete = done enqueuer.packet = packet @@ -333,7 +323,6 @@ class DoEnqueues { broker.persistence.outgoingEnqueueCombi(subs, packet) .then(() => complete(null), complete) - broker._enqueuers.release(that) } } } @@ -362,8 +351,10 @@ const publishFuncsQoS = [ callPublished ] -function closeClient (client, cb) { - this.clients[client].close(cb) +async function closeClient (clientInstance) { + return new Promise((resolve) => { + clientInstance.close(resolve) + }) } function defaultPreConnect (client, packet, callback) { @@ -401,11 +392,9 @@ class PublishState { } } -function noop () {} - function warnMigrate () { throw new Error( -` Aedes default export has been removed. + ` Aedes default export has been removed. Use 'const aedes = await Aedes.createBroker()' instead. See: https://github.com/moscajs/aedes/docs/MIGRATION.MD `) diff --git a/docs/Client.md b/docs/Client.md index b9ffc0cf..faf3b84b 100644 --- a/docs/Client.md +++ b/docs/Client.md @@ -62,7 +62,7 @@ a read-only flag indicates if client is closed or not. ## client.id -- `` __Default__: `aedes_${hyperid()}` +- `` __Default__: `aedes_${randomUUID()}` Client unique identifier, specified by CONNECT packet. diff --git a/lib/client.js b/lib/client.js index a5df8f3e..9ed2c151 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1,14 +1,13 @@ import mqtt from 'mqtt-packet' import EventEmitter from 'node:events' import util from 'util' -import eos from 'end-of-stream' import Packet from 'aedes-packet' import write from './write.js' import QoSPacket from './qos-packet.js' import handleSubscribe from './handlers/subscribe.js' import handleUnsubscribe from './handlers/unsubscribe.js' import handle from './handlers/index.js' -import { pipeline } from 'stream' +import { pipeline, finished } from 'stream' import { through } from './utils.js' class Client { @@ -88,7 +87,7 @@ class Client { this._parser.on('error', this.emit.bind(this, 'error')) conn.on('end', this.close.bind(this)) - this._eos = eos(this.conn, this.close.bind(this)) + this._eos = finished(this.conn, this.close.bind(this)) const getToForwardPacket = (_packet) => { // Mqttv5 3.8.3.1: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html#_Toc3901169 @@ -171,9 +170,9 @@ class Client { this.conn.removeAllListeners('error') this.conn.on('error', noop) // hack to clean up the write callbacks in case of error - const state = this.conn._writableState - const list = typeof state.getBuffer === 'function' ? state.getBuffer() : state.buffer - list.forEach(drainRequest) + if (!this.conn.destroyed) { + this.conn.destroy(err) + } this.broker.emit(this.id ? 'clientError' : 'connectionError', this, err) this.close() } @@ -320,7 +319,7 @@ class Client { this._parser._queue = null if (this._keepaliveTimer) { - this._keepaliveTimer.clear() + clearTimeout(this._keepaliveTimer) this._keepaliveInterval = -1 this._keepaliveTimer = null } @@ -461,10 +460,6 @@ function writeQoS (err, client, packet) { } } -function drainRequest (req) { - req.callback() -} - util.inherits(Client, EventEmitter) function enqueue (packet) { diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index b7e2129f..bc81dd06 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -1,12 +1,9 @@ -import retimer from 'retimer' +import { randomUUID } from 'node:crypto' import { pipeline } from 'stream' import write from '../write.js' import QoSPacket from '../qos-packet.js' -import { through } from '../utils.js' +import { runSeries, through } from '../utils.js' import handleSubscribe from './subscribe.js' -import hyperid from 'hyperid' - -const uniqueId = hyperid() class Connack { constructor (arg) { @@ -84,12 +81,12 @@ function init (client, packet, done) { return } - client.id = clientId || 'aedes_' + uniqueId() + client.id = clientId || 'aedes_' + randomUUID() client.clean = packet.clean client.version = packet.protocolVersion client._will = packet.will - client.broker._series( + runSeries( new ClientPacketStatus(client, packet), connectActions, { returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4] @@ -150,12 +147,14 @@ function authenticate (arg, done) { function setKeepAlive (arg, done) { if (this.packet.keepalive > 0) { const client = this.client - // [MQTT-3.1.2-24] - client._keepaliveInterval = (this.packet.keepalive * 1500) + 1 - client._keepaliveTimer = retimer(function keepaliveTimeout () { + + function keepaliveTimeout () { client.broker.emit('keepaliveTimeout', client) client.emit('error', new Error('keep alive timeout')) - }, client._keepaliveInterval) + } + // [MQTT-3.1.2-24] + client._keepaliveInterval = (this.packet.keepalive * 1500) + 1 + client._keepaliveTimer = setTimeout(keepaliveTimeout, client._keepaliveInterval) } done() } diff --git a/lib/handlers/index.js b/lib/handlers/index.js index f3d4f57d..512688e4 100644 --- a/lib/handlers/index.js +++ b/lib/handlers/index.js @@ -61,7 +61,7 @@ function handle (client, packet, done) { } if (client._keepaliveInterval > 0) { - client._keepaliveTimer.reschedule(client._keepaliveInterval) + client._keepaliveTimer.refresh() } } diff --git a/lib/handlers/publish.js b/lib/handlers/publish.js index 3cce5505..0ae4b8bb 100644 --- a/lib/handlers/publish.js +++ b/lib/handlers/publish.js @@ -1,3 +1,4 @@ +import { runSeries } from '../utils.js' import write from '../write.js' class PubAck { @@ -33,7 +34,7 @@ function handlePublish (client, packet, done) { err = new Error('+ is not allowed in PUBLISH') return done(err) } - client.broker._series(client, publishActions, packet, done) + runSeries(client, publishActions, packet, done) } function enqueuePublish (packet, done) { diff --git a/lib/handlers/pubrel.js b/lib/handlers/pubrel.js index 96afe51b..fa4c02b6 100644 --- a/lib/handlers/pubrel.js +++ b/lib/handlers/pubrel.js @@ -1,4 +1,5 @@ import write from '../write.js' +import { runSeries } from '../utils.js' class ClientPacketStatus { constructor (client, packet) { @@ -20,7 +21,7 @@ const pubrelActions = [ pubrelWrite ] function handlePubrel (client, packet, done) { - client.broker._series( + runSeries( new ClientPacketStatus(client, packet), pubrelActions, {}, done) } diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index d5afa5e6..4ff20d80 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -1,14 +1,13 @@ -import fastfall from 'fastfall' import Packet from 'aedes-packet' -import { through, validateTopic, $SYS_PREFIX } from '../utils.js' +import { through, validateTopic, $SYS_PREFIX, runFall, once } from '../utils.js' import write from '../write.js' -const subscribeTopicActions = fastfall([ +const subscribeTopicActions = runFall([ authorize, storeSubscriptions, addSubs ]) -const restoreTopicActions = fastfall([ +const restoreTopicActions = runFall([ authorize, addSubs ]) @@ -78,20 +77,31 @@ function _dedupe (subs) { return ret } -function handleSubscribe (client, packet, restore, done) { +async function handleSubscribe (client, packet, restore, finish) { + const done = once(finish) packet.subscriptions = packet.subscriptions.length === 1 ? packet.subscriptions : _dedupe(packet.subscriptions) - client.broker._parallel( - new SubscribeState(client, packet, restore, done), // what will be this in the functions - doSubscribe, // function to call - packet.subscriptions, // first argument of the function - restore ? done : completeSubscribe // the function to be called when the parallel ends - ) + const state = new SubscribeState(client, packet, restore, done) + try { + await Promise.all(packet.subscriptions.map(sub => doSubscribe(state, sub))) + if (restore) { + done() + } else { + completeSubscribe.call(state) + } + } catch (err) { + done(err) + } } -function doSubscribe (sub, done) { - const s = new SubState(this.client, this.packet, sub.qos, sub.rh, sub.rap, sub.nl) - this.subState.push(s) - this.actions.call(s, sub, done) +async function doSubscribe (state, sub) { + return new Promise((resolve, reject) => { + const s = new SubState(state.client, state.packet, sub.qos, sub.rh, sub.rap, sub.nl) + state.subState.push(s) + state.actions.call(s, sub, (err, result) => { + if (err) reject(err) + else resolve(result) + }) + }) } function authorize (sub, done) { @@ -185,13 +195,8 @@ function isStartsWithWildcard (topic) { return code === 43 || code === 35 } -function completeSubscribe (err) { +function completeSubscribe () { const done = this.finish - - if (err) { - return done(err) - } - const packet = this.packet const client = this.client diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 9fd3d725..e0fd6317 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -1,5 +1,5 @@ import write from '../write.js' -import { validateTopic, $SYS_PREFIX } from '../utils.js' +import { validateTopic, $SYS_PREFIX, once } from '../utils.js' class UnSubAck { constructor (packet) { @@ -16,7 +16,8 @@ class UnsubscribeState { } } -function handleUnsubscribe (client, packet, done) { +function handleUnsubscribe (client, packet, finish) { + const done = once(finish) const broker = client.broker const unsubscriptions = packet.unsubscriptions let err @@ -40,42 +41,50 @@ function handleUnsubscribe (client, packet, done) { } } -function actualUnsubscribe (client, packet, done) { - const broker = client.broker - broker._parallel( - new UnsubscribeState(client, packet, done), - doUnsubscribe, - packet.unsubscriptions, - completeUnsubscribe) +async function actualUnsubscribe (client, packet, done) { + const state = new UnsubscribeState(client, packet, done) + try { + await Promise.all(packet.unsubscriptions.map(sub => doUnsubscribe(state, sub))) + completeUnsubscribe.call(state) + } catch (err) { + completeUnsubscribe.call(state, err) + } } -function doUnsubscribe (sub, done) { - const client = this.client - const broker = client.broker - const s = client.subscriptions[sub] - - if (s) { - const func = s.func - delete client.subscriptions[sub] - broker.unsubscribe( - sub, - func, - done) - } else { - done() - } +async function doUnsubscribe (state, sub) { + return new Promise((resolve, reject) => { + const client = state.client + const broker = client.broker + const s = client.subscriptions[sub] + + if (s) { + const func = s.func + broker.unsubscribe( + sub, + func, + (err) => { + if (err) reject(err) + else { + delete client.subscriptions[sub] + resolve() + } + }) + } else { + resolve() + } + }) } function completeUnsubscribe (err) { const client = this.client + const done = this.finish if (err) { client.emit('error', err) - return + return done(err) } const packet = this.packet - const done = this.finish if (packet.messageId !== undefined) { write(client, new UnSubAck(packet), diff --git a/lib/utils.js b/lib/utils.js index 78eca805..b69b9490 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,4 +1,6 @@ -import { Transform, Writable } from 'stream' +import { Transform } from 'stream' + +export function noop () { } export function validateTopic (topic, message) { if (!topic || topic.length === 0) { // [MQTT-3.8.3-3] @@ -37,13 +39,66 @@ export function through (transform) { }) } -export function bulk (fn) { - return new Writable({ - objectMode: true, - writev: function (chunks, cb) { - fn(chunks.map(chunk => chunk.chunk), cb) +export function runFall (fns) { + // run functions in fastfall style, only need the single argument function + return function (arg, cb) { + let i = 0 + const ctx = this + function next (err, nextarg) { + if (err || i === fns.length) { + if (typeof cb === 'function') { + cb.call(ctx, err, nextarg) + } + return + } + const fn = fns[i++] + fn.call(ctx, nextarg, next) } - }) + next(null, arg) + } +} + +export function runSeries (state, actions, packet, done) { + // runSeries runs functions in fastseries style + done = (done || noop).bind(state) + let i = 0 + function next (err) { + if (err || i === actions.length) return done(err) + actions[i++].call(state, packet, next) + } + next() +} + +export function once (fn) { + let called = false + return function (err) { + /* c8 ignore next */ + if (called) return + called = true + fn(err) + } +} + +/** + * Async generator that groups items from a readable into batches. + * Note: `fn(chunk)` is invoked eagerly for each item in the batch and + * the generator yields an array of the results (often promises). Callers + * must `await Promise.all(batch)` to apply backpressure before proceeding + * to the next yielded batch. + */ +export async function * batch (readable, fn, batchSize) { + let chunks = [] + for await (const chunk of readable) { + chunks.push(fn(chunk)) + if (chunks.length === batchSize) { + yield chunks + chunks = [] + } + } + // if chunks is half full when the iterator ends + if (chunks.length > 0) { + yield chunks + } } export const $SYS_PREFIX = '$SYS/' diff --git a/package.json b/package.json index 053555b1..1f33aa14 100644 --- a/package.json +++ b/package.json @@ -111,16 +111,8 @@ "dependencies": { "aedes-packet": "^3.0.0", "aedes-persistence": "^10.2.2", - "end-of-stream": "^1.4.5", - "fastfall": "^1.5.1", - "fastparallel": "^2.4.1", - "fastseries": "^2.0.0", - "hyperid": "^3.3.0", "mqemitter": "^7.1.0", - "mqtt-packet": "^9.0.2", - "retimer": "^4.0.0", - "reusify": "^1.1.0", - "uuid": "^11.1.0" + "mqtt-packet": "^9.0.2" }, "peerDependencies": { "aedes-persistence-level": "^9.1.2", diff --git a/test/client-pub-sub.js b/test/client-pub-sub.js index de2c30e9..cc04255b 100644 --- a/test/client-pub-sub.js +++ b/test/client-pub-sub.js @@ -7,6 +7,7 @@ import { nextPacketWithTimeOut, setup, subscribe, + withTimeout, } from './helper.js' test('publish direct to a single client QoS 0', async (t) => { @@ -1052,3 +1053,53 @@ test('custom function in broker.unsubscribe', async (t) => { t.assert.ok(client, 'client exists') t.assert.equal(packet.messageId, 42) }) + +// Regression tests for subscribe handling when the client or broker closes +// before subscription processing completes. + +test('ignore subscription when broker closes before addSubs executes', async (t) => { + t.plan(3) + + const s = await createAndConnect(t) + + s.broker.authorizeSubscribe = (client, sub, cb) => { + setImmediate(() => { + s.broker.close() + cb(null, sub) + }) + } + + s.inStream.write({ + cmd: 'subscribe', + messageId: 42, + subscriptions: [{ topic: 'hello', qos: 0 }] + }) + + const packet = await withTimeout(nextPacket(s), 50, null) + t.assert.equal(packet, null, 'should not receive a suback after broker closes') + t.assert.equal(s.broker.closed, true, 'broker is closed') + t.assert.equal(s.client.subscriptions.hello, undefined, 'subscription is not stored') +}) + +test('ignore subscription when client closes before addSubs executes', async (t) => { + t.plan(2) + + const s = await createAndConnect(t) + + s.broker.authorizeSubscribe = (client, sub, cb) => { + setImmediate(() => { + client.close() + cb(null, sub) + }) + } + + s.inStream.write({ + cmd: 'subscribe', + messageId: 43, + subscriptions: [{ topic: 'hello', qos: 0 }] + }) + + const packet = await withTimeout(nextPacket(s), 50, null) + t.assert.equal(packet, null, 'should not receive a suback after client closes') + t.assert.equal(s.client.subscriptions.hello, undefined, 'subscription is not stored') +}) diff --git a/test/client-queue.js b/test/client-queue.js new file mode 100644 index 00000000..b9509f0d --- /dev/null +++ b/test/client-queue.js @@ -0,0 +1,48 @@ +import { test } from 'node:test' +import { Aedes } from '../aedes.js' +import { setup } from './helper.js' + +test('client queue limit emits connectionError when publish packets overflow queue', { skip: false }, async (t) => { + t.plan(2) + + const queueLimit = 1 + const broker = await Aedes.createBroker({ + queueLimit, + authenticate (client, username, password, callback) { + setTimeout(() => callback(null, true), 50) + } + }) + t.after(() => broker.close()) + + const publishP = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false + } + + const connectP = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'abcde', + keepalive: 0 + } + + const s = setup(broker) + + await new Promise((resolve) => { + broker.once('connectionError', (client, err) => { + t.assert.equal(err.message, 'Client queue limit reached', 'Queue overflow raises connectionError') + t.assert.equal(client._parser._queue.length, queueLimit, 'Only queueLimit packets are queued') + s.conn.destroy() + resolve() + }) + + s.inStream.write(connectP) + s.inStream.write(publishP) + s.inStream.write(publishP) + }) +}) diff --git a/test/qos2.js b/test/qos2.js index 585a6b9e..f918cc53 100644 --- a/test/qos2.js +++ b/test/qos2.js @@ -599,3 +599,31 @@ test('send pubcomp when receiving pubrel even if incomingDelPacket throws (no pa t.assert.equal(pubcompPacket.cmd, 'pubcomp', 'should send pubcomp') t.assert.equal(pubcompPacket.messageId, 42, 'messageId should match') }) + +test('publish QoS 2 returns error when broker.publish fails', async (t) => { + t.plan(2) + + const s = await createAndConnect(t) + const broker = s.broker + const originalPublish = broker.publish.bind(broker) + + broker.publish = function (packet, client, done) { + if (packet.topic === 'hello' && packet.qos === 2) { + setImmediate(() => done(new Error('boom'))) + return + } + return originalPublish(packet, client, done) + } + + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 2, + messageId: 42 + }) + + const [client, err] = await once(broker, 'clientError') + t.assert.ok(client) + t.assert.equal(err.message, 'boom') +}) diff --git a/test/utils.js b/test/utils.js new file mode 100644 index 00000000..e1bfb069 --- /dev/null +++ b/test/utils.js @@ -0,0 +1,123 @@ +import { test } from 'node:test' +import { runFall, batch } from '../lib/utils.js' +import { setTimeout } from 'timers/promises' + +test('runFall works ok', function (t) { + t.plan(5) + + function a (a, cb) { + t.assert.equal(a, 'a', 'first function 1st arg matches') + cb(null, 'b') + } + + function b (b, cb) { + t.assert.equal(b, 'b', 'second function 1st arg matches') + cb(null, 'c') + } + + function c (c, cb) { + t.assert.equal(c, 'c', 'third function 1st arg matches') + cb(null, 'd') + } + function done (err, d) { + t.assert.equal(err, null, 'no error') + t.assert.equal(d, 'd', 'done argument matches') + } + const fall = runFall([a, b, c]) + fall('a', done) +}) + +for (let i = 1; i < 4; i++) { + test(`runFall with function ${i} returning error`, function (t) { + t.plan(1 + i) + + function isErr (n) { + if (n === i) { + return 'oops' + } + return null + } + + function a (a, cb) { + t.assert.equal(a, 'a', 'function a arg matches') + cb(isErr(1), 'b') + } + + function b (b, cb) { + t.assert.equal(b, 'b', 'function b arg matches') + cb(isErr(2), 'c') + } + + function c (c, cb) { + t.assert.equal(c, 'c', 'function c arg matches') + cb(isErr(3), 'd') + } + function done (err, d) { + t.assert.equal(err, 'oops') + } + const fall = runFall([a, b, c]) + fall('a', done) + }) +} + +// generator helper to test batch +async function * generator (numItems) { + let ctr = 0 + while (ctr < numItems) { + yield ctr++ + } +} + +for (const [label, numItems, numBatches] of [ + ['< highWaterMark', 10, 1], + ['> highwatermark exact batch size', 32, 2], + ['> highwatermark with remainder', 100, 7], +]) { + test(`batch works with number of items ${label}`, async (t) => { + t.plan(2 + numItems) + const results = [] + const fn = async (i) => { + await setTimeout(10) + results.push(i * 2) + } + let numBatch = 0 + for await (const items of (batch(generator(numItems), fn, 16))) { + await Promise.all(items) + numBatch++ + } + t.assert.equal(numBatch, numBatches, 'all batches') + t.assert.equal(results.length, numItems, 'all items') + for (let i = 0; i < numItems; i++) { + t.assert.equal(results[i], i * 2) + } + }) +} + +test('test batch function throwing error', async (t) => { + t.plan(2) + const numItems = 100 + + const fn = async (i) => { + await setTimeout(10) + if (i === 20) { + throw new Error('item 20 failed') + } + } + let numBatches = 0 + + async function runBatches () { + for await (const items of (batch(generator(numItems), fn, 16))) { + numBatches++ + await Promise.all(items) + } + } + + // the extra promise is to make sure the test waits for the error + await new Promise(resolve => { + runBatches().catch(async err => { + t.assert.equal(err.message, 'item 20 failed') + t.assert.equal(numBatches, 2, 'correct number of batches before error was thrown') + resolve() + }) + }) +}) diff --git a/test/will.js b/test/will.js index 64358154..714717d6 100644 --- a/test/will.js +++ b/test/will.js @@ -117,6 +117,40 @@ test('delivers old will in case of a crash', async (t) => { t.assert.equal(received.length, 1, 'only one will has been delivered') }) +test('delivers many old wills in case of a crash', async (t) => { + t.plan(1) + + const numWills = 100 + const persistence = await memorySetup({ id: 'anotherBroker' }) + const will = { + topic: 'mywill', + payload: Buffer.from('last will'), + qos: 0, + retain: false + } + + for (let id = 0; id < numWills; id++) { + const cWill = structuredClone(will) + cWill.topic = 'mywill' + await persistence.putWill({ id: `myClientId${id}` }, cWill) + } + + const interval = 10 // ms, so that the will check happens fast! + const broker = await Aedes.createBroker({ + persistence, + heartbeatInterval: interval, + authorizePublish: (client, packet, callback) => { + callback(null) + } + }) + + t.after(() => broker.close()) + + const received = await willsFromBroker(broker) + await delay(100) // give Aedes some time to process to ensure that all wills are sent + t.assert.equal(received.length, numWills, 'all wills have been delivered') +}) + test('deliver old will without authorization in case of a crash', async (t) => { t.plan(1)