diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java new file mode 100644 index 00000000000..e0cc061cebf --- /dev/null +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerEventRequest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.celeborn.rest.v1.model; + +import java.util.Objects; +import java.util.Arrays; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * WorkerEventRequest + */ +@JsonPropertyOrder({ + WorkerEventRequest.JSON_PROPERTY_EVENT_TYPE +}) +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0") +public class WorkerEventRequest { + /** + * The type of the worker event. Legal types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + */ + public enum EventTypeEnum { + DECOMMISSIONTHENIDLE("DECOMMISSIONTHENIDLE"), + + RECOMMISSION("RECOMMISSION"); + + private String value; + + EventTypeEnum(String value) { + this.value = value; + } + + @JsonValue + public String getValue() { + return value; + } + + @Override + public String toString() { + return String.valueOf(value); + } + + @JsonCreator + public static EventTypeEnum fromValue(String value) { + for (EventTypeEnum b : EventTypeEnum.values()) { + if (b.value.equalsIgnoreCase(value)) { + return b; + } + } + throw new IllegalArgumentException("Unexpected value '" + value + "'"); + } + } + + public static final String JSON_PROPERTY_EVENT_TYPE = "eventType"; + private EventTypeEnum eventType; + + public WorkerEventRequest() { + } + + public WorkerEventRequest eventType(EventTypeEnum eventType) { + + this.eventType = eventType; + return this; + } + + /** + * The type of the worker event. Legal types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + * @return eventType + */ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_EVENT_TYPE) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + + public EventTypeEnum getEventType() { + return eventType; + } + + + @JsonProperty(JSON_PROPERTY_EVENT_TYPE) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setEventType(EventTypeEnum eventType) { + this.eventType = eventType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerEventRequest workerEventRequest = (WorkerEventRequest) o; + return Objects.equals(this.eventType, workerEventRequest.eventType); + } + + @Override + public int hashCode() { + return Objects.hash(eventType); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class WorkerEventRequest {\n"); + sb.append(" eventType: ").append(toIndentedString(eventType)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/WorkerApi.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/WorkerApi.java index 0cce44ea03e..48fc61ad468 100644 --- a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/WorkerApi.java +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/WorkerApi.java @@ -27,6 +27,7 @@ import org.apache.celeborn.rest.v1.model.HandleResponse; import org.apache.celeborn.rest.v1.model.UnAvailablePeersResponse; +import org.apache.celeborn.rest.v1.model.WorkerEventRequest; import org.apache.celeborn.rest.v1.model.WorkerExitRequest; import org.apache.celeborn.rest.v1.model.WorkerInfoResponse; @@ -183,6 +184,75 @@ public UnAvailablePeersResponse unavailablePeers(Map additionalH ); } + /** + * + * Send an event to this worker to trigger a state transition. Legal event types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + * @param workerEventRequest (optional) + * @return HandleResponse + * @throws ApiException if fails to make API call + */ + public HandleResponse workerEvent(WorkerEventRequest workerEventRequest) throws ApiException { + return this.workerEvent(workerEventRequest, Collections.emptyMap()); + } + + + /** + * + * Send an event to this worker to trigger a state transition. Legal event types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + * @param workerEventRequest (optional) + * @param additionalHeaders additionalHeaders for this call + * @return HandleResponse + * @throws ApiException if fails to make API call + */ + public HandleResponse workerEvent(WorkerEventRequest workerEventRequest, Map additionalHeaders) throws ApiException { + Object localVarPostBody = workerEventRequest; + + // create path and map variables + String localVarPath = "/api/v1/workers/events"; + + StringJoiner localVarQueryStringJoiner = new StringJoiner("&"); + String localVarQueryParameterBaseName; + List localVarQueryParams = new ArrayList(); + List localVarCollectionQueryParams = new ArrayList(); + Map localVarHeaderParams = new HashMap(); + Map localVarCookieParams = new HashMap(); + Map localVarFormParams = new HashMap(); + + + localVarHeaderParams.putAll(additionalHeaders); + + + + final String[] localVarAccepts = { + "application/json" + }; + final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + + final String[] localVarContentTypes = { + "application/json" + }; + final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + + String[] localVarAuthNames = new String[] { "basic" }; + + TypeReference localVarReturnType = new TypeReference() {}; + return apiClient.invokeAPI( + localVarPath, + "POST", + localVarQueryParams, + localVarCollectionQueryParams, + localVarQueryStringJoiner.toString(), + localVarPostBody, + localVarHeaderParams, + localVarCookieParams, + localVarFormParams, + localVarAccept, + localVarContentType, + localVarAuthNames, + localVarReturnType + ); + } + /** * * Trigger this worker to exit. Legal exit types are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'. diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml index 1cb04ddcbcd..6861f7a6142 100644 --- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml +++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml @@ -238,6 +238,28 @@ paths: schema: $ref: '#/components/schemas/HandleResponse' + /api/v1/workers/events: + post: + tags: + - Worker + operationId: workerEvent + description: | + Send an event to this worker to trigger a state transition. + Legal event types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerEventRequest' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/HandleResponse' + /api/v1/applications: get: tags: @@ -760,6 +782,20 @@ components: - IMMEDIATELY - NONE + WorkerEventRequest: + type: object + properties: + eventType: + type: string + description: | + The type of the worker event. + Legal types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'. + enum: + - DECOMMISSIONTHENIDLE + - RECOMMISSION + required: + - eventType + LoggerInfo: type: object properties: diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala index a66f53305dd..e1296346a1b 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala @@ -185,6 +185,8 @@ abstract class HttpService extends Service with Logging { def exit(exitType: String): String = throw new UnsupportedOperationException() + def workerEvent(eventType: String): String = throw new UnsupportedOperationException() + def handleWorkerEvent( workerEventType: WorkerEventType, workers: Seq[WorkerInfo]): HandleResponse = diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 91e145ba659..7227e37ecf0 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -967,6 +967,22 @@ private[celeborn] class Worker( sb.toString() } + override def workerEvent(eventType: String): String = { + eventType.toUpperCase(Locale.ROOT) match { + case "DECOMMISSIONTHENIDLE" => + workerStatusManager.doTransition(WorkerEventType.DecommissionThenIdle) + case "RECOMMISSION" => + workerStatusManager.doTransition(WorkerEventType.Recommission) + case _ => + return s"Unsupported worker event type: $eventType. Legal types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'." + } + val sb = new StringBuilder + sb.append("============================ Worker Event =============================\n") + sb.append(s"Worker event $eventType triggered: \n") + sb.append(workerInfo.toString()).append("\n") + sb.toString() + } + def shutdownGracefully(): Unit = { // During shutdown, to avoid allocate slots in this worker, // add this worker to master's excluded list. When restart, register worker will diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala index 38a745a432b..720f2a53644 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala @@ -27,7 +27,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema} import io.swagger.v3.oas.annotations.responses.ApiResponse import io.swagger.v3.oas.annotations.tags.Tag -import org.apache.celeborn.rest.v1.model.{HandleResponse, UnAvailablePeersResponse, WorkerExitRequest, WorkerInfoResponse, WorkerTimestampData} +import org.apache.celeborn.rest.v1.model.{HandleResponse, UnAvailablePeersResponse, WorkerEventRequest, WorkerExitRequest, WorkerInfoResponse, WorkerTimestampData} import org.apache.celeborn.server.common.http.api.ApiRequestContext import org.apache.celeborn.server.common.http.api.v1.ApiUtils import org.apache.celeborn.service.deploy.worker.Worker @@ -85,4 +85,23 @@ class WorkerResource extends ApiRequestContext { .success(true) .message(httpService.exit(request.getType.toString)) } + + @Operation(description = + "Send an event to this worker to trigger a state transition. " + + "Legal event types are 'DECOMMISSIONTHENIDLE' and 'RECOMMISSION'.") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.APPLICATION_JSON, + schema = new Schema(implementation = classOf[HandleResponse])))) + @POST + @Path("/events") + def workerEvent(request: WorkerEventRequest): HandleResponse = { + if (request == null || request.getEventType == null) { + return new HandleResponse().success(false).message("eventType is required") + } + new HandleResponse() + .success(true) + .message(httpService.workerEvent(request.getEventType.toString)) + } } diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala index 81342ba6818..f19cd88ad5d 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala @@ -18,9 +18,10 @@ package org.apache.celeborn.service.deploy.worker.http.api.v1 import javax.servlet.http.HttpServletResponse +import javax.ws.rs.client.Entity import javax.ws.rs.core.MediaType -import org.apache.celeborn.rest.v1.model.{ApplicationsResponse, ShufflePartitionsResponse, ShufflesResponse, UnAvailablePeersResponse, WorkerInfoResponse} +import org.apache.celeborn.rest.v1.model.{ApplicationsResponse, HandleResponse, ShufflePartitionsResponse, ShufflesResponse, UnAvailablePeersResponse, WorkerEventRequest, WorkerInfoResponse} import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.api.v1.ApiV1BaseResourceSuite import org.apache.celeborn.service.deploy.MiniClusterFeature @@ -74,4 +75,42 @@ class ApiV1WorkerResourceSuite extends ApiV1BaseResourceSuite with MiniClusterFe assert(HttpServletResponse.SC_OK == response.getStatus) assert(response.readEntity(classOf[UnAvailablePeersResponse]).getPeers.isEmpty) } + + test("worker events resource") { + // null eventType → success:false + var response = webTarget.path("workers/events").request(MediaType.APPLICATION_JSON).post( + Entity.entity(new WorkerEventRequest(), MediaType.APPLICATION_JSON)) + assert(HttpServletResponse.SC_OK == response.getStatus) + val nullEventResponse = response.readEntity(classOf[HandleResponse]) + assert(!nullEventResponse.getSuccess) + assert(nullEventResponse.getMessage.contains("eventType is required")) + + // DECOMMISSIONTHENIDLE → success:true, worker enters shutdown/decommission path + response = webTarget.path("workers/events").request(MediaType.APPLICATION_JSON).post( + Entity.entity( + new WorkerEventRequest().eventType(WorkerEventRequest.EventTypeEnum.DECOMMISSIONTHENIDLE), + MediaType.APPLICATION_JSON)) + assert(HttpServletResponse.SC_OK == response.getStatus) + assert(response.readEntity(classOf[HandleResponse]).getSuccess) + + // worker is now in shutdown state (InDecommissionThenIdle or Idle) + response = webTarget.path("workers").request(MediaType.APPLICATION_JSON).get() + assert(HttpServletResponse.SC_OK == response.getStatus) + assert(response.readEntity(classOf[WorkerInfoResponse]).getIsShutdown) + + // RECOMMISSION → success:true, worker returns to Normal + response = webTarget.path("workers/events").request(MediaType.APPLICATION_JSON).post( + Entity.entity( + new WorkerEventRequest().eventType(WorkerEventRequest.EventTypeEnum.RECOMMISSION), + MediaType.APPLICATION_JSON)) + assert(HttpServletResponse.SC_OK == response.getStatus) + assert(response.readEntity(classOf[HandleResponse]).getSuccess) + + // worker is back to normal: not shutdown, not decommissioning + response = webTarget.path("workers").request(MediaType.APPLICATION_JSON).get() + assert(HttpServletResponse.SC_OK == response.getStatus) + val restoredWorker = response.readEntity(classOf[WorkerInfoResponse]) + assert(!restoredWorker.getIsShutdown) + assert(!restoredWorker.getIsDecommissioning) + } }