Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ public void collectorGoOnline(String identity, CollectorInfo collectorInfo) {
appDefine.setDefaultInterval(monitor.getIntervals());
appDefine.setCyclic(true);
appDefine.setTimestamp(System.currentTimeMillis());
appDefine.setScheduleType(monitor.getScheduleType());
appDefine.setCronExpression(monitor.getCronExpression());
Map<String, String> metadata = Map.of(CommonConstants.LABEL_INSTANCE_NAME, monitor.getName(),
CommonConstants.LABEL_INSTANCE, monitor.getInstance());
appDefine.setMetadata(metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public void run(String... args) throws Exception {
appDefine.setDefaultInterval(monitor.getIntervals());
appDefine.setCyclic(true);
appDefine.setTimestamp(System.currentTimeMillis());
appDefine.setScheduleType(monitor.getScheduleType());
appDefine.setCronExpression(monitor.getCronExpression());

String instance = monitor.getInstance();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ public void addMonitor(Monitor monitor, List<Param> params, String collector, Gr
appDefine.setDefaultInterval(monitor.getIntervals());
appDefine.setCyclic(true);
appDefine.setTimestamp(System.currentTimeMillis());
appDefine.setScheduleType(monitor.getScheduleType());
appDefine.setCronExpression(monitor.getCronExpression());

String instance = monitor.getInstance();
// The port field may be null
Expand Down Expand Up @@ -780,6 +782,8 @@ public void updateAppCollectJob(Job job) {
appDefine.setDefaultInterval(monitor.getIntervals());
appDefine.setCyclic(true);
appDefine.setTimestamp(System.currentTimeMillis());
appDefine.setScheduleType(monitor.getScheduleType());
appDefine.setCronExpression(monitor.getCronExpression());
Map<String, String> metadata = Map.of(CommonConstants.LABEL_INSTANCE_NAME, monitor.getName(),
CommonConstants.LABEL_INSTANCE, monitor.getInstance());
appDefine.setMetadata(metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,47 @@ public void testCollectorGoOnlineJobMetadataNotEmpty() {
assertEquals("127.0.0.1:8080", job.getMetadata().get(CommonConstants.LABEL_INSTANCE));
}

/**
* Regression test for issue #4157: when a collector goes online the dispatched job must carry the
* monitor's cron schedule type and cron expression, otherwise the job falls back to interval
* scheduling and the cron expression is ignored after restart / collector reconnect.
*/
@Test
public void testCollectorGoOnlinePropagatesCronSchedule() {
String identity = "collector-1";
CollectorInfo collectorInfo = CollectorInfo.builder().ip("127.0.0.1").mode("public").version("1.0.0").build();
org.apache.hertzbeat.common.entity.manager.Collector collector = org.apache.hertzbeat.common.entity.manager.Collector.builder()
.name(identity).ip("127.0.0.1").mode("public").version("1.0.0").status(CommonConstants.COLLECTOR_STATUS_OFFLINE).build();
when(collectorDao.findCollectorByName(identity)).thenReturn(Optional.of(collector));

CollectorMonitorBind bind = CollectorMonitorBind.builder().collector(identity).monitorId(1L).build();
when(collectorMonitorBindDao.findCollectorMonitorBindsByCollector(identity)).thenReturn(List.of(bind));

Monitor monitor = Monitor.builder().id(1L).name("test-monitor").instance("127.0.0.1:8080").app("test-app")
.intervals(60).status((byte) 1).scheduleType("cron").cronExpression("0 55 7 * * ?").build();
when(monitorDao.findMonitorsByIdIn(any())).thenReturn(List.of(monitor));

when(paramDao.findParamsByMonitorId(eq(monitor.getId()))).thenReturn(List.of());

Job appDefine = new Job();
appDefine.setParams(Collections.emptyList());
when(appService.getAppDefine(anyString())).thenReturn(appDefine);

ConsistentHash.Node node = new ConsistentHash.Node(identity, collector.getMode(),
collector.getIp(), System.currentTimeMillis(), null);
when(consistentHash.getNode("collector-1")).thenReturn(node);

ManageServer manageServer = mock(ManageServer.class);
collectorJobScheduler.setManageServer(manageServer);

collectorJobScheduler.collectorGoOnline(identity, collectorInfo);

ArgumentCaptor<ClusterMsg.Message> msgCaptor = ArgumentCaptor.forClass(ClusterMsg.Message.class);
verify(manageServer, atLeastOnce()).sendMsg(eq("collector-1"), msgCaptor.capture());
Job job = JsonUtil.fromJson(msgCaptor.getValue().getMsg().toStringUtf8(), Job.class);
assertNotNull(job);
assertEquals("cron", job.getScheduleType());
assertEquals("0 55 7 * * ?", job.getCronExpression());
}

}
Loading