Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,11 @@ describe('Client', () => {
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('AbortError', async client => {
await blockSetImmediate(async () => {
await assert.rejects(client.sendCommand(['PING'], {
abortSignal: AbortSignal.timeout(5)
}), AbortError);
})
await blockSetImmediate(async () => {
await assert.rejects(client.sendCommand(['PING'], {
abortSignal: AbortSignal.timeout(5)
}), AbortError);
})
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('Timeout with custom timeout config', async client => {
Expand Down Expand Up @@ -689,6 +689,21 @@ describe('Client', () => {
}
});

testUtils.testWithClient('Module TypeMapping Fix', async (client) => {
const bufferProxy = client.withCommandOptions({
typeMapping: { [RESP_TYPES.BLOB_STRING]: Buffer }
});
const bufferReply = await bufferProxy.module.echo('hi');
const stringReply = await client.module.echo('hi');

assert.ok((bufferReply as unknown) instanceof Buffer, 'Proxy failed to return Buffer');
assert.strictEqual(typeof stringReply, 'string', 'Original client was corrupted');
assert.equal(bufferReply.toString(), stringReply);
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: { modules: { module } }
})

testUtils.testWithClient('duplicate should reuse command options', async client => {
const duplicate = client.duplicate();

Expand Down
169 changes: 86 additions & 83 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { ClientMetricsHandle, ClientRegistry } from '../opentelemetry';
import { ClientIdentity, ClientRole, generateClientId } from './identity';
import { trace, sanitizeArgs, publish, CHANNELS, type CommandTraceContext } from './tracing';

const noop = () => {};
const noop = () => { };

export interface RedisClientOptions<
M extends RedisModules = RedisModules,
Expand Down Expand Up @@ -257,7 +257,10 @@ export type RedisClientType<

type ProxyClient = RedisClient<any, any, any, any, any>;

type NamespaceProxyClient = { _self: ProxyClient };
type NamespaceProxyClient = {
_self: ProxyClient;
_commandOptions?: CommandOptions<any>
};

interface ScanIteratorOptions {
cursor?: RedisArgument;
Expand Down Expand Up @@ -290,7 +293,7 @@ export default class RedisClient<
const parser = new BasicCommandParser();
command.parseCommand(parser, ...args);

return this._self._executeCommand(command, parser, this._self._commandOptions, transformReply);
return this._self._executeCommand(command, parser, this._commandOptions, transformReply);
Comment thread
cursor[bot] marked this conversation as resolved.
};
}

Expand All @@ -303,7 +306,7 @@ export default class RedisClient<
parser.push(...prefix);
fn.parseCommand(parser, ...args);

return this._self._executeCommand(fn, parser, this._self._commandOptions, transformReply);
return this._self._executeCommand(fn, parser, this._commandOptions, transformReply);
};
}

Expand Down Expand Up @@ -587,7 +590,7 @@ export default class RedisClient<

this.#registerForMetrics();

if(this.#options.maintNotifications !== 'disabled') {
if (this.#options.maintNotifications !== 'disabled') {
new EnterpriseMaintenanceManager(this.#queue, this, this.#options);
};

Expand Down Expand Up @@ -664,7 +667,7 @@ export default class RedisClient<
this._commandOptions = options.commandOptions;
}

if(options.maintNotifications !== 'disabled') {
if (options.maintNotifications !== 'disabled') {
EnterpriseMaintenanceManager.setupDefaultMaintOptions(options);
}

Expand Down Expand Up @@ -847,16 +850,16 @@ export default class RedisClient<
}

if (this.#clientSideCache) {
commands.push({cmd: this.#clientSideCache.trackingOn()});
commands.push({ cmd: this.#clientSideCache.trackingOn() });
}

if (this.#options?.emitInvalidate) {
commands.push({cmd: ['CLIENT', 'TRACKING', 'ON']});
commands.push({ cmd: ['CLIENT', 'TRACKING', 'ON'] });
}

const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options, this._clientId);

if(maintenanceHandshakeCmd) {
if (maintenanceHandshakeCmd) {
commands.push(maintenanceHandshakeCmd);
};

Expand All @@ -872,24 +875,24 @@ export default class RedisClient<
this.emit('error', err);
}
})
.on('error', err => {
this.emit('error', err);
this.#clientSideCache?.onError();
if (this.#socket.isOpen && !this.#options.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
this.#queue.flushAll(err);
}
})
.on('connect', () => this.emit('connect'))
.on('ready', () => {
this.emit('ready');
this.#setPingTimer();
this.#maybeScheduleWrite();
})
.on('reconnecting', () => this.emit('reconnecting'))
.on('drain', () => this.#maybeScheduleWrite())
.on('end', () => this.emit('end'));
.on('error', err => {
this.emit('error', err);
this.#clientSideCache?.onError();
if (this.#socket.isOpen && !this.#options.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
this.#queue.flushAll(err);
}
})
.on('connect', () => this.emit('connect'))
.on('ready', () => {
this.emit('ready');
this.#setPingTimer();
this.#maybeScheduleWrite();
})
.on('reconnecting', () => this.emit('reconnecting'))
.on('drain', () => this.#maybeScheduleWrite())
.on('end', () => this.emit('end'));
}

#initiateSocket(clientId: string): RedisSocket {
Expand Down Expand Up @@ -1055,61 +1058,61 @@ export default class RedisClient<
/**
* @internal
*/
_ejectSocket(): RedisSocket {
const socket = this._self.#socket;
// @ts-ignore
this._self.#socket = null;
socket.removeAllListeners();
return socket;
}

/**
* @internal
*/
_insertSocket(socket: RedisSocket) {
if(this._self.#socket) {
_ejectSocket(): RedisSocket {
const socket = this._self.#socket;
// @ts-ignore
this._self.#socket = null;
socket.removeAllListeners();
return socket;
}

/**
* @internal
*/
_insertSocket(socket: RedisSocket) {
if (this._self.#socket) {
this._self._ejectSocket().destroy();
}
this._self.#socket = socket;
this._self.#attachListeners(this._self.#socket);
}

/**
* @internal
*/
_maintenanceUpdate(update: MaintenanceUpdate) {
this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout);
this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout);
}

/**
* @internal
*/
_pause() {
this._self.#paused = true;
}

/**
* @internal
*/
_unpause() {
this._self.#paused = false;
this._self.#maybeScheduleWrite();
}

/**
* @internal
*/
_handleSmigrated(smigratedEvent: SMigratedEvent) {
this._self.emit(SMIGRATED_EVENT, smigratedEvent);
}

/**
* @internal
*/
_getQueue(): RedisCommandsQueue {
return this._self.#queue;
}
}
this._self.#socket = socket;
this._self.#attachListeners(this._self.#socket);
}

/**
* @internal
*/
_maintenanceUpdate(update: MaintenanceUpdate) {
this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout);
this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout);
}

/**
* @internal
*/
_pause() {
this._self.#paused = true;
}

/**
* @internal
*/
_unpause() {
this._self.#paused = false;
this._self.#maybeScheduleWrite();
}

/**
* @internal
*/
_handleSmigrated(smigratedEvent: SMigratedEvent) {
this._self.emit(SMIGRATED_EVENT, smigratedEvent);
}

/**
* @internal
*/
_getQueue(): RedisCommandsQueue {
return this._self.#queue;
}

/**
* @internal
Expand Down Expand Up @@ -1183,7 +1186,7 @@ export default class RedisClient<

// Merge global options with provided options
const opts = {
...this._self._commandOptions,
...this._commandOptions,
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
...options,
};

Expand Down Expand Up @@ -1371,7 +1374,7 @@ export default class RedisClient<
}

#write() {
if(this.#paused) {
if (this.#paused) {
return
}
this.#socket.write(this.#queue.commandsToWrite());
Expand Down