Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 41 additions & 19 deletions aedes.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import EventEmitter from 'node:events'
import parallel from 'fastparallel'
import series from 'fastseries'
import { v4 as uuidv4 } from 'uuid'
import reusify from 'reusify'
import { randomUUID } from 'node:crypto'
import { pipeline } from 'stream'
import Packet from 'aedes-packet'
import memory from 'aedes-persistence'
import mqemitter from 'mqemitter'
import Client from './lib/client.js'
import { $SYS_PREFIX, bulk } from './lib/utils.js'
import { $SYS_PREFIX, bulk, ObjectPool } from './lib/utils.js'
import pkg from './package.json' with { type: 'json' }

const defaultOptions = {
Expand Down Expand Up @@ -37,7 +34,7 @@ export class Aedes extends EventEmitter {

opts = Object.assign({}, defaultOptions, opts)
this.opts = opts
this.id = opts.id || uuidv4()
this.id = opts.id || randomUUID()
// +1 when construct a new aedes-packet
// internal track for last brokerCounter
this.counter = 0
Expand All @@ -56,9 +53,8 @@ export class Aedes extends EventEmitter {
return new Client(that, conn, req)
}

this._parallel = parallel()
this._series = series()
this._enqueuers = reusify(DoEnqueues)
this._series = runSeries
Comment thread
robertsLando marked this conversation as resolved.
Outdated
this._enqueuers = new ObjectPool(DoEnqueues)
Comment thread
robertsLando marked this conversation as resolved.
Outdated

this.preConnect = opts.preConnect
this.authenticate = opts.authenticate
Expand Down Expand Up @@ -131,8 +127,18 @@ export class Aedes extends EventEmitter {
)
}, opts.heartbeatInterval * 4)

function receiveWills (chunks, done) {
that._parallel(that, checkAndPublish, chunks, done)
async function receiveWills (chunks, done) {
try {
await Promise.all(chunks.map(chunk => new Promise((resolve, reject) => {
checkAndPublish(chunk, (err) => {
if (err) reject(err)
else resolve()
})
})))
done()
Comment thread
robertsLando marked this conversation as resolved.
Outdated
} catch (err) {
done(err)
}
Comment thread
robertsLando marked this conversation as resolved.
}

function checkAndPublish (will, done) {
Expand All @@ -142,7 +148,7 @@ export class Aedes extends EventEmitter {

// randomize this, so that multiple brokers
// do not publish the same wills at the same time
this.authorizePublish(that.clients[will.clientId] || null, will, function (err) {
that.authorizePublish(that.clients[will.clientId] || null, will, function (err) {
if (err) { return doneWill() }
that.publish(will, doneWill)

Expand Down Expand Up @@ -265,11 +271,14 @@ export class Aedes extends EventEmitter {
this.closed = true
clearInterval(this._heartbeatInterval)
clearInterval(this._clearWillInterval)
this._parallel(this, closeClient, Object.keys(this.clients), doneClose)
function doneClose () {
const promises = []
for (const clientId of Object.keys(this.clients)) {
promises.push(closeClient(this.clients[clientId]))
Comment thread
seriousme marked this conversation as resolved.
}
Promise.all(promises).then(() => {
Comment thread
robertsLando marked this conversation as resolved.
Outdated
that.emit('closed')
that.mq.close(cb)
}
})
}
}

Expand Down Expand Up @@ -361,8 +370,21 @@ const publishFuncsQoS = [
callPublished
]

function closeClient (client, cb) {
this.clients[client].close(cb)
// runSeries runs functions in fastseries style
function runSeries (state, actions, packet, done) {
done = (done || noop).bind(state)
let i = 0
function next (err) {
if (err || i === actions.length) return done(err)
actions[i++].call(state, packet, next)
}
next()
}
Comment thread
seriousme marked this conversation as resolved.
Outdated

async function closeClient (clientInstance) {
return new Promise((resolve) => {
clientInstance.close(resolve)
})
}

function defaultPreConnect (client, packet, callback) {
Expand Down Expand Up @@ -400,11 +422,11 @@ class PublishState {
}
}

function noop () {}
function noop () { }

function warnMigrate () {
throw new Error(
` Aedes default export has been removed.
` Aedes default export has been removed.
Use 'const aedes = await Aedes.createBroker()' instead.
See: https://github.com/moscajs/aedes/docs/MIGRATION.MD
`)
Expand Down
2 changes: 1 addition & 1 deletion docs/Client.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ a read-only flag indicates if client is closed or not.

## client.id

- `<string>` __Default__: `aedes_${hyperid()}`
- `<string>` __Default__: `aedes_${randomUUID()}`

Client unique identifier, specified by CONNECT packet.

Expand Down
5 changes: 2 additions & 3 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import mqtt from 'mqtt-packet'
import EventEmitter from 'node:events'
import util from 'util'
import eos from 'end-of-stream'
import Packet from 'aedes-packet'
import write from './write.js'
import QoSPacket from './qos-packet.js'
import handleSubscribe from './handlers/subscribe.js'
import handleUnsubscribe from './handlers/unsubscribe.js'
import handle from './handlers/index.js'
import { pipeline } from 'stream'
import { pipeline, finished } from 'stream'
Comment thread
seriousme marked this conversation as resolved.
import { through } from './utils.js'

class Client {
Expand Down Expand Up @@ -84,7 +83,7 @@ class Client {
this._parser.on('error', this.emit.bind(this, 'error'))

conn.on('end', this.close.bind(this))
this._eos = eos(this.conn, this.close.bind(this))
this._eos = finished(this.conn, this.close.bind(this))

const getToForwardPacket = (_packet) => {
// Mqttv5 3.8.3.1: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html#_Toc3901169
Expand Down
9 changes: 3 additions & 6 deletions lib/handlers/connect.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import retimer from 'retimer'
import { randomUUID } from 'node:crypto'
import { pipeline } from 'stream'
import write from '../write.js'
import QoSPacket from '../qos-packet.js'
import { through } from '../utils.js'
import { through, retimer } from '../utils.js'
import handleSubscribe from './subscribe.js'
import hyperid from 'hyperid'

const uniqueId = hyperid()

class Connack {
constructor (arg) {
Expand Down Expand Up @@ -84,7 +81,7 @@ function init (client, packet, done) {
return
}

client.id = clientId || 'aedes_' + uniqueId()
client.id = clientId || 'aedes_' + randomUUID()
Comment thread
seriousme marked this conversation as resolved.
client.clean = packet.clean
client.version = packet.protocolVersion
client._will = packet.will
Expand Down
48 changes: 38 additions & 10 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
import fastfall from 'fastfall'
import Packet from 'aedes-packet'
import { through, validateTopic, $SYS_PREFIX } from '../utils.js'
import write from '../write.js'

const subscribeTopicActions = fastfall([
function runFall (fns) {
// run functions in fastfall style, only need the single argument function
return function (arg, cb) {
let i = 0
const ctx = this
function next (err, nextarg) {
if (err || i === fns.length) {
if (typeof cb === 'function') {
cb.call(ctx, err, nextarg)
}
return
}
const fn = fns[i++]
fn.call(ctx, nextarg, next)
}
next(null, arg)
}
}
Comment thread
robertsLando marked this conversation as resolved.
Outdated

const subscribeTopicActions = runFall([
authorize,
storeSubscriptions,
addSubs
])
const restoreTopicActions = fastfall([
const restoreTopicActions = runFall([
authorize,
addSubs
])
Expand Down Expand Up @@ -78,14 +96,24 @@ function _dedupe (subs) {
return ret
}

function handleSubscribe (client, packet, restore, done) {
async function handleSubscribe (client, packet, restore, done) {
packet.subscriptions = packet.subscriptions.length === 1 ? packet.subscriptions : _dedupe(packet.subscriptions)
client.broker._parallel(
new SubscribeState(client, packet, restore, done), // what will be this in the functions
doSubscribe, // function to call
packet.subscriptions, // first argument of the function
restore ? done : completeSubscribe // the function to be called when the parallel ends
)
const state = new SubscribeState(client, packet, restore, done)
try {
await Promise.all(packet.subscriptions.map(sub => new Promise((resolve, reject) => {
doSubscribe.call(state, sub, (err, result) => {
if (err) reject(err)
else resolve(result)
})
})))
if (restore) {
done()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that if this throws, it would call done twice

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why, can you please explain?

I tried the following:

const pOk = new Promise(resolve => {
    console.log('pOK will resolve')
    resolve()
    })

const pFail = new Promise((resolve,reject) => {
    console.log('pFail will reject')
    reject('pFail rejected')
    })

try {
    await Promise.all([pOk,pFail])
    console.log('all Promises resolved ok')
} catch (err){
    console.log('At least one promise failed, first error:', err)
}

This gives me:

pOK will resolve
pFail will reject
At least one promise failed, first error: pFail rejected

Btw: I reworked doSubscribe to make this part more easy to read.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@seriousme — the concern is not the Promise.all([ok, fail]) resolution shape (you're right that the catch fires once). The issue is what happens after await Promise.all(...) resolves and completeSubscribe.call(state) runs.

write() in lib/write.js always uses setImmediate(done, error, client) — so done is scheduled but not yet invoked. If anything in completeSubscribe throws synchronously after that write() call (a listener installed on the 'subscribe' event throws, broker.publish(...) throws inside an mq listener, persistence.createRetainedStreamCombi throws, etc.), the error escapes the try block:

try {
  await Promise.all(...)
  completeSubscribe.call(state)   // queues done via write(), then throws
} catch (err) {
  done(err)                       // catch calls done
}
// next tick: setImmediate fires → done called AGAIN

Minimal repro:

const state = { finish: function(err){ console.log('DONE', err?.message ?? 'OK') } }
function write (cb) { setImmediate(cb) }
function completeSubscribe () {
  write(state.finish)
  throw new Error('listener threw')
}
async function doSub () { return Promise.resolve() }
async function handleSubscribe () {
  try {
    await Promise.all([doSub()])
    completeSubscribe.call(state)
  } catch (err) {
    state.finish(err)
  }
}
handleSubscribe()
// → DONE listener threw
// → DONE OK     ← second invocation

Fix direction: move the post-write work outside the try block, or wrap done in an idempotent guard before passing it through state.finish.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else {
completeSubscribe.call(state)
}
} catch (err) {
done(err)
}
Comment thread
seriousme marked this conversation as resolved.
Comment thread
robertsLando marked this conversation as resolved.
}

function doSubscribe (sub, done) {
Expand Down
20 changes: 13 additions & 7 deletions lib/handlers/unsubscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,19 @@ function handleUnsubscribe (client, packet, done) {
}
}

function actualUnsubscribe (client, packet, done) {
const broker = client.broker
broker._parallel(
new UnsubscribeState(client, packet, done),
doUnsubscribe,
packet.unsubscriptions,
completeUnsubscribe)
async function actualUnsubscribe (client, packet, done) {
const state = new UnsubscribeState(client, packet, done)
try {
await Promise.all(packet.unsubscriptions.map(sub => new Promise((resolve, reject) => {
doUnsubscribe.call(state, sub, (err, result) => {
if (err) reject(err)
else resolve(result)
})
})))
completeUnsubscribe.call(state)
} catch (err) {
completeUnsubscribe.call(state, err)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new return done(err) in completeUnsubscribe closes the "caller hangs" path on Promise.all rejection — good.

But the symmetric case now opens up the same shape as the subscribe thread: on the success path, write(client, UnSubAck, done) (L89) queues setImmediate(done), then client.broker.emit('unsubscribe', …) (L96) can throw if a listener throws — escapes completeUnsubscribe, lands here in the catch → completeUnsubscribe.call(state, err)done(err) at L83. The queued tick then fires done(null, client) a second time.

Same fix shape: wrap done once at the top of actualUnsubscribe (done = once(done)) so both branches share one gate. The once helper added in subscribe.js could move to utils.js and be reused here.

}
Comment thread
seriousme marked this conversation as resolved.
}

function doUnsubscribe (sub, done) {
Expand Down
34 changes: 34 additions & 0 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,40 @@ export function through (transform) {
})
}

// DIY object pooling as a replacement for reusify
export class ObjectPool {
constructor (ClassCtor) {
this.ClassCtor = ClassCtor
this.pool = []
}

get () {
return this.pool.pop() || new this.ClassCtor()
}

release (obj) {
// Optionally reset the object here if needed
this.pool.push(obj)
}
}
Comment thread
seriousme marked this conversation as resolved.

// Nodejs can clear timers in constant time these days
// https://asafdav2.github.io/2017/node-js-timers/
// Running retimers bench.js on Node 24.5 shows retimer to be 1 second faster on 1M invocations.
// hence we can simplify retimer so we don't have to rely on a dependency
export function retimer (fn, ms) {
let timeout = setTimeout(fn, ms)
return {
reschedule (newMs) {
clearTimeout(timeout)
timeout = setTimeout(fn, newMs)
},
clear () {
clearTimeout(timeout)
}
}
}
Comment thread
mcollina marked this conversation as resolved.
Comment thread
robertsLando marked this conversation as resolved.

export function bulk (fn) {
return new Writable({
objectMode: true,
Expand Down
10 changes: 1 addition & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,8 @@
"dependencies": {
"aedes-packet": "^3.0.0",
"aedes-persistence": "^10.2.2",
"end-of-stream": "^1.4.5",
"fastfall": "^1.5.1",
"fastparallel": "^2.4.1",
"fastseries": "^2.0.0",
"hyperid": "^3.3.0",
"mqemitter": "^7.1.0",
"mqtt-packet": "^9.0.2",
"retimer": "^4.0.0",
"reusify": "^1.1.0",
"uuid": "^11.1.0"
"mqtt-packet": "^9.0.2"
},
"peerDependencies": {
"aedes-persistence-level": "^9.1.2",
Expand Down
Loading