From c4aba7aba4b3bd52a01303f6d6822ed288db533e Mon Sep 17 00:00:00 2001 From: gaoyajun02 Date: Mon, 29 Jun 2026 21:33:59 +0800 Subject: [PATCH 1/2] [CELEBORN-2375] Support DecommissionThenIdle and Recommission via worker-local HTTP API Co-Authored-By: Claude Opus 4.8 (1M context) --- .../rest/v1/model/WorkerEventRequest.java | 143 ++++++++++++++++++ .../celeborn/rest/v1/worker/WorkerApi.java | 70 +++++++++ .../src/main/openapi3/worker_rest_v1.yaml | 35 +++++ .../celeborn/server/common/HttpService.scala | 2 + .../service/deploy/worker/Worker.scala | 16 ++ .../worker/http/api/v1/WorkerResource.scala | 21 ++- .../api/v1/ApiV1WorkerResourceSuite.scala | 41 ++++- 7 files changed, 326 insertions(+), 2 deletions(-) create mode 100644 openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java new file mode 100644 index 00000000000..8512fe8286e --- /dev/null +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.celeborn.rest.v1.model; + +import java.util.Objects; +import java.util.Arrays; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * WorkerEventRequest + */ +@JsonPropertyOrder({ + WorkerEventRequest.JSON_PROPERTY_EVENT_TYPE +}) +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0") +public class WorkerEventRequest { + /** + * The type of the worker event. Legal types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + */ + public enum EventTypeEnum { + DECOMMISSIONTHENIDLE("DECOMMISSIONTHENIDLE"), + + RECOMMISSION("RECOMMISSION"); + + private String value; + + EventTypeEnum(String value) { + this.value = value; + } + + @JsonValue + public String getValue() { + return value; + } + + @Override + public String toString() { + return String.valueOf(value); + } + + @JsonCreator + public static EventTypeEnum fromValue(String value) { + for (EventTypeEnum b : EventTypeEnum.values()) { + if (b.value.equalsIgnoreCase(value)) { + return b; + } + } + throw new IllegalArgumentException("Unexpected value '" + value + "'"); + } + } + + public static final String JSON_PROPERTY_EVENT_TYPE = "event_type"; + private EventTypeEnum eventType; + + public WorkerEventRequest() { + } + + public WorkerEventRequest eventType(EventTypeEnum eventType) { + + this.eventType = eventType; + return this; + } + + /** + * The type of the worker event. Legal types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + * @return eventType + */ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_EVENT_TYPE) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + + public EventTypeEnum getEventType() { + return eventType; + } + + + @JsonProperty(JSON_PROPERTY_EVENT_TYPE) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setEventType(EventTypeEnum eventType) { + this.eventType = eventType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerEventRequest workerEventRequest = (WorkerEventRequest) o; + return Objects.equals(this.eventType, workerEventRequest.eventType); + } + + @Override + public int hashCode() { + return Objects.hash(eventType); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class WorkerEventRequest {\n"); + sb.append(" eventType: ").append(toIndentedString(eventType)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/WorkerApi.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/WorkerApi.java index 0cce44ea03e..48fc61ad468 100644 --- a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/WorkerApi.java +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/WorkerApi.java @@ -27,6 +27,7 @@ import org.apache.celeborn.rest.v1.model.HandleResponse; import org.apache.celeborn.rest.v1.model.UnAvailablePeersResponse; +import org.apache.celeborn.rest.v1.model.WorkerEventRequest; import org.apache.celeborn.rest.v1.model.WorkerExitRequest; import org.apache.celeborn.rest.v1.model.WorkerInfoResponse; @@ -183,6 +184,75 @@ public UnAvailablePeersResponse unavailablePeers(Map additionalH ); } + /** + * + * Send an event to this worker to trigger a state transition. Legal event types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + * @param workerEventRequest (optional) + * @return HandleResponse + * @throws ApiException if fails to make API call + */ + public HandleResponse workerEvent(WorkerEventRequest workerEventRequest) throws ApiException { + return this.workerEvent(workerEventRequest, Collections.emptyMap()); + } + + + /** + * + * Send an event to this worker to trigger a state transition. Legal event types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + * @param workerEventRequest (optional) + * @param additionalHeaders additionalHeaders for this call + * @return HandleResponse + * @throws ApiException if fails to make API call + */ + public HandleResponse workerEvent(WorkerEventRequest workerEventRequest, Map additionalHeaders) throws ApiException { + Object localVarPostBody = workerEventRequest; + + // create path and map variables + String localVarPath = "/api/v1/workers/events"; + + StringJoiner localVarQueryStringJoiner = new StringJoiner("&"); + String localVarQueryParameterBaseName; + List localVarQueryParams = new ArrayList(); + List localVarCollectionQueryParams = new ArrayList(); + Map localVarHeaderParams = new HashMap(); + Map localVarCookieParams = new HashMap(); + Map localVarFormParams = new HashMap(); + + + localVarHeaderParams.putAll(additionalHeaders); + + + + final String[] localVarAccepts = { + "application/json" + }; + final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + + final String[] localVarContentTypes = { + "application/json" + }; + final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + + String[] localVarAuthNames = new String[] { "basic" }; + + TypeReference localVarReturnType = new TypeReference() {}; + return apiClient.invokeAPI( + localVarPath, + "POST", + localVarQueryParams, + localVarCollectionQueryParams, + localVarQueryStringJoiner.toString(), + localVarPostBody, + localVarHeaderParams, + localVarCookieParams, + localVarFormParams, + localVarAccept, + localVarContentType, + localVarAuthNames, + localVarReturnType + ); + } + /** * * Trigger this worker to exit. Legal exit types are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'. diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml index 1cb04ddcbcd..671bd9af639 100644 --- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml +++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml @@ -238,6 +238,27 @@ paths: schema: $ref: '#/components/schemas/HandleResponse' + /api/v1/workers/events: + post: + tags: + - Worker + operationId: workerEvent + description: | + Send an event to this worker to trigger a state transition. + Legal event types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerEventRequest' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/HandleResponse' + /api/v1/applications: get: tags: @@ -760,6 +781,20 @@ components: - IMMEDIATELY - NONE + WorkerEventRequest: + type: object + properties: + event_type: + type: string + description: | + The type of the worker event. + Legal types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + enum: + - DECOMMISSIONTHENIDLE + - RECOMMISSION + required: + - event_type + LoggerInfo: type: object properties: diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala index a66f53305dd..e1296346a1b 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala @@ -185,6 +185,8 @@ abstract class HttpService extends Service with Logging { def exit(exitType: String): String = throw new UnsupportedOperationException() + def workerEvent(eventType: String): String = throw new UnsupportedOperationException() + def handleWorkerEvent( workerEventType: WorkerEventType, workers: Seq[WorkerInfo]): HandleResponse = diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 91e145ba659..7227e37ecf0 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -967,6 +967,22 @@ private[celeborn] class Worker( sb.toString() } + override def workerEvent(eventType: String): String = { + eventType.toUpperCase(Locale.ROOT) match { + case "DECOMMISSIONTHENIDLE" => + workerStatusManager.doTransition(WorkerEventType.DecommissionThenIdle) + case "RECOMMISSION" => + workerStatusManager.doTransition(WorkerEventType.Recommission) + case _ => + return s"Unsupported worker event type: $eventType. Legal types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'." + } + val sb = new StringBuilder + sb.append("============================ Worker Event =============================\n") + sb.append(s"Worker event $eventType triggered: \n") + sb.append(workerInfo.toString()).append("\n") + sb.toString() + } + def shutdownGracefully(): Unit = { // During shutdown, to avoid allocate slots in this worker, // add this worker to master's excluded list. When restart, register worker will diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala index 38a745a432b..4f5eff511b4 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala @@ -27,7 +27,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema} import io.swagger.v3.oas.annotations.responses.ApiResponse import io.swagger.v3.oas.annotations.tags.Tag -import org.apache.celeborn.rest.v1.model.{HandleResponse, UnAvailablePeersResponse, WorkerExitRequest, WorkerInfoResponse, WorkerTimestampData} +import org.apache.celeborn.rest.v1.model.{HandleResponse, UnAvailablePeersResponse, WorkerEventRequest, WorkerExitRequest, WorkerInfoResponse, WorkerTimestampData} import org.apache.celeborn.server.common.http.api.ApiRequestContext import org.apache.celeborn.server.common.http.api.v1.ApiUtils import org.apache.celeborn.service.deploy.worker.Worker @@ -85,4 +85,23 @@ class WorkerResource extends ApiRequestContext { .success(true) .message(httpService.exit(request.getType.toString)) } + + @Operation(description = + "Send an event to this worker to trigger a state transition. " + + "Legal event types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'.") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.APPLICATION_JSON, + schema = new Schema(implementation = classOf[HandleResponse])))) + @POST + @Path("/events") + def workerEvent(request: WorkerEventRequest): HandleResponse = { + if (request.getEventType == null) { + return new HandleResponse().success(false).message("eventType is required") + } + new HandleResponse() + .success(true) + .message(httpService.workerEvent(request.getEventType.toString)) + } } diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala index 81342ba6818..f19cd88ad5d 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala @@ -18,9 +18,10 @@ package org.apache.celeborn.service.deploy.worker.http.api.v1 import javax.servlet.http.HttpServletResponse +import javax.ws.rs.client.Entity import javax.ws.rs.core.MediaType -import org.apache.celeborn.rest.v1.model.{ApplicationsResponse, ShufflePartitionsResponse, ShufflesResponse, UnAvailablePeersResponse, WorkerInfoResponse} +import org.apache.celeborn.rest.v1.model.{ApplicationsResponse, HandleResponse, ShufflePartitionsResponse, ShufflesResponse, UnAvailablePeersResponse, WorkerEventRequest, WorkerInfoResponse} import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.api.v1.ApiV1BaseResourceSuite import org.apache.celeborn.service.deploy.MiniClusterFeature @@ -74,4 +75,42 @@ class ApiV1WorkerResourceSuite extends ApiV1BaseResourceSuite with MiniClusterFe assert(HttpServletResponse.SC_OK == response.getStatus) assert(response.readEntity(classOf[UnAvailablePeersResponse]).getPeers.isEmpty) } + + test("worker events resource") { + // null eventType → success:false + var response = webTarget.path("workers/events").request(MediaType.APPLICATION_JSON).post( + Entity.entity(new WorkerEventRequest(), MediaType.APPLICATION_JSON)) + assert(HttpServletResponse.SC_OK == response.getStatus) + val nullEventResponse = response.readEntity(classOf[HandleResponse]) + assert(!nullEventResponse.getSuccess) + assert(nullEventResponse.getMessage.contains("eventType is required")) + + // DECOMMISSIONTHENIDLE → success:true, worker enters shutdown/decommission path + response = webTarget.path("workers/events").request(MediaType.APPLICATION_JSON).post( + Entity.entity( + new WorkerEventRequest().eventType(WorkerEventRequest.EventTypeEnum.DECOMMISSIONTHENIDLE), + MediaType.APPLICATION_JSON)) + assert(HttpServletResponse.SC_OK == response.getStatus) + assert(response.readEntity(classOf[HandleResponse]).getSuccess) + + // worker is now in shutdown state (InDecommissionThenIdle or Idle) + response = webTarget.path("workers").request(MediaType.APPLICATION_JSON).get() + assert(HttpServletResponse.SC_OK == response.getStatus) + assert(response.readEntity(classOf[WorkerInfoResponse]).getIsShutdown) + + // RECOMMISSION → success:true, worker returns to Normal + response = webTarget.path("workers/events").request(MediaType.APPLICATION_JSON).post( + Entity.entity( + new WorkerEventRequest().eventType(WorkerEventRequest.EventTypeEnum.RECOMMISSION), + MediaType.APPLICATION_JSON)) + assert(HttpServletResponse.SC_OK == response.getStatus) + assert(response.readEntity(classOf[HandleResponse]).getSuccess) + + // worker is back to normal: not shutdown, not decommissioning + response = webTarget.path("workers").request(MediaType.APPLICATION_JSON).get() + assert(HttpServletResponse.SC_OK == response.getStatus) + val restoredWorker = response.readEntity(classOf[WorkerInfoResponse]) + assert(!restoredWorker.getIsShutdown) + assert(!restoredWorker.getIsDecommissioning) + } } From 70a7af3993c5b89bd66e58b1b01b3696a9cdbb6a Mon Sep 17 00:00:00 2001 From: gaoyajun02 Date: Thu, 2 Jul 2026 21:08:26 +0800 Subject: [PATCH 2/2] [CELEBORN-2375] Address review comments: fix null request, required requestBody, rename event_type to eventType MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - WorkerResource.scala: guard against null request body (not just null eventType) to avoid NPE when POST body is absent - worker_rest_v1.yaml: mark requestBody as required:true for /api/v1/workers/events - worker_rest_v1.yaml + WorkerEventRequest.java: rename field event_type → eventType for consistency with other Worker API models (e.g. WorkerExitRequest.type) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../apache/celeborn/rest/v1/model/WorkerEventRequest.java | 2 +- openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml | 5 +++-- .../service/deploy/worker/http/api/v1/WorkerResource.scala | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java index 8512fe8286e..e0cc061cebf 100644 --- a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java @@ -71,7 +71,7 @@ public static EventTypeEnum fromValue(String value) { } } - public static final String JSON_PROPERTY_EVENT_TYPE = "event_type"; + public static final String JSON_PROPERTY_EVENT_TYPE = "eventType"; private EventTypeEnum eventType; public WorkerEventRequest() { diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml index 671bd9af639..6861f7a6142 100644 --- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml +++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml @@ -247,6 +247,7 @@ paths: Send an event to this worker to trigger a state transition. Legal event types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. requestBody: + required: true content: application/json: schema: @@ -784,7 +785,7 @@ components: WorkerEventRequest: type: object properties: - event_type: + eventType: type: string description: | The type of the worker event. @@ -793,7 +794,7 @@ components: - DECOMMISSIONTHENIDLE - RECOMMISSION required: - - event_type + - eventType LoggerInfo: type: object diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala index 4f5eff511b4..720f2a53644 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala @@ -97,7 +97,7 @@ class WorkerResource extends ApiRequestContext { @POST @Path("/events") def workerEvent(request: WorkerEventRequest): HandleResponse = { - if (request.getEventType == null) { + if (request == null || request.getEventType == null) { return new HandleResponse().success(false).message("eventType is required") } new HandleResponse()