Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 49 additions & 60 deletions aedes.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import EventEmitter from 'node:events'
import parallel from 'fastparallel'
import series from 'fastseries'
import { v4 as uuidv4 } from 'uuid'
import reusify from 'reusify'
import { pipeline } from 'stream'
import { randomUUID } from 'node:crypto'
import { promisify } from 'node:util'
import Packet from 'aedes-packet'
import memory from 'aedes-persistence'
import mqemitter from 'mqemitter'
import Client from './lib/client.js'
import { $SYS_PREFIX, bulk } from './lib/utils.js'
import { $SYS_PREFIX, batch, noop, runSeries } from './lib/utils.js'
import pkg from './package.json' with { type: 'json' }

const defaultOptions = {
Expand Down Expand Up @@ -38,7 +35,7 @@ export class Aedes extends EventEmitter {

opts = Object.assign({}, defaultOptions, opts)
this.opts = opts
this.id = opts.id || uuidv4()
this.id = opts.id || randomUUID()
// +1 when construct a new aedes-packet
// internal track for last brokerCounter
this.counter = 0
Expand All @@ -57,10 +54,6 @@ export class Aedes extends EventEmitter {
return new Client(that, conn, req)
}

this._parallel = parallel()
this._series = series()
this._enqueuers = reusify(DoEnqueues)

this.preConnect = opts.preConnect
this.authenticate = opts.authenticate
this.authorizePublish = opts.authorizePublish
Expand Down Expand Up @@ -112,51 +105,45 @@ export class Aedes extends EventEmitter {
}, noop)
}

function deleteOldBrokers (broker) {
if (that.brokers[broker] + (3 * opts.heartbeatInterval) < Date.now()) {
delete that.brokers[broker]
}
}
async function _clearWills () {
const pAuthorizePublish = promisify(that.authorizePublish).bind(that)
const pPublish = promisify(that.publish).bind(that)
const batchSize = 16 // default highWatermark for Writable in ObjectMode

this._clearWillInterval = setInterval(function () {
Object.keys(that.brokers).forEach(deleteOldBrokers)
async function checkAndPublish (will) {
const notPublish = that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now()
if (notPublish) {
return
}
// randomize this, so that multiple brokers
// do not publish the same wills at the same time
const client = that.clients[will.clientId] || null
await pAuthorizePublish(client, will)
await pPublish(will)
await that.persistence.delWill({
id: will.clientId,
brokerId: will.brokerId
})
}

pipeline(
that.persistence.streamWill(that.brokers),
bulk(receiveWills),
function done (err) {
if (err) {
that.emit('error', err)
}
// delete old brokers
for (const broker in that.brokers) {
if (that.brokers[broker] + (3 * opts.heartbeatInterval) < Date.now()) {
delete that.brokers[broker]
}
)
}, opts.heartbeatInterval * 4)
}

function receiveWills (chunks, done) {
that._parallel(that, checkAndPublish, chunks, done)
const wills = that.persistence.streamWill(that.brokers)
for await (const promises of batch(wills, checkAndPublish, batchSize)) {
await Promise.all(promises)
}
Comment thread
robertsLando marked this conversation as resolved.
}

function checkAndPublish (will, done) {
const notPublish = that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now()

if (notPublish) return done()

// randomize this, so that multiple brokers
// do not publish the same wills at the same time
this.authorizePublish(that.clients[will.clientId] || null, will, function (err) {
if (err) { return doneWill() }
that.publish(will, doneWill)

function doneWill (err) {
if (err) { return done(err) }
that.persistence.delWill({
id: will.clientId,
brokerId: will.brokerId
}).then(will => done(undefined, will), done)
}
this._clearWillInterval = setInterval(() => {
_clearWills().catch(err => {
that.emit('error', err)
})
}

}, opts.heartbeatInterval * 4)
this.mq.on($SYS_PREFIX + '+/heartbeat', function storeBroker (packet, done) {
that.brokers[packet.payload.toString()] = Date.now()
done()
Expand Down Expand Up @@ -211,7 +198,7 @@ export class Aedes extends EventEmitter {
const p = new Packet(packet, this)
const publishFuncs = p.qos > 0 ? publishFuncsQoS : publishFuncsSimple

this._series(new PublishState(this, client, packet), publishFuncs, p, done)
runSeries(new PublishState(this, client, packet), publishFuncs, p, done)
}

subscribe (topic, func, done) {
Expand Down Expand Up @@ -266,11 +253,14 @@ export class Aedes extends EventEmitter {
this.closed = true
clearInterval(this._heartbeatInterval)
clearInterval(this._clearWillInterval)
this._parallel(this, closeClient, Object.keys(this.clients), doneClose)
function doneClose () {
const promises = []
for (const clientId of Object.keys(this.clients)) {
promises.push(closeClient(this.clients[clientId]))
Comment thread
seriousme marked this conversation as resolved.
}
Promise.all(promises).finally(() => {
that.emit('closed')
that.mq.close(cb)
}
})
}
}

Expand All @@ -289,7 +279,7 @@ function emitPacket (packet, done) {
}

function enqueueOffline (packet, done) {
const enqueuer = this.broker._enqueuers.get()
const enqueuer = new DoEnqueues()

enqueuer.complete = done
enqueuer.packet = packet
Expand Down Expand Up @@ -333,7 +323,6 @@ class DoEnqueues {

broker.persistence.outgoingEnqueueCombi(subs, packet)
.then(() => complete(null), complete)
broker._enqueuers.release(that)
}
}
}
Expand Down Expand Up @@ -362,8 +351,10 @@ const publishFuncsQoS = [
callPublished
]

function closeClient (client, cb) {
this.clients[client].close(cb)
async function closeClient (clientInstance) {
return new Promise((resolve) => {
clientInstance.close(resolve)
})
}

function defaultPreConnect (client, packet, callback) {
Expand Down Expand Up @@ -401,11 +392,9 @@ class PublishState {
}
}

function noop () {}

function warnMigrate () {
throw new Error(
` Aedes default export has been removed.
` Aedes default export has been removed.
Use 'const aedes = await Aedes.createBroker()' instead.
See: https://github.com/moscajs/aedes/docs/MIGRATION.MD
`)
Expand Down
2 changes: 1 addition & 1 deletion docs/Client.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ a read-only flag indicates if client is closed or not.

## client.id

- `<string>` __Default__: `aedes_${hyperid()}`
- `<string>` __Default__: `aedes_${randomUUID()}`

Client unique identifier, specified by CONNECT packet.

Expand Down
17 changes: 6 additions & 11 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import mqtt from 'mqtt-packet'
import EventEmitter from 'node:events'
import util from 'util'
import eos from 'end-of-stream'
import Packet from 'aedes-packet'
import write from './write.js'
import QoSPacket from './qos-packet.js'
import handleSubscribe from './handlers/subscribe.js'
import handleUnsubscribe from './handlers/unsubscribe.js'
import handle from './handlers/index.js'
import { pipeline } from 'stream'
import { pipeline, finished } from 'stream'
Comment thread
seriousme marked this conversation as resolved.
import { through } from './utils.js'

class Client {
Expand Down Expand Up @@ -88,7 +87,7 @@ class Client {
this._parser.on('error', this.emit.bind(this, 'error'))

conn.on('end', this.close.bind(this))
this._eos = eos(this.conn, this.close.bind(this))
this._eos = finished(this.conn, this.close.bind(this))

const getToForwardPacket = (_packet) => {
// Mqttv5 3.8.3.1: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html#_Toc3901169
Expand Down Expand Up @@ -171,9 +170,9 @@ class Client {
this.conn.removeAllListeners('error')
this.conn.on('error', noop)
// hack to clean up the write callbacks in case of error
const state = this.conn._writableState
const list = typeof state.getBuffer === 'function' ? state.getBuffer() : state.buffer
list.forEach(drainRequest)
Comment on lines -174 to -176

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this was an old hacky hack to fix an annoying but, wondering if it's still legit or not, no clue if we ever have a test that covered this

if (!this.conn.destroyed) {
this.conn.destroy(err)
}
this.broker.emit(this.id ? 'clientError' : 'connectionError', this, err)
this.close()
}
Expand Down Expand Up @@ -320,7 +319,7 @@ class Client {
this._parser._queue = null

if (this._keepaliveTimer) {
this._keepaliveTimer.clear()
clearTimeout(this._keepaliveTimer)
this._keepaliveInterval = -1
this._keepaliveTimer = null
}
Expand Down Expand Up @@ -461,10 +460,6 @@ function writeQoS (err, client, packet) {
}
}

function drainRequest (req) {
req.callback()
}

util.inherits(Client, EventEmitter)

function enqueue (packet) {
Expand Down
21 changes: 10 additions & 11 deletions lib/handlers/connect.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import retimer from 'retimer'
import { randomUUID } from 'node:crypto'
import { pipeline } from 'stream'
import write from '../write.js'
import QoSPacket from '../qos-packet.js'
import { through } from '../utils.js'
import { runSeries, through } from '../utils.js'
import handleSubscribe from './subscribe.js'
import hyperid from 'hyperid'

const uniqueId = hyperid()

class Connack {
constructor (arg) {
Expand Down Expand Up @@ -84,12 +81,12 @@ function init (client, packet, done) {
return
}

client.id = clientId || 'aedes_' + uniqueId()
client.id = clientId || 'aedes_' + randomUUID()
Comment thread
seriousme marked this conversation as resolved.
client.clean = packet.clean
client.version = packet.protocolVersion
client._will = packet.will

client.broker._series(
runSeries(
new ClientPacketStatus(client, packet),
connectActions,
{ returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4]
Expand Down Expand Up @@ -150,12 +147,14 @@ function authenticate (arg, done) {
function setKeepAlive (arg, done) {
if (this.packet.keepalive > 0) {
const client = this.client
// [MQTT-3.1.2-24]
client._keepaliveInterval = (this.packet.keepalive * 1500) + 1
client._keepaliveTimer = retimer(function keepaliveTimeout () {

function keepaliveTimeout () {
client.broker.emit('keepaliveTimeout', client)
client.emit('error', new Error('keep alive timeout'))
}, client._keepaliveInterval)
}
// [MQTT-3.1.2-24]
client._keepaliveInterval = (this.packet.keepalive * 1500) + 1
client._keepaliveTimer = setTimeout(keepaliveTimeout, client._keepaliveInterval)
}
done()
}
Expand Down
2 changes: 1 addition & 1 deletion lib/handlers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function handle (client, packet, done) {
}

if (client._keepaliveInterval > 0) {
client._keepaliveTimer.reschedule(client._keepaliveInterval)
client._keepaliveTimer.refresh()
}
}

Expand Down
3 changes: 2 additions & 1 deletion lib/handlers/publish.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { runSeries } from '../utils.js'
import write from '../write.js'

class PubAck {
Expand Down Expand Up @@ -33,7 +34,7 @@ function handlePublish (client, packet, done) {
err = new Error('+ is not allowed in PUBLISH')
return done(err)
}
client.broker._series(client, publishActions, packet, done)
runSeries(client, publishActions, packet, done)
}

function enqueuePublish (packet, done) {
Expand Down
3 changes: 2 additions & 1 deletion lib/handlers/pubrel.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import write from '../write.js'
import { runSeries } from '../utils.js'

class ClientPacketStatus {
constructor (client, packet) {
Expand All @@ -20,7 +21,7 @@ const pubrelActions = [
pubrelWrite
]
function handlePubrel (client, packet, done) {
client.broker._series(
runSeries(
new ClientPacketStatus(client, packet),
pubrelActions, {}, done)
}
Expand Down
Loading
Loading