diff --git a/apps/meteor/app/api/server/index.ts b/apps/meteor/app/api/server/index.ts index 5a6a6f06cbbab..1b0b6f9650cea 100644 --- a/apps/meteor/app/api/server/index.ts +++ b/apps/meteor/app/api/server/index.ts @@ -45,7 +45,7 @@ import './v1/mailer'; import './v1/teams'; import './v1/moderation'; import './v1/uploads'; - +import './v1/cronJobs'; // This has to come last so all endpoints are registered before generating the OpenAPI documentation import './default/openApi'; diff --git a/apps/meteor/app/api/server/v1/cronJobs.ts b/apps/meteor/app/api/server/v1/cronJobs.ts new file mode 100644 index 0000000000000..f2f5a9af18713 --- /dev/null +++ b/apps/meteor/app/api/server/v1/cronJobs.ts @@ -0,0 +1,232 @@ +import { CronJobsSvc } from '@rocket.chat/core-services'; +import type { ICronJobItem, ICronHistoryItem } from '@rocket.chat/core-typings'; +import { ajv, ajvQuery, validateUnauthorizedErrorResponse, validateBadRequestErrorResponse } from '@rocket.chat/rest-typings'; + +import type { ExtractRoutesFromAPI } from '../ApiClass'; +import { API } from '../api'; +import { getPaginationItems } from '../helpers/getPaginationItems'; + +const isCronJobsListParams = ajvQuery.compile<{ + offset?: number; + count?: number; +}>({ + type: 'object', + properties: { + offset: { type: 'number', nullable: true }, + count: { type: 'number', nullable: true }, + }, + additionalProperties: false, +}); + +const isCronJobsActionParams = ajv.compile<{ + jobName: string; +}>({ + type: 'object', + properties: { + jobName: { type: 'string' }, + }, + required: ['jobName'], + additionalProperties: false, +}); + +const isCronJobsHistoryParams = ajvQuery.compile<{ + jobName: string; + offset?: number; + count?: number; +}>({ + type: 'object', + properties: { + jobName: { type: 'string' }, + offset: { type: 'number' }, + count: { type: 'number' }, + }, + required: ['jobName'], + additionalProperties: false, +}); + +const isCronJobsListResponse = ajv.compile<{ jobs: ICronJobItem[]; count: number; offset: number; total: number; success: boolean }>({ + type: 'object', + properties: { + jobs: { type: 'array' }, + count: { type: 'number' }, + offset: { type: 'number' }, + total: { type: 'number' }, + success: { type: 'boolean', enum: [true] }, + }, + required: ['jobs', 'count', 'offset', 'total', 'success'], + additionalProperties: false, +}); + +const isCronJobsHistoryResponse = ajv.compile<{ + history: ICronHistoryItem[]; + count: number; + offset: number; + total: number; + success: boolean; +}>({ + type: 'object', + properties: { + history: { type: 'array' }, + count: { type: 'number' }, + offset: { type: 'number' }, + total: { type: 'number' }, + success: { type: 'boolean', enum: [true] }, + }, + required: ['history', 'count', 'offset', 'total', 'success'], + additionalProperties: false, +}); + +const isCronJobsActionResponse = ajv.compile({ + type: 'object', + properties: { + success: { type: 'boolean', enum: [true] }, + }, + required: ['success'], + additionalProperties: false, +}); + +const cronJobsEndpoints = API.v1 + .get( + 'cron.jobs', + { + authRequired: true, + permissionsRequired: ['manage-scheduled-jobs'], + query: isCronJobsListParams, + response: { + 200: isCronJobsListResponse, + 401: validateUnauthorizedErrorResponse, + }, + }, + async function action() { + const { offset, count } = await getPaginationItems(this.queryParams); + const { jobs, total } = await CronJobsSvc.getCoreJobs({ offset, count }); + + return API.v1.success({ + jobs, + count: jobs.length, + offset, + total, + }); + }, + ) + .get( + 'cron.appjobs', + { + authRequired: true, + permissionsRequired: ['manage-scheduled-jobs'], + query: isCronJobsListParams, + response: { + 200: isCronJobsListResponse, + 401: validateUnauthorizedErrorResponse, + }, + }, + async function action() { + const { offset, count } = await getPaginationItems(this.queryParams); + const { jobs, total } = await CronJobsSvc.getAppJobs({ offset, count }); + + return API.v1.success({ + jobs, + count: jobs.length, + offset, + total, + }); + }, + ) + .get( + 'cron.history', + { + authRequired: true, + permissionsRequired: ['manage-scheduled-jobs'], + query: isCronJobsHistoryParams, + response: { + 200: isCronJobsHistoryResponse, + 401: validateUnauthorizedErrorResponse, + }, + }, + async function action() { + const { offset, count } = await getPaginationItems(this.queryParams); + const { jobName } = this.queryParams; + const { history, total } = await CronJobsSvc.getHistory(jobName, { offset, count }); + + return API.v1.success({ + history, + count: history.length, + offset, + total, + }); + }, + ) + .post( + 'cron.enable', + { + authRequired: true, + permissionsRequired: ['manage-scheduled-jobs'], + body: isCronJobsActionParams, + response: { + 200: isCronJobsActionResponse, + 400: validateBadRequestErrorResponse, + 401: validateUnauthorizedErrorResponse, + }, + }, + async function action() { + const { jobName } = this.bodyParams; + const success = await CronJobsSvc.enable(jobName); + + if (!success) { + return API.v1.failure('error-job-not-found'); + } + return API.v1.success(); + }, + ) + .post( + 'cron.disable', + { + authRequired: true, + permissionsRequired: ['manage-scheduled-jobs'], + body: isCronJobsActionParams, + response: { + 200: isCronJobsActionResponse, + 400: validateBadRequestErrorResponse, + 401: validateUnauthorizedErrorResponse, + }, + }, + async function action() { + const { jobName } = this.bodyParams; + const success = await CronJobsSvc.disable(jobName); + + if (!success) { + return API.v1.failure('error-job-not-found'); + } + return API.v1.success(); + }, + ) + .post( + 'cron.trigger', + { + authRequired: true, + permissionsRequired: ['manage-scheduled-jobs'], + body: isCronJobsActionParams, + response: { + 200: isCronJobsActionResponse, + 400: validateBadRequestErrorResponse, + 401: validateUnauthorizedErrorResponse, + }, + }, + async function action() { + const { jobName } = this.bodyParams; + const success = await CronJobsSvc.trigger(jobName); + + if (!success) { + return API.v1.failure('error-job-not-found'); + } + + return API.v1.success(); + }, + ); + +export type CronJobsEndpoints = ExtractRoutesFromAPI; + +declare module '@rocket.chat/rest-typings' { + // eslint-disable-next-line @typescript-eslint/naming-convention, @typescript-eslint/no-empty-interface + interface Endpoints extends CronJobsEndpoints {} +} diff --git a/apps/meteor/app/apps/server/bridges/scheduler.ts b/apps/meteor/app/apps/server/bridges/scheduler.ts index 8f044cf1832f9..b30d81ab7bd93 100644 --- a/apps/meteor/app/apps/server/bridges/scheduler.ts +++ b/apps/meteor/app/apps/server/bridges/scheduler.ts @@ -4,11 +4,13 @@ import type { IAppServerOrchestrator } from '@rocket.chat/apps'; import { SchedulerBridge } from '@rocket.chat/apps/dist/server/bridges/SchedulerBridge'; import type { IProcessor, IOnetimeSchedule, IRecurringSchedule, IJobContext } from '@rocket.chat/apps-engine/definition/scheduler'; import { StartupType } from '@rocket.chat/apps-engine/definition/scheduler'; +import { CronHistory } from '@rocket.chat/models'; +import { Random } from '@rocket.chat/random'; import { ObjectId } from 'bson'; import { MongoInternals } from 'meteor/mongo'; function _callProcessor(processor: IProcessor['processor']): (job: Job) => Promise { - return (job) => { + return async (job) => { const data = job?.attrs?.data || {}; // This field is for internal use, no need to leak to app processor @@ -16,13 +18,43 @@ function _callProcessor(processor: IProcessor['processor']): (job: Job) => Promi data.jobId = job.attrs._id.toString(); - return (processor as (jobContext: IJobContext) => Promise)(data).then(async () => { + const { insertedId } = await CronHistory.insertOne({ + _id: Random.id(), + intendedAt: new Date(), + name: job.attrs.name, + startedAt: new Date(), + type: 'app', + }); + + try { + await (processor as (jobContext: IJobContext) => Promise)(data); + + await CronHistory.updateOne( + { _id: insertedId }, + { + $set: { + finishedAt: new Date(), + }, + }, + ); + // ensure the 'normal' ('onetime' in our vocab) type job is removed after it is run // as Agenda does not remove it from the DB if (job.attrs.type === 'normal') { await job.agenda.cancel({ _id: job.attrs._id }); } - }); + } catch (error: any) { + await CronHistory.updateOne( + { _id: insertedId }, + { + $set: { + finishedAt: new Date(), + error: error?.stack ? error.stack : error, + }, + }, + ); + throw error; + } }; } diff --git a/apps/meteor/app/authorization/server/constant/permissions.ts b/apps/meteor/app/authorization/server/constant/permissions.ts index c54f84c03fac2..64280d02acb20 100644 --- a/apps/meteor/app/authorization/server/constant/permissions.ts +++ b/apps/meteor/app/authorization/server/constant/permissions.ts @@ -84,6 +84,7 @@ export const permissions = [ { _id: 'view-privileged-setting', roles: ['admin'] }, { _id: 'view-room-administration', roles: ['admin'] }, { _id: 'view-statistics', roles: ['admin'] }, + { _id: 'manage-scheduled-jobs', roles: ['admin'] }, { _id: 'view-user-administration', roles: ['admin'] }, { _id: 'preview-c-room', roles: ['admin', 'user', 'federated-external', 'anonymous'] }, { _id: 'view-outside-room', roles: ['admin', 'owner', 'moderator', 'user', 'federated-external'] }, diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/AutoCloseOnHoldScheduler.ts b/apps/meteor/ee/app/livechat-enterprise/server/lib/AutoCloseOnHoldScheduler.ts index 82a80e411120f..19a99e8d996c6 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/AutoCloseOnHoldScheduler.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/AutoCloseOnHoldScheduler.ts @@ -1,7 +1,8 @@ import { Agenda } from '@rocket.chat/agenda'; import type { IUser } from '@rocket.chat/core-typings'; import type { MainLogger } from '@rocket.chat/logger'; -import { LivechatRooms, Users } from '@rocket.chat/models'; +import { LivechatRooms, Users, CronHistory } from '@rocket.chat/models'; +import { Random } from '@rocket.chat/random'; import { Meteor } from 'meteor/meteor'; import { MongoInternals } from 'meteor/mongo'; import moment from 'moment'; @@ -65,24 +66,54 @@ export class AutoCloseOnHoldSchedulerClass { await this.scheduler.cancel({ name: jobName }); } - private async executeJob({ attrs: { data } }: any = {}): Promise { - this.logger.debug({ msg: 'Executing job for room', roomId: data.roomId }); - const { roomId, comment } = data; + private async executeJob({ attrs: { data, name } }: any = {}): Promise { + const { insertedId } = await CronHistory.insertOne({ + _id: Random.id(), + intendedAt: new Date(), + name, + startedAt: new Date(), + type: 'omnichannel', + }); - const [room, user] = await Promise.all([LivechatRooms.findOneById(roomId), this.getSchedulerUser()]); - if (!room || !user) { - throw new Error( - `Unable to process AutoCloseOnHoldScheduler job because room or user not found for roomId: ${roomId} and userId: rocket.cat`, - ); - } + try { + this.logger.debug({ msg: 'Executing job for room', roomId: data.roomId }); + const { roomId, comment } = data; - const payload = { - room, - user, - comment, - }; + const [room, user] = await Promise.all([LivechatRooms.findOneById(roomId), this.getSchedulerUser()]); + if (!room || !user) { + throw new Error( + `Unable to process AutoCloseOnHoldScheduler job because room or user not found for roomId: ${roomId} and userId: rocket.cat`, + ); + } - await closeRoom(payload); + const payload = { + room, + user, + comment, + }; + + await closeRoom(payload); + + await CronHistory.updateOne( + { _id: insertedId }, + { + $set: { + finishedAt: new Date(), + }, + }, + ); + } catch (error: any) { + await CronHistory.updateOne( + { _id: insertedId }, + { + $set: { + finishedAt: new Date(), + error: error?.stack ? error.stack : error, + }, + }, + ); + throw error; + } } private async getSchedulerUser(): Promise { diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/AutoTransferChatScheduler.ts b/apps/meteor/ee/app/livechat-enterprise/server/lib/AutoTransferChatScheduler.ts index c6c39c4512030..c0cd2557c7bff 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/AutoTransferChatScheduler.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/AutoTransferChatScheduler.ts @@ -1,7 +1,8 @@ import { Agenda } from '@rocket.chat/agenda'; import type { IUser } from '@rocket.chat/core-typings'; import type { MainLogger } from '@rocket.chat/logger'; -import { LivechatRooms, Users } from '@rocket.chat/models'; +import { LivechatRooms, Users, CronHistory } from '@rocket.chat/models'; +import { Random } from '@rocket.chat/random'; import { Meteor } from 'meteor/meteor'; import { MongoInternals } from 'meteor/mongo'; @@ -130,15 +131,38 @@ export class AutoTransferChatSchedulerClass { }); } - private async executeJob({ attrs: { data } }: any = {}): Promise { + private async executeJob({ attrs: { data, name } }: any = {}): Promise { const { roomId } = data; - + const { insertedId } = await CronHistory.insertOne({ + _id: Random.id(), + intendedAt: new Date(), + name, + startedAt: new Date(), + type: 'omnichannel', + }); try { await this.transferRoom(roomId); await Promise.all([LivechatRooms.setAutoTransferredAtById(roomId), this.unscheduleRoom(roomId)]); - } catch (error) { + await CronHistory.updateOne( + { _id: insertedId }, + { + $set: { + finishedAt: new Date(), + }, + }, + ); + } catch (error: any) { this.logger.error({ msg: 'Error while executing auto-transfer job', schedulerName: SCHEDULER_NAME, roomId, err: error }); + await CronHistory.updateOne( + { _id: insertedId }, + { + $set: { + finishedAt: new Date(), + error: error?.stack ? error.stack : error, + }, + }, + ); } } } diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/QueueInactivityMonitor.ts b/apps/meteor/ee/app/livechat-enterprise/server/lib/QueueInactivityMonitor.ts index 5877f0ea8b01b..df5415ac1c3a0 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/QueueInactivityMonitor.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/QueueInactivityMonitor.ts @@ -1,7 +1,8 @@ import { Agenda } from '@rocket.chat/agenda'; import type { IUser, IOmnichannelRoom } from '@rocket.chat/core-typings'; import type { MainLogger } from '@rocket.chat/logger'; -import { LivechatRooms, LivechatInquiry as LivechatInquiryRaw, Users } from '@rocket.chat/models'; +import { LivechatRooms, LivechatInquiry as LivechatInquiryRaw, Users, CronHistory } from '@rocket.chat/models'; +import { Random } from '@rocket.chat/random'; import { Meteor } from 'meteor/meteor'; import { MongoInternals } from 'meteor/mongo'; import type { Db } from 'mongodb'; @@ -108,22 +109,47 @@ export class OmnichannelQueueInactivityMonitorClass { }); } - async closeRoom({ attrs: { data } }: any = {}): Promise { + async closeRoom({ attrs: { data, name } }: any = {}): Promise { const { inquiryId } = data; - // TODO: add projection and maybe use findOneQueued to avoid fetching the whole inquiry - const inquiry = await LivechatInquiryRaw.findOneById(inquiryId); - if (inquiry?.status !== 'queued') { - return; - } - const room = await LivechatRooms.findOneById(inquiry.rid); - if (!room) { - this.logger.error({ msg: 'Unable to find room to close in queue inactivity monitor', inquiryId, roomId: inquiry.rid }); - return; + const { insertedId } = await CronHistory.insertOne({ + _id: Random.id(), + intendedAt: new Date(), + name, + startedAt: new Date(), + type: 'omnichannel', + }); + // TODO: add projection and maybe use findOneQueued to avoid fetching the whole inquiry + try { + const inquiry = await LivechatInquiryRaw.findOneById(inquiryId); + if (inquiry?.status !== 'queued') { + await CronHistory.updateOne({ _id: insertedId }, { $set: { finishedAt: new Date() } }); + return; + } + + const room = await LivechatRooms.findOneById(inquiry.rid); + if (!room) { + this.logger.error({ msg: 'Unable to find room to close in queue inactivity monitor', inquiryId, roomId: inquiry.rid }); + await CronHistory.updateOne({ _id: insertedId }, { $set: { finishedAt: new Date() } }); + return; + } + + await Promise.all([this.closeRoomAction(room), this.stopInquiry(inquiryId)]); + this.logger.info({ msg: 'Closed room due to queue inactivity', roomId: inquiry.rid, inquiryId }); + + await CronHistory.updateOne({ _id: insertedId }, { $set: { finishedAt: new Date() } }); + } catch (error: any) { + await CronHistory.updateOne( + { _id: insertedId }, + { + $set: { + finishedAt: new Date(), + error: error?.stack ? error.stack : error, + }, + }, + ); + throw error; } - - await Promise.all([this.closeRoomAction(room), this.stopInquiry(inquiryId)]); - this.logger.info({ msg: 'Closed room due to queue inactivity', roomId: inquiry.rid, inquiryId }); } } diff --git a/apps/meteor/server/models.ts b/apps/meteor/server/models.ts index 49b5b98b19f08..039dfd5c36950 100644 --- a/apps/meteor/server/models.ts +++ b/apps/meteor/server/models.ts @@ -11,6 +11,7 @@ import { CallHistoryRaw, CredentialTokensRaw, CronHistoryRaw, + CronJobsRaw, CustomSoundsRaw, CustomUserStatusRaw, EmailInboxRaw, @@ -92,6 +93,7 @@ registerModel('ICalendarEventModel', new CalendarEventRaw(db)); registerModel('ICallHistoryModel', new CallHistoryRaw(db)); registerModel('ICredentialTokensModel', new CredentialTokensRaw(db)); registerModel('ICronHistoryModel', new CronHistoryRaw(db)); +registerModel('ICronJobsModel', new CronJobsRaw(db)); registerModel('ICustomSoundsModel', new CustomSoundsRaw(db)); registerModel('ICustomUserStatusModel', new CustomUserStatusRaw(db)); registerModel('IEmailInboxModel', new EmailInboxRaw(db)); diff --git a/apps/meteor/server/services/cron-jobs/deriveStatus.ts b/apps/meteor/server/services/cron-jobs/deriveStatus.ts new file mode 100644 index 0000000000000..b79307cc3ddff --- /dev/null +++ b/apps/meteor/server/services/cron-jobs/deriveStatus.ts @@ -0,0 +1,37 @@ +import type { ICronJobItem } from '@rocket.chat/core-typings'; + +export type CronJobStatus = 'running' | 'scheduled' | 'failed' | 'disabled' | 'completed'; + +const DEFAULT_LOCK_LIFETIME_MS = 10 * 60 * 1000; // 10 minutes + +export function deriveStatus(job: ICronJobItem): CronJobStatus { + // 1. Disabled check + if (job.disabled) { + return 'disabled'; + } + + // 2. Running check (with stale lock detection) + if (job.lockedAt) { + const lockExpiry = new Date(job.lockedAt.getTime() + DEFAULT_LOCK_LIFETIME_MS); + if (lockExpiry > new Date()) { + return 'running'; + } + // Lock is stale — worker crashed, not truly running + } + + // 3. Failed check (compare failedAt vs lastFinishedAt) + if (job.failCount && job.failCount > 0 && job.failedAt) { + if (!job.lastFinishedAt || job.failedAt > job.lastFinishedAt) { + return 'failed'; + } + // lastFinishedAt > failedAt means most recent run succeeded + } + + // 4. Scheduled check + if (job.nextRunAt) { + return 'scheduled'; + } + + // 5. Default + return 'completed'; +} diff --git a/apps/meteor/server/services/cron-jobs/service.ts b/apps/meteor/server/services/cron-jobs/service.ts new file mode 100644 index 0000000000000..d702112469765 --- /dev/null +++ b/apps/meteor/server/services/cron-jobs/service.ts @@ -0,0 +1,105 @@ +import { ServiceClassInternal } from '@rocket.chat/core-services'; +import type { ICronJobsService } from '@rocket.chat/core-services'; +import type { ICronJobItem, ICronHistoryItem } from '@rocket.chat/core-typings'; +import { cronJobs } from '@rocket.chat/cron'; +import { CronJobs, CronHistory } from '@rocket.chat/models'; + +import { deriveStatus } from './deriveStatus'; + +export class CronJobsService extends ServiceClassInternal implements ICronJobsService { + protected name = 'cron-jobs'; + + async getCoreJobs(pagination?: { + offset?: number; + count?: number; + }): Promise<{ jobs: ICronJobItem[]; count: number; offset: number; total: number }> { + const { cursor, totalCount } = CronJobs.findPaginated( + { + name: { $not: /^Apps-/ }, + }, + { + skip: pagination?.offset, + limit: pagination?.count, + }, + ); + + const [allJobs, total] = await Promise.all([cursor.toArray(), totalCount]); + + const jobs = allJobs.map((job) => ({ + ...job, + status: job.status ?? deriveStatus(job), + })); + + return { + jobs, + count: jobs.length, + offset: pagination?.offset || 0, + total, + }; + } + + async getAppJobs(pagination?: { + offset?: number; + count?: number; + }): Promise<{ jobs: ICronJobItem[]; count: number; offset: number; total: number }> { + const { cursor, totalCount } = CronJobs.findPaginated( + { + name: /^Apps-/, + }, + { + skip: pagination?.offset, + limit: pagination?.count, + }, + ); + + const [allJobs, total] = await Promise.all([cursor.toArray(), totalCount]); + + const jobs = allJobs.map((job) => ({ + ...job, + status: job.status ?? deriveStatus(job), + })); + + return { + jobs, + count: jobs.length, + offset: pagination?.offset || 0, + total, + }; + } + + async getHistory( + jobName: string, + pagination?: { offset?: number; count?: number }, + ): Promise<{ history: ICronHistoryItem[]; count: number; offset: number; total: number }> { + const { cursor, totalCount } = CronHistory.findPaginated( + { + name: jobName, + }, + { + sort: { intendedAt: -1 }, + skip: pagination?.offset, + limit: pagination?.count, + }, + ); + const [history, total] = await Promise.all([cursor.toArray(), totalCount]); + + return { + history, + count: history.length, + offset: pagination?.offset || 0, + total, + }; + } + + async enable(jobName: string): Promise { + return cronJobs.enable(jobName); + } + + async disable(jobName: string): Promise { + return cronJobs.disable(jobName); + } + + async trigger(jobName: string): Promise { + return cronJobs.trigger(jobName); + } +} diff --git a/apps/meteor/server/services/startup.ts b/apps/meteor/server/services/startup.ts index b0c14c2b49913..2366ef489d019 100644 --- a/apps/meteor/server/services/startup.ts +++ b/apps/meteor/server/services/startup.ts @@ -10,6 +10,7 @@ import { AppsEngineService } from './apps-engine/service'; import { BannerService } from './banner/service'; import { CalendarService } from './calendar/service'; import { CallHistoryService } from './call-history/service'; +import { CronJobsService } from './cron-jobs/service'; import { DeviceManagementService } from './device-management/service'; import { MediaService } from './image/service'; import { ImportService } from './import/service'; @@ -61,6 +62,7 @@ export const registerServices = async (): Promise => { api.registerService(new UserService()); api.registerService(new MediaCallService()); api.registerService(new CallHistoryService()); + api.registerService(new CronJobsService()); // if the process is running in micro services mode we don't need to register services that will run separately if (!isRunningMs()) { diff --git a/apps/meteor/tests/end-to-end/api/cron-jobs.ts b/apps/meteor/tests/end-to-end/api/cron-jobs.ts new file mode 100644 index 0000000000000..b05143915ea55 --- /dev/null +++ b/apps/meteor/tests/end-to-end/api/cron-jobs.ts @@ -0,0 +1,204 @@ +import { expect } from 'chai'; +import { before, describe, it, after } from 'mocha'; + +import { getCredentials, api, request, credentials } from '../../data/api-data'; +import { updatePermission } from '../../data/permissions.helper'; + +describe('[Cron Jobs API]', () => { + before((done) => getCredentials(done)); + + after(async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + }); + + describe('[/cron.jobs]', () => { + it('should return 401 when the user is not authenticated', async () => { + await request.get(api('cron.jobs')).expect(401); + }); + + it('should return a 403 error when the user does not have the manage-scheduled-jobs permission', async () => { + await updatePermission('manage-scheduled-jobs', []); + await request + .get(api('cron.jobs')) + .set(credentials) + .expect(403) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.error).to.include('error-unauthorized'); + }); + }); + + it('should return an array of core jobs when the user has the permission', async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + await request + .get(api('cron.jobs')) + .set(credentials) + .expect(200) + .expect((res) => { + expect(res.body).to.have.property('success', true); + expect(res.body).to.have.property('jobs').and.to.be.an('array'); + expect(res.body).to.have.property('offset'); + expect(res.body).to.have.property('total'); + expect(res.body).to.have.property('count'); + }); + }); + + it('should return paginated core jobs when requested with count and offset params', async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + await request + .get(api('cron.jobs')) + .set(credentials) + .query({ count: 5, offset: 0 }) + .expect(200) + .expect((res) => { + expect(res.body).to.have.property('success', true); + expect(res.body).to.have.property('jobs').and.to.be.an('array'); + expect(res.body.offset).to.equal(0); + }); + }); + }); + + describe('[/cron.appjobs]', () => { + it('should return 401 when the user is not authenticated', async () => { + await request.get(api('cron.appjobs')).expect(401); + }); + + it('should return a 403 error when the user does not have the manage-scheduled-jobs permission', async () => { + await updatePermission('manage-scheduled-jobs', []); + await request.get(api('cron.appjobs')).set(credentials).expect(403); + }); + + it('should return an array of app jobs when the user has the permission', async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + await request + .get(api('cron.appjobs')) + .set(credentials) + .expect(200) + .expect((res) => { + expect(res.body).to.have.property('success', true); + expect(res.body).to.have.property('jobs').and.to.be.an('array'); + }); + }); + }); + + describe('[/cron.history]', () => { + it('should return 401 when the user is not authenticated', async () => { + await request.get(api('cron.history')).expect(401); + }); + + it('should return a 403 error when the user does not have the necessary permission', async () => { + await updatePermission('manage-scheduled-jobs', []); + await request.get(api('cron.history')).set(credentials).query({ jobName: 'NPS' }).expect(403); + }); + + it('should return a 400 invalid-params error when jobName query parameter is missing', async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + await request + .get(api('cron.history')) + .set(credentials) + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body).to.have.property('errorType', 'invalid-params'); + }); + }); + + it('should return an array with the history logs when permission is granted and jobName is provided', async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + await request + .get(api('cron.history')) + .set(credentials) + .query({ jobName: 'NPS' }) + .expect(200) + .expect((res) => { + expect(res.body).to.have.property('success', true); + expect(res.body).to.have.property('history').and.to.be.an('array'); + }); + }); + }); + + describe('[/cron.trigger]', () => { + it('should return 401 when the user is not authenticated', async () => { + await request.post(api('cron.trigger')).send({ jobName: 'test' }).expect(401); + }); + + it('should return a 403 error when missing permission', async () => { + await updatePermission('manage-scheduled-jobs', []); + await request.post(api('cron.trigger')).set(credentials).send({ jobName: 'invalid-job' }).expect(403); + }); + + it('should return a 400 invalid-params error when jobName is missing in the body', async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + await request + .post(api('cron.trigger')) + .set(credentials) + .send({}) + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body).to.have.property('errorType', 'invalid-params'); + }); + }); + + it('should return error-job-not-found when trying to trigger a non-existent job', async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + await request + .post(api('cron.trigger')) + .set(credentials) + .send({ jobName: 'invalid-job' }) + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.error).to.equal('error-job-not-found'); + }); + }); + }); + + describe('[/cron.enable]', () => { + it('should return 401 when the user is not authenticated', async () => { + await request.post(api('cron.enable')).send({ jobName: 'test' }).expect(401); + }); + + it('should return a 403 error when missing permission', async () => { + await updatePermission('manage-scheduled-jobs', []); + await request.post(api('cron.enable')).set(credentials).send({ jobName: 'invalid-job' }).expect(403); + }); + + it('should return error-job-not-found when trying to enable a non-existent job', async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + await request + .post(api('cron.enable')) + .set(credentials) + .send({ jobName: 'invalid-job' }) + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.error).to.equal('error-job-not-found'); + }); + }); + }); + + describe('[/cron.disable]', () => { + it('should return 401 when the user is not authenticated', async () => { + await request.post(api('cron.disable')).send({ jobName: 'test' }).expect(401); + }); + + it('should return a 403 error when missing permission', async () => { + await updatePermission('manage-scheduled-jobs', []); + await request.post(api('cron.disable')).set(credentials).send({ jobName: 'invalid-job' }).expect(403); + }); + + it('should return error-job-not-found when trying to disable a non-existent job', async () => { + await updatePermission('manage-scheduled-jobs', ['admin']); + await request + .post(api('cron.disable')) + .set(credentials) + .send({ jobName: 'invalid-job' }) + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.error).to.equal('error-job-not-found'); + }); + }); + }); +}); diff --git a/apps/meteor/tests/unit/server/services/cron-jobs/deriveStatus.spec.ts b/apps/meteor/tests/unit/server/services/cron-jobs/deriveStatus.spec.ts new file mode 100644 index 0000000000000..f465646c1879e --- /dev/null +++ b/apps/meteor/tests/unit/server/services/cron-jobs/deriveStatus.spec.ts @@ -0,0 +1,48 @@ +import { expect } from 'chai'; + +import { deriveStatus } from '../../../../../server/services/cron-jobs/deriveStatus'; + +describe('CronJobs deriveStatus', () => { + it('should return "disabled" if job.disabled is true', () => { + const result = deriveStatus({ _id: '1', disabled: true } as any); + expect(result).to.be.equal('disabled'); + }); + + it('should return "running" if job has a fresh lockedAt date', () => { + const freshLock = new Date(Date.now() - 5 * 60 * 1000); + const result = deriveStatus({ _id: '1', lockedAt: freshLock } as any); + expect(result).to.be.equal('running'); + }); + + it('should ignore stale locks and fall back to "scheduled" if nextRunAt exists', () => { + const staleLock = new Date(Date.now() - 15 * 60 * 1000); + const result = deriveStatus({ _id: '1', lockedAt: staleLock, nextRunAt: new Date() } as any); + expect(result).to.be.equal('scheduled'); + }); + + it('should return "failed" if the job failed more recently than it finished', () => { + const result = deriveStatus({ + _id: '1', + failCount: 1, + failedAt: new Date('2026-06-08T12:00:00Z'), + lastFinishedAt: new Date('2026-06-08T11:00:00Z'), + } as any); + expect(result).to.be.equal('failed'); + }); + + it('should return "scheduled" if the job recovered from a failure and has a nextRunAt date', () => { + const result = deriveStatus({ + _id: '1', + failCount: 1, + failedAt: new Date('2026-06-08T11:00:00Z'), + lastFinishedAt: new Date('2026-06-08T12:00:00Z'), + nextRunAt: new Date(), + } as any); + expect(result).to.be.equal('scheduled'); + }); + + it('should return "completed" if it is not disabled, running, scheduled, or failed', () => { + const result = deriveStatus({ _id: '1', lastFinishedAt: new Date() } as any); + expect(result).to.be.equal('completed'); + }); +}); diff --git a/packages/agenda/src/definition/IJob.ts b/packages/agenda/src/definition/IJob.ts index df02f5d0fdf35..ea3e4d4ee8620 100644 --- a/packages/agenda/src/definition/IJob.ts +++ b/packages/agenda/src/definition/IJob.ts @@ -1,5 +1,6 @@ export interface IJob { name: string; + status?: string; nextRunAt?: Date | null; type?: 'once' | 'single' | 'normal'; diff --git a/packages/core-services/src/index.ts b/packages/core-services/src/index.ts index cb76365e2d18e..bc9b6d6d5f32b 100644 --- a/packages/core-services/src/index.ts +++ b/packages/core-services/src/index.ts @@ -9,6 +9,7 @@ import type { IAuthorizationLivechat } from './types/IAuthorizationLivechat'; import type { IBannerService } from './types/IBannerService'; import type { ICalendarService } from './types/ICalendarService'; import type { ICallHistoryService } from './types/ICallHistoryService'; +import type { ICronJobsService } from './types/ICronJobsService'; import type { IDeviceManagementService } from './types/IDeviceManagementService'; import type { IEnterpriseSettings } from './types/IEnterpriseSettings'; import type { IFederationMatrixService } from './types/IFederationMatrixService'; @@ -155,6 +156,7 @@ export type { IImportService, IOmnichannelAnalyticsService, IUserService, + ICronJobsService, }; export { proxify }; @@ -192,6 +194,7 @@ export const Omnichannel = proxify('omnichannel'); export const OmnichannelEEService = proxify('omnichannel-ee'); export const Import = proxify('import'); export const OmnichannelAnalytics = proxify('omnichannel-analytics'); +export const CronJobsSvc = proxify('cron-jobs'); export const User = proxify('user'); export const Push = proxify('push'); diff --git a/packages/core-services/src/types/ICronJobsService.ts b/packages/core-services/src/types/ICronJobsService.ts new file mode 100644 index 0000000000000..50231733c1ca6 --- /dev/null +++ b/packages/core-services/src/types/ICronJobsService.ts @@ -0,0 +1,14 @@ +import type { ICronJobItem, ICronHistoryItem } from '@rocket.chat/core-typings'; +import type { PaginatedResult } from '@rocket.chat/rest-typings'; + +import type { IServiceClass } from './ServiceClass'; + +export interface ICronJobsService extends IServiceClass { + getCoreJobs(pagination?: { offset?: number; count?: number }): Promise>; + getAppJobs(pagination?: { offset?: number; count?: number }): Promise>; + getHistory(jobName: string, pagination?: { offset?: number; count?: number }): Promise>; + + enable(jobName: string): Promise; + disable(jobName: string): Promise; + trigger(jobName: string): Promise; +} diff --git a/packages/core-typings/src/ICronHistoryItem.ts b/packages/core-typings/src/ICronHistoryItem.ts index c269b57a8ae30..da476fe348a60 100644 --- a/packages/core-typings/src/ICronHistoryItem.ts +++ b/packages/core-typings/src/ICronHistoryItem.ts @@ -7,4 +7,5 @@ export interface ICronHistoryItem extends IRocketChatRecord { finishedAt?: Date; result?: any; error?: any; + type?: 'system' | 'app' | 'omnichannel'; } diff --git a/packages/core-typings/src/ICronJobItem.ts b/packages/core-typings/src/ICronJobItem.ts new file mode 100644 index 0000000000000..8d59604f9cbaf --- /dev/null +++ b/packages/core-typings/src/ICronJobItem.ts @@ -0,0 +1,19 @@ +import type { IRocketChatRecord } from './IRocketChatRecord'; + +export interface ICronJobItem extends IRocketChatRecord { + name: string; + type?: 'once' | 'single' | 'normal'; + nextRunAt?: Date | null; + repeatInterval?: string | number; + repeatTimezone?: string | null; + lastRunAt?: Date; + lastFinishedAt?: Date; + failedAt?: Date; + lockedAt?: Date | null; + disabled?: boolean; + failReason?: string; + failCount?: number; + lastModifiedBy?: string; + data?: Record; + status?: string; +} diff --git a/packages/core-typings/src/index.ts b/packages/core-typings/src/index.ts index 90dd9c4e638a8..38dcd45affa52 100644 --- a/packages/core-typings/src/index.ts +++ b/packages/core-typings/src/index.ts @@ -115,6 +115,7 @@ export type * from './search'; export * from './omnichannel'; export type * from './ILivechatUnitMonitor'; export type * from './ICronHistoryItem'; +export type * from './ICronJobItem'; export type * from './migrations/IControl'; export type * from './OauthConfig'; diff --git a/packages/cron/src/index.ts b/packages/cron/src/index.ts index ed653a800ee41..0a79efb5d5009 100644 --- a/packages/cron/src/index.ts +++ b/packages/cron/src/index.ts @@ -1,6 +1,6 @@ import { type Job, Agenda } from '@rocket.chat/agenda'; import { Logger } from '@rocket.chat/logger'; -import { CronHistory } from '@rocket.chat/models'; +import { CronHistory, CronJobs } from '@rocket.chat/models'; import { Random } from '@rocket.chat/random'; import type { Db } from 'mongodb'; @@ -12,6 +12,7 @@ const runCronJobFunctionAndPersistResult = async (fn: () => Promise, jobNam intendedAt: new Date(), name: jobName, startedAt: new Date(), + type: 'system', }); try { const result = await fn(); @@ -77,6 +78,7 @@ export class AgendaCronJobs { jobName: job.attrs.name, nextRunAt: job.attrs.nextRunAt, }); + void CronJobs.updateOne({ _id: job.attrs._id }, { $set: { status: 'running' } }); }); this.scheduler.on('complete', (job: Job) => { @@ -89,6 +91,7 @@ export class AgendaCronJobs { duration: job.attrs.lastFinishedAt && job.attrs.lastRunAt ? job.attrs.lastFinishedAt.getTime() - job.attrs.lastRunAt.getTime() : undefined, }); + void CronJobs.updateOne({ _id: job.attrs._id }, { $set: { status: 'completed' } }); }); this.scheduler.on('success', (job: Job) => { @@ -108,6 +111,7 @@ export class AgendaCronJobs { failCount: job.attrs.failCount, failReason: job.attrs.failReason, }); + void CronJobs.updateOne({ _id: job.attrs._id }, { $set: { status: 'failed' } }); }); this.scheduler.on('error:database', (err: unknown) => { @@ -178,6 +182,55 @@ export class AgendaCronJobs { return this.scheduler.has({ name: jobName }); } + public async enable(jobName: string): Promise { + if (!this.scheduler) { + return false; + } + + const jobs = await this.scheduler.jobs({ name: jobName }); + + if (!jobs.length) { + return false; + } + + const job = jobs[0]; + job.enable(); + await job.save(); + + return true; + } + + public async disable(jobName: string): Promise { + if (!this.scheduler) { + return false; + } + + const jobs = await this.scheduler.jobs({ name: jobName }); + if (!jobs.length) { + return false; + } + + const job = jobs[0]; + job.disable(); + await job.save(); + + return true; + } + + public async trigger(jobName: string): Promise { + if (!this.scheduler) { + return false; + } + const jobs = await this.scheduler.jobs({ name: jobName }); + if (!jobs.length) { + return false; + } + + await this.scheduler.now(jobName, {}); + + return true; + } + private async reserve(config: ReservedJob): Promise { this.reservedJobs = [...this.reservedJobs, config]; } diff --git a/packages/i18n/src/locales/en.i18n.json b/packages/i18n/src/locales/en.i18n.json index 921b3805a316b..e8226ab6dc2a9 100644 --- a/packages/i18n/src/locales/en.i18n.json +++ b/packages/i18n/src/locales/en.i18n.json @@ -7176,6 +7176,9 @@ "view-room-administration_description": "Permission to view public, private and direct message statistics. Does not include the ability to view conversations or archives", "view-statistics": "View Statistics", "view-statistics_description": "Permission to view system statistics such as number of users logged in, number of rooms, operating system information", + "manage-scheduled-jobs": "Manage Scheduled Jobs", + "manage-scheduled-jobs_description": "Permission to view, enable, disable, and trigger system and omnichannel background jobs", + "error-job-not-found": "The requested background job could not be found.", "view-user-administration": "View User Administration", "view-user-administration_description": "Permission to partial, read-only list view of other user accounts currently logged into the system. No user account information is accessible with this permission", "webdav-account-saved": "WebDAV account saved", diff --git a/packages/model-typings/src/index.ts b/packages/model-typings/src/index.ts index 439447c49ea02..b0d61cf8af846 100644 --- a/packages/model-typings/src/index.ts +++ b/packages/model-typings/src/index.ts @@ -72,6 +72,7 @@ export type * from './models/IImportsModel'; export type * from './models/IFederationRoomEventsModel'; export type * from './models/IAuditLogModel'; export type * from './models/ICronHistoryModel'; +export type * from './models/ICronJobsModel'; export type * from './models/IMigrationsModel'; export type * from './models/IModerationReportsModel'; export type * from './models/IMediaCallsModel'; diff --git a/packages/model-typings/src/models/ICronJobsModel.ts b/packages/model-typings/src/models/ICronJobsModel.ts new file mode 100644 index 0000000000000..8aa1f5c7887ca --- /dev/null +++ b/packages/model-typings/src/models/ICronJobsModel.ts @@ -0,0 +1,5 @@ +import type { ICronJobItem } from '@rocket.chat/core-typings'; + +import type { IBaseModel } from './IBaseModel'; + +export type ICronJobsModel = IBaseModel; diff --git a/packages/models/src/index.ts b/packages/models/src/index.ts index 099420cd2f7ae..ee935fa5a4d76 100644 --- a/packages/models/src/index.ts +++ b/packages/models/src/index.ts @@ -79,6 +79,7 @@ import type { IMediaCallChannelsModel, IMediaCallNegotiationsModel, ICallHistoryModel, + ICronJobsModel, IAbacAttributesModel, } from '@rocket.chat/model-typings'; import type { Collection, Db } from 'mongodb'; @@ -106,6 +107,7 @@ import { UsersSessionsRaw, AbacAttributesRaw, ServerEventsRaw, + CronJobsRaw, } from './modelClasses'; import { proxify, registerModel } from './proxify'; @@ -203,6 +205,7 @@ export const OmnichannelServiceLevelAgreements = proxify('IAuditLogModel'); export const CronHistory = proxify('ICronHistoryModel'); +export const CronJobs = proxify('ICronJobsModel'); export const Migrations = proxify('IMigrationsModel'); export const ModerationReports = proxify('IModerationReportsModel'); export const WorkspaceCredentials = proxify('IWorkspaceCredentialsModel'); @@ -240,5 +243,6 @@ export function registerServiceModels(db: Db, trash?: Collection new UploadsRaw(db)); registerModel('ILivechatVisitorsModel', () => new LivechatVisitorsRaw(db)); registerModel('IAbacAttributesModel', () => new AbacAttributesRaw(db)); + registerModel('ICronJobsModel', () => new CronJobsRaw(db)); registerModel('IServerEventsModel', () => new ServerEventsRaw(db)); } diff --git a/packages/models/src/modelClasses.ts b/packages/models/src/modelClasses.ts index c554fc9698550..42c9841e2556e 100644 --- a/packages/models/src/modelClasses.ts +++ b/packages/models/src/modelClasses.ts @@ -65,6 +65,7 @@ export * from './models/WebdavAccounts'; export * from './models/CredentialTokens'; export * from './models/MessageReads'; export * from './models/CronHistoryModel'; +export * from './models/CronJobsModel'; export * from './models/Migrations'; export * from './models/ModerationReports'; export * from './models/MediaCalls'; diff --git a/packages/models/src/models/CronHistoryModel.ts b/packages/models/src/models/CronHistoryModel.ts index 905f62cfaab82..eac2b245f5312 100644 --- a/packages/models/src/models/CronHistoryModel.ts +++ b/packages/models/src/models/CronHistoryModel.ts @@ -10,6 +10,6 @@ export class CronHistoryRaw extends BaseRaw implements ICronHi } protected override modelIndexes(): IndexDescription[] { - return [{ key: { intendedAt: 1, name: 1 }, unique: true }]; + return [{ key: { name: 1, intendedAt: -1 } }]; } } diff --git a/packages/models/src/models/CronJobsModel.ts b/packages/models/src/models/CronJobsModel.ts new file mode 100644 index 0000000000000..a31287487d7a2 --- /dev/null +++ b/packages/models/src/models/CronJobsModel.ts @@ -0,0 +1,13 @@ +import type { ICronJobItem } from '@rocket.chat/core-typings'; +import type { ICronJobsModel } from '@rocket.chat/model-typings'; +import type { Db } from 'mongodb'; + +import { BaseRaw } from './BaseRaw'; + +export class CronJobsRaw extends BaseRaw implements ICronJobsModel { + constructor(db: Db) { + super(db, 'cron', undefined, { + preventSetUpdatedAt: true, + }); + } +}