From 76416b635242f0115f2f176c29b3ac151831081b Mon Sep 17 00:00:00 2001 From: Duansg Date: Wed, 17 Jun 2026 01:33:52 -0700 Subject: [PATCH] [fix] fixed an issue where cron jobs were not rescheduled --- .../scheduler/CollectorJobScheduler.java | 2 + .../manager/scheduler/SchedulerInit.java | 2 + .../service/impl/MonitorServiceImpl.java | 4 ++ .../scheduler/CollectorJobSchedulerTest.java | 43 +++++++++++++++++++ 4 files changed, 51 insertions(+) diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java index 915b549e1f7..b9512f60ec6 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java @@ -149,6 +149,8 @@ public void collectorGoOnline(String identity, CollectorInfo collectorInfo) { appDefine.setDefaultInterval(monitor.getIntervals()); appDefine.setCyclic(true); appDefine.setTimestamp(System.currentTimeMillis()); + appDefine.setScheduleType(monitor.getScheduleType()); + appDefine.setCronExpression(monitor.getCronExpression()); Map metadata = Map.of(CommonConstants.LABEL_INSTANCE_NAME, monitor.getName(), CommonConstants.LABEL_INSTANCE, monitor.getInstance()); appDefine.setMetadata(metadata); diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java index c1c9bfa01c3..d91963c37c1 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java @@ -118,6 +118,8 @@ public void run(String... args) throws Exception { appDefine.setDefaultInterval(monitor.getIntervals()); appDefine.setCyclic(true); appDefine.setTimestamp(System.currentTimeMillis()); + appDefine.setScheduleType(monitor.getScheduleType()); + appDefine.setCronExpression(monitor.getCronExpression()); String instance = monitor.getInstance(); diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java index 7e06ef78229..d9fe4b9638f 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java @@ -181,6 +181,8 @@ public void addMonitor(Monitor monitor, List params, String collector, Gr appDefine.setDefaultInterval(monitor.getIntervals()); appDefine.setCyclic(true); appDefine.setTimestamp(System.currentTimeMillis()); + appDefine.setScheduleType(monitor.getScheduleType()); + appDefine.setCronExpression(monitor.getCronExpression()); String instance = monitor.getInstance(); // The port field may be null @@ -780,6 +782,8 @@ public void updateAppCollectJob(Job job) { appDefine.setDefaultInterval(monitor.getIntervals()); appDefine.setCyclic(true); appDefine.setTimestamp(System.currentTimeMillis()); + appDefine.setScheduleType(monitor.getScheduleType()); + appDefine.setCronExpression(monitor.getCronExpression()); Map metadata = Map.of(CommonConstants.LABEL_INSTANCE_NAME, monitor.getName(), CommonConstants.LABEL_INSTANCE, monitor.getInstance()); appDefine.setMetadata(metadata); diff --git a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java index e4df4231cad..10cc1761427 100644 --- a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java +++ b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java @@ -148,4 +148,47 @@ public void testCollectorGoOnlineJobMetadataNotEmpty() { assertEquals("127.0.0.1:8080", job.getMetadata().get(CommonConstants.LABEL_INSTANCE)); } + /** + * Regression test for issue #4157: when a collector goes online the dispatched job must carry the + * monitor's cron schedule type and cron expression, otherwise the job falls back to interval + * scheduling and the cron expression is ignored after restart / collector reconnect. + */ + @Test + public void testCollectorGoOnlinePropagatesCronSchedule() { + String identity = "collector-1"; + CollectorInfo collectorInfo = CollectorInfo.builder().ip("127.0.0.1").mode("public").version("1.0.0").build(); + org.apache.hertzbeat.common.entity.manager.Collector collector = org.apache.hertzbeat.common.entity.manager.Collector.builder() + .name(identity).ip("127.0.0.1").mode("public").version("1.0.0").status(CommonConstants.COLLECTOR_STATUS_OFFLINE).build(); + when(collectorDao.findCollectorByName(identity)).thenReturn(Optional.of(collector)); + + CollectorMonitorBind bind = CollectorMonitorBind.builder().collector(identity).monitorId(1L).build(); + when(collectorMonitorBindDao.findCollectorMonitorBindsByCollector(identity)).thenReturn(List.of(bind)); + + Monitor monitor = Monitor.builder().id(1L).name("test-monitor").instance("127.0.0.1:8080").app("test-app") + .intervals(60).status((byte) 1).scheduleType("cron").cronExpression("0 55 7 * * ?").build(); + when(monitorDao.findMonitorsByIdIn(any())).thenReturn(List.of(monitor)); + + when(paramDao.findParamsByMonitorId(eq(monitor.getId()))).thenReturn(List.of()); + + Job appDefine = new Job(); + appDefine.setParams(Collections.emptyList()); + when(appService.getAppDefine(anyString())).thenReturn(appDefine); + + ConsistentHash.Node node = new ConsistentHash.Node(identity, collector.getMode(), + collector.getIp(), System.currentTimeMillis(), null); + when(consistentHash.getNode("collector-1")).thenReturn(node); + + ManageServer manageServer = mock(ManageServer.class); + collectorJobScheduler.setManageServer(manageServer); + + collectorJobScheduler.collectorGoOnline(identity, collectorInfo); + + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(ClusterMsg.Message.class); + verify(manageServer, atLeastOnce()).sendMsg(eq("collector-1"), msgCaptor.capture()); + Job job = JsonUtil.fromJson(msgCaptor.getValue().getMsg().toStringUtf8(), Job.class); + assertNotNull(job); + assertEquals("cron", job.getScheduleType()); + assertEquals("0 55 7 * * ?", job.getCronExpression()); + } + }