Skip to content
Draft
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/meteor/app/api/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import './v1/mailer';
import './v1/teams';
import './v1/moderation';
import './v1/uploads';

import './v1/cronJobs';
// This has to come last so all endpoints are registered before generating the OpenAPI documentation
import './default/openApi';

Expand Down
232 changes: 232 additions & 0 deletions apps/meteor/app/api/server/v1/cronJobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
import { CronJobsSvc } from '@rocket.chat/core-services';

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Substantial duplicated endpoint logic across cron.enable, cron.disable, and cron.trigger increases maintenance drift risk. All three share identical auth/permission/body/response configuration and identical action handler error-handling structure, differing only in the service method called. Consider consolidating via a shared handler factory.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/meteor/app/api/server/v1/cronJobs.ts:

<comment>Substantial duplicated endpoint logic across cron.enable, cron.disable, and cron.trigger increases maintenance drift risk. All three share identical auth/permission/body/response configuration and identical action handler error-handling structure, differing only in the service method called. Consider consolidating via a shared handler factory.</comment>

<file context>
@@ -0,0 +1,232 @@
+import { CronJobsSvc } from '@rocket.chat/core-services';
+import type { ICronJobItem, ICronHistoryItem } from '@rocket.chat/core-typings';
+import { ajv, ajvQuery, validateUnauthorizedErrorResponse, validateBadRequestErrorResponse } from '@rocket.chat/rest-typings';
+
+import type { ExtractRoutesFromAPI } from '../ApiClass';
+import { API } from '../api';
+import { getPaginationItems } from '../helpers/getPaginationItems';
+
+const isCronJobsListParams = ajvQuery.compile<{
</file context>

import type { ICronJobItem, ICronHistoryItem } from '@rocket.chat/core-typings';
import { ajv, ajvQuery, validateUnauthorizedErrorResponse, validateBadRequestErrorResponse } from '@rocket.chat/rest-typings';

import type { ExtractRoutesFromAPI } from '../ApiClass';
import { API } from '../api';
import { getPaginationItems } from '../helpers/getPaginationItems';

const isCronJobsListParams = ajvQuery.compile<{
offset?: number;
count?: number;
}>({
type: 'object',
properties: {
offset: { type: 'number', nullable: true },
count: { type: 'number', nullable: true },
},
additionalProperties: false,
});

const isCronJobsActionParams = ajv.compile<{
jobName: string;
}>({
type: 'object',
properties: {
jobName: { type: 'string' },
},
required: ['jobName'],
additionalProperties: false,
});

const isCronJobsHistoryParams = ajvQuery.compile<{
jobName: string;
offset?: number;
count?: number;
}>({
type: 'object',
properties: {
jobName: { type: 'string' },
offset: { type: 'number' },
count: { type: 'number' },
},
required: ['jobName'],
additionalProperties: false,
});

const isCronJobsListResponse = ajv.compile<{ jobs: ICronJobItem[]; count: number; offset: number; total: number; success: boolean }>({
type: 'object',
properties: {
jobs: { type: 'array' },
count: { type: 'number' },
offset: { type: 'number' },
total: { type: 'number' },
success: { type: 'boolean', enum: [true] },
},
required: ['jobs', 'count', 'offset', 'total', 'success'],
additionalProperties: false,
});

const isCronJobsHistoryResponse = ajv.compile<{
history: ICronHistoryItem[];
count: number;
offset: number;
total: number;
success: boolean;
}>({
type: 'object',
properties: {
history: { type: 'array' },
count: { type: 'number' },
offset: { type: 'number' },
total: { type: 'number' },
success: { type: 'boolean', enum: [true] },
},
required: ['history', 'count', 'offset', 'total', 'success'],
additionalProperties: false,
});

const isCronJobsActionResponse = ajv.compile<void>({
type: 'object',
properties: {
success: { type: 'boolean', enum: [true] },
},
required: ['success'],
additionalProperties: false,
});

const cronJobsEndpoints = API.v1
.get(
'cron.jobs',
{
authRequired: true,
permissionsRequired: ['manage-scheduled-jobs'],
query: isCronJobsListParams,
response: {
200: isCronJobsListResponse,
401: validateUnauthorizedErrorResponse,
},
},
async function action() {
const { offset, count } = await getPaginationItems(this.queryParams);
const { jobs, total } = await CronJobsSvc.getCoreJobs({ offset, count });

return API.v1.success({
jobs,
count: jobs.length,
offset,
total,
});
},
)
.get(
'cron.appjobs',
{
authRequired: true,
permissionsRequired: ['manage-scheduled-jobs'],
query: isCronJobsListParams,
response: {
200: isCronJobsListResponse,
401: validateUnauthorizedErrorResponse,
},
},
async function action() {
const { offset, count } = await getPaginationItems(this.queryParams);
const { jobs, total } = await CronJobsSvc.getAppJobs({ offset, count });

return API.v1.success({
jobs,
count: jobs.length,
offset,
total,
});
},
)
.get(
'cron.history',
{
authRequired: true,
permissionsRequired: ['manage-scheduled-jobs'],
query: isCronJobsHistoryParams,
response: {
200: isCronJobsHistoryResponse,
401: validateUnauthorizedErrorResponse,
},
},
async function action() {
const { offset, count } = await getPaginationItems(this.queryParams);
const { jobName } = this.queryParams;
const { history, total } = await CronJobsSvc.getHistory(jobName, { offset, count });

return API.v1.success({
history,
count: history.length,
offset,
total,
});
},
)
.post(
'cron.enable',
{
authRequired: true,
permissionsRequired: ['manage-scheduled-jobs'],
body: isCronJobsActionParams,
response: {
200: isCronJobsActionResponse,
400: validateBadRequestErrorResponse,
401: validateUnauthorizedErrorResponse,
},
},
async function action() {
const { jobName } = this.bodyParams;
const success = await CronJobsSvc.enable(jobName);

if (!success) {
return API.v1.failure('error-job-not-found');
}
return API.v1.success();
},
)
.post(
'cron.disable',
{
authRequired: true,
permissionsRequired: ['manage-scheduled-jobs'],
body: isCronJobsActionParams,
response: {
200: isCronJobsActionResponse,
400: validateBadRequestErrorResponse,
401: validateUnauthorizedErrorResponse,
},
},
async function action() {
const { jobName } = this.bodyParams;
const success = await CronJobsSvc.disable(jobName);

if (!success) {
return API.v1.failure('error-job-not-found');
}
return API.v1.success();
},
)
.post(
'cron.trigger',
{
authRequired: true,
permissionsRequired: ['manage-scheduled-jobs'],
body: isCronJobsActionParams,
response: {
200: isCronJobsActionResponse,
400: validateBadRequestErrorResponse,
401: validateUnauthorizedErrorResponse,
},
},
async function action() {
const { jobName } = this.bodyParams;
const success = await CronJobsSvc.trigger(jobName);

if (!success) {
return API.v1.failure('error-job-not-found');
}

return API.v1.success();
},
);

export type CronJobsEndpoints = ExtractRoutesFromAPI<typeof cronJobsEndpoints>;

declare module '@rocket.chat/rest-typings' {
// eslint-disable-next-line @typescript-eslint/naming-convention, @typescript-eslint/no-empty-interface
interface Endpoints extends CronJobsEndpoints {}
}
38 changes: 35 additions & 3 deletions apps/meteor/app/apps/server/bridges/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,57 @@ import type { IAppServerOrchestrator } from '@rocket.chat/apps';
import { SchedulerBridge } from '@rocket.chat/apps/dist/server/bridges/SchedulerBridge';
import type { IProcessor, IOnetimeSchedule, IRecurringSchedule, IJobContext } from '@rocket.chat/apps-engine/definition/scheduler';
import { StartupType } from '@rocket.chat/apps-engine/definition/scheduler';
import { CronHistory } from '@rocket.chat/models';
import { Random } from '@rocket.chat/random';
import { ObjectId } from 'bson';
import { MongoInternals } from 'meteor/mongo';

function _callProcessor(processor: IProcessor['processor']): (job: Job) => Promise<void> {
return (job) => {
return async (job) => {
const data = job?.attrs?.data || {};

// This field is for internal use, no need to leak to app processor
delete (data as any).appId;

data.jobId = job.attrs._id.toString();

return (processor as (jobContext: IJobContext) => Promise<void>)(data).then(async () => {
const { insertedId } = await CronHistory.insertOne({
_id: Random.id(),
intendedAt: new Date(),
name: job.attrs.name,
startedAt: new Date(),
type: 'app',
});

try {
await (processor as (jobContext: IJobContext) => Promise<void>)(data);

await CronHistory.updateOne(
{ _id: insertedId },
{
$set: {
finishedAt: new Date(),
},
},
);

// ensure the 'normal' ('onetime' in our vocab) type job is removed after it is run
// as Agenda does not remove it from the DB
if (job.attrs.type === 'normal') {
await job.agenda.cancel({ _id: job.attrs._id });
}
});
} catch (error: any) {
await CronHistory.updateOne(
{ _id: insertedId },
{
$set: {
finishedAt: new Date(),
error: error?.stack ? error.stack : error,
},
},
);
throw error;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export const permissions = [
{ _id: 'view-privileged-setting', roles: ['admin'] },
{ _id: 'view-room-administration', roles: ['admin'] },
{ _id: 'view-statistics', roles: ['admin'] },
{ _id: 'manage-scheduled-jobs', roles: ['admin'] },
{ _id: 'view-user-administration', roles: ['admin'] },
{ _id: 'preview-c-room', roles: ['admin', 'user', 'federated-external', 'anonymous'] },
{ _id: 'view-outside-room', roles: ['admin', 'owner', 'moderator', 'user', 'federated-external'] },
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Agenda } from '@rocket.chat/agenda';
import type { IUser } from '@rocket.chat/core-typings';
import type { MainLogger } from '@rocket.chat/logger';
import { LivechatRooms, Users } from '@rocket.chat/models';
import { LivechatRooms, Users, CronHistory } from '@rocket.chat/models';
import { Random } from '@rocket.chat/random';
import { Meteor } from 'meteor/meteor';
import { MongoInternals } from 'meteor/mongo';
import moment from 'moment';
Expand Down Expand Up @@ -65,24 +66,54 @@ export class AutoCloseOnHoldSchedulerClass {
await this.scheduler.cancel({ name: jobName });
}

private async executeJob({ attrs: { data } }: any = {}): Promise<void> {
this.logger.debug({ msg: 'Executing job for room', roomId: data.roomId });
const { roomId, comment } = data;
private async executeJob({ attrs: { data, name } }: any = {}): Promise<void> {
const { insertedId } = await CronHistory.insertOne({
_id: Random.id(),
intendedAt: new Date(),
name,
startedAt: new Date(),
type: 'omnichannel',
});

const [room, user] = await Promise.all([LivechatRooms.findOneById(roomId), this.getSchedulerUser()]);
if (!room || !user) {
throw new Error(
`Unable to process AutoCloseOnHoldScheduler job because room or user not found for roomId: ${roomId} and userId: rocket.cat`,
);
}
try {
this.logger.debug({ msg: 'Executing job for room', roomId: data.roomId });
const { roomId, comment } = data;

const payload = {
room,
user,
comment,
};
const [room, user] = await Promise.all([LivechatRooms.findOneById(roomId), this.getSchedulerUser()]);
if (!room || !user) {
throw new Error(
`Unable to process AutoCloseOnHoldScheduler job because room or user not found for roomId: ${roomId} and userId: rocket.cat`,
);
}

await closeRoom(payload);
const payload = {
room,
user,
comment,
};

await closeRoom(payload);

await CronHistory.updateOne(
{ _id: insertedId },
{
$set: {
finishedAt: new Date(),
},
},
);
} catch (error: any) {
await CronHistory.updateOne(
{ _id: insertedId },
{
$set: {
finishedAt: new Date(),
error: error?.stack ? error.stack : error,
},
},
);
throw error;
}
}

private async getSchedulerUser(): Promise<IUser> {
Expand Down
Loading
Loading