From 1492af7725c34db6dca53a9c5c6c5c100263b141 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 10 Jun 2026 22:53:14 +0200 Subject: [PATCH 1/6] feature: migrate the control rpc to connect --- plugin.go | 10 +- rpc.go | 636 ++++++++-------------------------------------------- rpc_test.go | 4 +- 3 files changed, 107 insertions(+), 543 deletions(-) diff --git a/plugin.go b/plugin.go index cc06fcf..39dce1b 100644 --- a/plugin.go +++ b/plugin.go @@ -4,9 +4,11 @@ import ( "context" stderr "errors" "log/slog" + "net/http" "sync" "time" + "github.com/roadrunner-server/api-go/v6/centrifugo/api/v1/apiV1connect" centrifugov1 "github.com/roadrunner-server/api-go/v6/centrifugo/proxy/v1" "github.com/roadrunner-server/errors" "github.com/roadrunner-server/pool/v2/payload" @@ -222,11 +224,13 @@ func (p *Plugin) Name() string { return name } -func (p *Plugin) RPC() any { - return &rpc{ +// RPC returns the CentrifugoApi connect handler mounted on the rpc plugin's +// server; every call is proxied to the connected Centrifugo server. +func (p *Plugin) RPC() (string, http.Handler) { + return apiV1connect.NewCentrifugoApiHandler(&rpc{ client: p.client, log: p.log, - } + }) } // internal diff --git a/rpc.go b/rpc.go index e201bb5..7c47d92 100644 --- a/rpc.go +++ b/rpc.go @@ -2,626 +2,184 @@ package centrifuge import ( "context" + stderr "errors" "log/slog" + "connectrpc.com/connect" v1Client "github.com/roadrunner-server/api-go/v6/centrifugo/api/v1" - "github.com/roadrunner-server/errors" + "github.com/roadrunner-server/api-go/v6/centrifugo/api/v1/apiV1connect" + "google.golang.org/grpc" + "google.golang.org/grpc/status" ) +// Compile-time check that rpc implements the generated handler interface. +var _ apiV1connect.CentrifugoApiHandler = (*rpc)(nil) + +// rpc exposes the Centrifugo server API (centrifugal.centrifugo.api.CentrifugoApi) +// on the RoadRunner Connect-RPC plane; every call is proxied 1:1 to the +// connected Centrifugo server. type rpc struct { client *client log *slog.Logger } -/* -service CentrifugoApi { - rpc Batch(BatchRequest) returns (BatchResponse) {} - rpc Publish(PublishRequest) returns (PublishResponse) {} - rpc Broadcast(BroadcastRequest) returns (BroadcastResponse) {} - rpc Subscribe(SubscribeRequest) returns (SubscribeResponse) {} - rpc Unsubscribe(UnsubscribeRequest) returns (UnsubscribeResponse) {} - rpc Disconnect(DisconnectRequest) returns (DisconnectResponse) {} - rpc Presence(PresenceRequest) returns (PresenceResponse) {} - rpc PresenceStats(PresenceStatsRequest) returns (PresenceStatsResponse) {} - rpc History(HistoryRequest) returns (HistoryResponse) {} - rpc HistoryRemove(HistoryRemoveRequest) returns (HistoryRemoveResponse) {} - rpc Info(InfoRequest) returns (InfoResponse) {} - rpc RPC(RPCRequest) returns (RPCResponse) {} - rpc Refresh(RefreshRequest) returns (RefreshResponse) {} - rpc Channels(ChannelsRequest) returns (ChannelsResponse) {} - rpc Connections(ConnectionsRequest) returns (ConnectionsResponse) {} - rpc UpdateUserStatus(UpdateUserStatusRequest) returns (UpdateUserStatusResponse) {} - rpc GetUserStatus(GetUserStatusRequest) returns (GetUserStatusResponse) {} - rpc DeleteUserStatus(DeleteUserStatusRequest) returns (DeleteUserStatusResponse) {} - rpc BlockUser(BlockUserRequest) returns (BlockUserResponse) {} - rpc UnblockUser(UnblockUserRequest) returns (UnblockUserResponse) {} - rpc RevokeToken(RevokeTokenRequest) returns (RevokeTokenResponse) {} - rpc InvalidateUserTokens(InvalidateUserTokensRequest) returns (InvalidateUserTokensResponse) {} - rpc DeviceRegister(DeviceRegisterRequest) returns (DeviceRegisterResponse) {} - rpc DeviceUpdate(DeviceUpdateRequest) returns (DeviceUpdateResponse) {} - rpc DeviceRemove(DeviceRemoveRequest) returns (DeviceRemoveResponse) {} - rpc DeviceList(DeviceListRequest) returns (DeviceListResponse) {} - rpc DeviceTopicList(DeviceTopicListRequest) returns (DeviceTopicListResponse) {} - rpc DeviceTopicUpdate(DeviceTopicUpdateRequest) returns (DeviceTopicUpdateResponse) {} - rpc UserTopicList(UserTopicListRequest) returns (UserTopicListResponse) {} - rpc UserTopicUpdate(UserTopicUpdateRequest) returns (UserTopicUpdateResponse) {} - rpc SendPushNotification(SendPushNotificationRequest) returns (SendPushNotificationResponse) {} - rpc UpdatePushStatus(UpdatePushStatusRequest) returns (UpdatePushStatusResponse) {} - rpc CancelPush(CancelPushRequest) returns (CancelPushResponse) {} - -} -*/ - -func (r *rpc) Batch(in *v1Client.BatchRequest, out *v1Client.BatchResponse) error { - r.log.Debug("got batch request") +// forward proxies a single CentrifugoApi call to the connected Centrifugo +// client, translating gRPC status errors into connect errors (the two code +// registries are aligned). The client is re-fetched on every call because +// connect() replaces it after reconnects. +func forward[Req, Resp any]( + ctx context.Context, + r *rpc, + name string, + req *connect.Request[Req], + call func(v1Client.CentrifugoApiClient, context.Context, *Req, ...grpc.CallOption) (*Resp, error), +) (*connect.Response[Resp], error) { + r.log.Debug("got " + name + " request") client := r.client.client() if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") + return nil, connect.NewError(connect.CodeUnavailable, stderr.New("RoadRunner is not ready yet, try in a few seconds")) } - resp, err := client.Batch(context.Background(), in) + resp, err := call(client, ctx, req.Msg) if err != nil { - return err + if st, ok := status.FromError(err); ok { + return nil, connect.NewError(connect.Code(st.Code()), stderr.New(st.Message())) + } + return nil, err } - out.Replies = resp.GetReplies() - - return nil + return connect.NewResponse(resp), nil } -func (r *rpc) Publish(in *v1Client.PublishRequest, out *v1Client.PublishResponse) error { - r.log.Debug("got publish request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - - resp, err := client.Publish(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) Batch(ctx context.Context, req *connect.Request[v1Client.BatchRequest]) (*connect.Response[v1Client.BatchResponse], error) { + return forward(ctx, r, "batch", req, v1Client.CentrifugoApiClient.Batch) } -func (r *rpc) Broadcast(in *v1Client.BroadcastRequest, out *v1Client.BroadcastResponse) error { - r.log.Debug("got broadcast request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.Broadcast(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) Publish(ctx context.Context, req *connect.Request[v1Client.PublishRequest]) (*connect.Response[v1Client.PublishResponse], error) { + return forward(ctx, r, "publish", req, v1Client.CentrifugoApiClient.Publish) } -func (r *rpc) Subscribe(in *v1Client.SubscribeRequest, out *v1Client.SubscribeResponse) error { - r.log.Debug("got subscribe request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.Subscribe(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) Broadcast(ctx context.Context, req *connect.Request[v1Client.BroadcastRequest]) (*connect.Response[v1Client.BroadcastResponse], error) { + return forward(ctx, r, "broadcast", req, v1Client.CentrifugoApiClient.Broadcast) } -func (r *rpc) Unsubscribe(in *v1Client.UnsubscribeRequest, out *v1Client.UnsubscribeResponse) error { - r.log.Debug("got unsubscribe request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.Unsubscribe(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) Subscribe(ctx context.Context, req *connect.Request[v1Client.SubscribeRequest]) (*connect.Response[v1Client.SubscribeResponse], error) { + return forward(ctx, r, "subscribe", req, v1Client.CentrifugoApiClient.Subscribe) } -func (r *rpc) Disconnect(in *v1Client.DisconnectRequest, out *v1Client.DisconnectResponse) error { - r.log.Debug("got disconnect request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.Disconnect(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) Unsubscribe(ctx context.Context, req *connect.Request[v1Client.UnsubscribeRequest]) (*connect.Response[v1Client.UnsubscribeResponse], error) { + return forward(ctx, r, "unsubscribe", req, v1Client.CentrifugoApiClient.Unsubscribe) } -func (r *rpc) Presence(in *v1Client.PresenceRequest, out *v1Client.PresenceResponse) error { - r.log.Debug("got presence request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.Presence(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - return nil +func (r *rpc) Disconnect(ctx context.Context, req *connect.Request[v1Client.DisconnectRequest]) (*connect.Response[v1Client.DisconnectResponse], error) { + return forward(ctx, r, "disconnect", req, v1Client.CentrifugoApiClient.Disconnect) } -func (r *rpc) PresenceStats(in *v1Client.PresenceStatsRequest, out *v1Client.PresenceStatsResponse) error { - r.log.Debug("got presence_stats request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.PresenceStats(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) Presence(ctx context.Context, req *connect.Request[v1Client.PresenceRequest]) (*connect.Response[v1Client.PresenceResponse], error) { + return forward(ctx, r, "presence", req, v1Client.CentrifugoApiClient.Presence) } -func (r *rpc) History(in *v1Client.HistoryRequest, out *v1Client.HistoryResponse) error { - r.log.Debug("got history request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.History(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) PresenceStats(ctx context.Context, req *connect.Request[v1Client.PresenceStatsRequest]) (*connect.Response[v1Client.PresenceStatsResponse], error) { + return forward(ctx, r, "presence_stats", req, v1Client.CentrifugoApiClient.PresenceStats) } -func (r *rpc) HistoryRemove(in *v1Client.HistoryRemoveRequest, out *v1Client.HistoryRemoveResponse) error { - r.log.Debug("got history_remove request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.HistoryRemove(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) History(ctx context.Context, req *connect.Request[v1Client.HistoryRequest]) (*connect.Response[v1Client.HistoryResponse], error) { + return forward(ctx, r, "history", req, v1Client.CentrifugoApiClient.History) } -func (r *rpc) Info(in *v1Client.InfoRequest, out *v1Client.InfoResponse) error { - r.log.Debug("got info request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.Info(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) HistoryRemove(ctx context.Context, req *connect.Request[v1Client.HistoryRemoveRequest]) (*connect.Response[v1Client.HistoryRemoveResponse], error) { + return forward(ctx, r, "history_remove", req, v1Client.CentrifugoApiClient.HistoryRemove) } -func (r *rpc) RPC(in *v1Client.RPCRequest, out *v1Client.RPCResponse) error { - r.log.Debug("got rpc request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.RPC(context.Background(), in) - if err != nil { - return err - } - - out.Result = resp.GetResult() - out.Error = resp.GetError() - - return nil +func (r *rpc) Info(ctx context.Context, req *connect.Request[v1Client.InfoRequest]) (*connect.Response[v1Client.InfoResponse], error) { + return forward(ctx, r, "info", req, v1Client.CentrifugoApiClient.Info) } -func (r *rpc) Refresh(in *v1Client.RefreshRequest, out *v1Client.RefreshResponse) error { - r.log.Debug("got refresh request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.Refresh(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) RPC(ctx context.Context, req *connect.Request[v1Client.RPCRequest]) (*connect.Response[v1Client.RPCResponse], error) { + return forward(ctx, r, "rpc", req, v1Client.CentrifugoApiClient.RPC) } -func (r *rpc) Channels(in *v1Client.ChannelsRequest, out *v1Client.ChannelsResponse) error { - r.log.Debug("got channels request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.Channels(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) Refresh(ctx context.Context, req *connect.Request[v1Client.RefreshRequest]) (*connect.Response[v1Client.RefreshResponse], error) { + return forward(ctx, r, "refresh", req, v1Client.CentrifugoApiClient.Refresh) } -func (r *rpc) Connections(in *v1Client.ConnectionsRequest, out *v1Client.ConnectionsResponse) error { - r.log.Debug("got connections request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.Connections(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) Channels(ctx context.Context, req *connect.Request[v1Client.ChannelsRequest]) (*connect.Response[v1Client.ChannelsResponse], error) { + return forward(ctx, r, "channels", req, v1Client.CentrifugoApiClient.Channels) } -func (r *rpc) UpdateUserStatus(in *v1Client.UpdateUserStatusRequest, out *v1Client.UpdateUserStatusResponse) error { - r.log.Debug("got update_user_status request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.UpdateUserStatus(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) Connections(ctx context.Context, req *connect.Request[v1Client.ConnectionsRequest]) (*connect.Response[v1Client.ConnectionsResponse], error) { + return forward(ctx, r, "connections", req, v1Client.CentrifugoApiClient.Connections) } -func (r *rpc) GetUserStatus(in *v1Client.GetUserStatusRequest, out *v1Client.GetUserStatusResponse) error { - r.log.Debug("got get_user_status request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.GetUserStatus(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) UpdateUserStatus(ctx context.Context, req *connect.Request[v1Client.UpdateUserStatusRequest]) (*connect.Response[v1Client.UpdateUserStatusResponse], error) { + return forward(ctx, r, "update_user_status", req, v1Client.CentrifugoApiClient.UpdateUserStatus) } -func (r *rpc) DeleteUserStatus(in *v1Client.DeleteUserStatusRequest, out *v1Client.DeleteUserStatusResponse) error { - r.log.Debug("got delete_user_status request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.DeleteUserStatus(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) GetUserStatus(ctx context.Context, req *connect.Request[v1Client.GetUserStatusRequest]) (*connect.Response[v1Client.GetUserStatusResponse], error) { + return forward(ctx, r, "get_user_status", req, v1Client.CentrifugoApiClient.GetUserStatus) } -func (r *rpc) BlockUser(in *v1Client.BlockUserRequest, out *v1Client.BlockUserResponse) error { - r.log.Debug("got block_user request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.BlockUser(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) DeleteUserStatus(ctx context.Context, req *connect.Request[v1Client.DeleteUserStatusRequest]) (*connect.Response[v1Client.DeleteUserStatusResponse], error) { + return forward(ctx, r, "delete_user_status", req, v1Client.CentrifugoApiClient.DeleteUserStatus) } -func (r *rpc) UnblockUser(in *v1Client.UnblockUserRequest, out *v1Client.UnblockUserResponse) error { - r.log.Debug("got unblock_user request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.UnblockUser(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) BlockUser(ctx context.Context, req *connect.Request[v1Client.BlockUserRequest]) (*connect.Response[v1Client.BlockUserResponse], error) { + return forward(ctx, r, "block_user", req, v1Client.CentrifugoApiClient.BlockUser) } -func (r *rpc) RevokeToken(in *v1Client.RevokeTokenRequest, out *v1Client.RevokeTokenResponse) error { - r.log.Debug("got revoke_token request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.RevokeToken(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) UnblockUser(ctx context.Context, req *connect.Request[v1Client.UnblockUserRequest]) (*connect.Response[v1Client.UnblockUserResponse], error) { + return forward(ctx, r, "unblock_user", req, v1Client.CentrifugoApiClient.UnblockUser) } -func (r *rpc) InvalidateUserTokens(in *v1Client.InvalidateUserTokensRequest, out *v1Client.InvalidateUserTokensResponse) error { - r.log.Debug("got invalidate_user_tokens request") - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.InvalidateUserTokens(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) RevokeToken(ctx context.Context, req *connect.Request[v1Client.RevokeTokenRequest]) (*connect.Response[v1Client.RevokeTokenResponse], error) { + return forward(ctx, r, "revoke_token", req, v1Client.CentrifugoApiClient.RevokeToken) } -func (r *rpc) DeviceRegister(in *v1Client.DeviceRegisterRequest, out *v1Client.DeviceRegisterResponse) error { - r.log.Debug("got device register request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.DeviceRegister(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) InvalidateUserTokens(ctx context.Context, req *connect.Request[v1Client.InvalidateUserTokensRequest]) (*connect.Response[v1Client.InvalidateUserTokensResponse], error) { + return forward(ctx, r, "invalidate_user_tokens", req, v1Client.CentrifugoApiClient.InvalidateUserTokens) } -func (r *rpc) DeviceUpdate(in *v1Client.DeviceUpdateRequest, out *v1Client.DeviceUpdateResponse) error { - r.log.Debug("got device update request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.DeviceUpdate(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) DeviceRegister(ctx context.Context, req *connect.Request[v1Client.DeviceRegisterRequest]) (*connect.Response[v1Client.DeviceRegisterResponse], error) { + return forward(ctx, r, "device_register", req, v1Client.CentrifugoApiClient.DeviceRegister) } -func (r *rpc) DeviceRemove(in *v1Client.DeviceRemoveRequest, out *v1Client.DeviceRemoveResponse) error { - r.log.Debug("got device remove request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.DeviceRemove(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) DeviceUpdate(ctx context.Context, req *connect.Request[v1Client.DeviceUpdateRequest]) (*connect.Response[v1Client.DeviceUpdateResponse], error) { + return forward(ctx, r, "device_update", req, v1Client.CentrifugoApiClient.DeviceUpdate) } -func (r *rpc) DeviceList(in *v1Client.DeviceListRequest, out *v1Client.DeviceListResponse) error { - r.log.Debug("got device list request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.DeviceList(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) DeviceRemove(ctx context.Context, req *connect.Request[v1Client.DeviceRemoveRequest]) (*connect.Response[v1Client.DeviceRemoveResponse], error) { + return forward(ctx, r, "device_remove", req, v1Client.CentrifugoApiClient.DeviceRemove) } -func (r *rpc) DeviceTopicList(in *v1Client.DeviceTopicListRequest, out *v1Client.DeviceTopicListResponse) error { - r.log.Debug("got device topic list request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.DeviceTopicList(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) DeviceList(ctx context.Context, req *connect.Request[v1Client.DeviceListRequest]) (*connect.Response[v1Client.DeviceListResponse], error) { + return forward(ctx, r, "device_list", req, v1Client.CentrifugoApiClient.DeviceList) } -func (r *rpc) DeviceTopicUpdate(in *v1Client.DeviceTopicUpdateRequest, out *v1Client.DeviceTopicUpdateResponse) error { - r.log.Debug("got device topic update request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.DeviceTopicUpdate(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) DeviceTopicList(ctx context.Context, req *connect.Request[v1Client.DeviceTopicListRequest]) (*connect.Response[v1Client.DeviceTopicListResponse], error) { + return forward(ctx, r, "device_topic_list", req, v1Client.CentrifugoApiClient.DeviceTopicList) } -func (r *rpc) UserTopicList(in *v1Client.UserTopicListRequest, out *v1Client.UserTopicListResponse) error { - r.log.Debug("got user topic list request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.UserTopicList(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) DeviceTopicUpdate(ctx context.Context, req *connect.Request[v1Client.DeviceTopicUpdateRequest]) (*connect.Response[v1Client.DeviceTopicUpdateResponse], error) { + return forward(ctx, r, "device_topic_update", req, v1Client.CentrifugoApiClient.DeviceTopicUpdate) } -func (r *rpc) UserTopicUpdate(in *v1Client.UserTopicUpdateRequest, out *v1Client.UserTopicUpdateResponse) error { - r.log.Debug("got user topic update request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.UserTopicUpdate(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) UserTopicList(ctx context.Context, req *connect.Request[v1Client.UserTopicListRequest]) (*connect.Response[v1Client.UserTopicListResponse], error) { + return forward(ctx, r, "user_topic_list", req, v1Client.CentrifugoApiClient.UserTopicList) } -func (r *rpc) SendPushNotification(in *v1Client.SendPushNotificationRequest, out *v1Client.SendPushNotificationResponse) error { - r.log.Debug("got send push notification request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.SendPushNotification(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) UserTopicUpdate(ctx context.Context, req *connect.Request[v1Client.UserTopicUpdateRequest]) (*connect.Response[v1Client.UserTopicUpdateResponse], error) { + return forward(ctx, r, "user_topic_update", req, v1Client.CentrifugoApiClient.UserTopicUpdate) } -func (r *rpc) UpdatePushStatus(in *v1Client.UpdatePushStatusRequest, out *v1Client.UpdatePushStatusResponse) error { - r.log.Debug("got update push status request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.UpdatePushStatus(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() - - return nil +func (r *rpc) SendPushNotification(ctx context.Context, req *connect.Request[v1Client.SendPushNotificationRequest]) (*connect.Response[v1Client.SendPushNotificationResponse], error) { + return forward(ctx, r, "send_push_notification", req, v1Client.CentrifugoApiClient.SendPushNotification) } -func (r *rpc) CancelPush(in *v1Client.CancelPushRequest, out *v1Client.CancelPushResponse) error { - r.log.Debug("got cancel push request") - - client := r.client.client() - if client == nil { - return errors.Str("RoadRunner is not ready yet, try in a few seconds") - } - resp, err := client.CancelPush(context.Background(), in) - if err != nil { - return err - } - - out.Error = resp.GetError() - out.Result = resp.GetResult() +func (r *rpc) UpdatePushStatus(ctx context.Context, req *connect.Request[v1Client.UpdatePushStatusRequest]) (*connect.Response[v1Client.UpdatePushStatusResponse], error) { + return forward(ctx, r, "update_push_status", req, v1Client.CentrifugoApiClient.UpdatePushStatus) +} - return nil +func (r *rpc) CancelPush(ctx context.Context, req *connect.Request[v1Client.CancelPushRequest]) (*connect.Response[v1Client.CancelPushResponse], error) { + return forward(ctx, r, "cancel_push", req, v1Client.CentrifugoApiClient.CancelPush) } diff --git a/rpc_test.go b/rpc_test.go index cba4430..170f1b5 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -3,6 +3,7 @@ package centrifuge import ( "testing" + "connectrpc.com/connect" v1Client "github.com/roadrunner-server/api-go/v6/centrifugo/api/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -12,7 +13,8 @@ func TestRPCBatchNotReady(t *testing.T) { // client() returns nil until connect() succeeds, so Batch returns early. r := &rpc{client: &client{}, log: testLogger()} - err := r.Batch(&v1Client.BatchRequest{}, &v1Client.BatchResponse{}) + _, err := r.Batch(t.Context(), connect.NewRequest(&v1Client.BatchRequest{})) require.Error(t, err) assert.Contains(t, err.Error(), "RoadRunner is not ready yet") + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) } From 68598c7acc11db760f521559c2dcc49f1319b2bc Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 10 Jun 2026 22:53:19 +0200 Subject: [PATCH 2/6] chore: update dependencies --- go.mod | 3 ++- go.sum | 6 ++++-- go.work.sum | 29 ++++++++++++++++++++++++++++- tests/go.mod | 17 ++++++++--------- tests/go.sum | 32 ++++++++++++++++---------------- 5 files changed, 58 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 2a00b48..8f7bac4 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,10 @@ go 1.26 toolchain go1.26.3 require ( + connectrpc.com/connect v1.20.0 github.com/cenkalti/backoff/v4 v4.3.0 github.com/prometheus/client_golang v1.23.2 - github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12 + github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12.0.20260610203904-09df89976edc github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2 github.com/roadrunner-server/errors v1.5.0 github.com/roadrunner-server/goridge/v4 v4.0.0-beta.2 diff --git a/go.sum b/go.sum index b675d18..6e7e1b0 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +connectrpc.com/connect v1.20.0 h1:6TNDAB+WeNd2uolWNlYczB5E0KNNaVMNUEx8JEUsPmQ= +connectrpc.com/connect v1.20.0/go.mod h1:A2ygJrukXwWy32vkCAAHNVguZrqZ+jeZ9rGRnGR4dN4= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= @@ -35,8 +37,8 @@ github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTU github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc= github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= -github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12 h1:FcRcCvW9OfQvH45SFsI21VoHpOOov56OvOSnO4UKvXs= -github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12/go.mod h1:prGWJ2GoF5YD5PIG7Tb6VKulU3bWoFwr9DCwgxheb80= +github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12.0.20260610203904-09df89976edc h1:tJhOZ31bN0HVO4JTgkcjHK1MLu8Waee+rFnbx5AjXyQ= +github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12.0.20260610203904-09df89976edc/go.mod h1:prGWJ2GoF5YD5PIG7Tb6VKulU3bWoFwr9DCwgxheb80= github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2 h1:GqsZzWQ5jMXRF1O/b8IqFz9PLpS7Ui0K4OyACLql2MI= github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2/go.mod h1:2v4yUK5Kvbvq8C3IkDoBkuamq9h+7i/JLjyf7k1j5JM= github.com/roadrunner-server/errors v1.5.0 h1:unG7LKIZrSzkCCF3YLRLA5VyqE0KKomofXVJUXJe00g= diff --git a/go.work.sum b/go.work.sum index b5ad6c5..e34c18a 100644 --- a/go.work.sum +++ b/go.work.sum @@ -958,6 +958,7 @@ cloud.google.com/go/workflows v1.11.1 h1:2akeQ/PgtRhrNuD/n1WvJd5zb7YyuDZrlOanBj2 cloud.google.com/go/workflows v1.11.1/go.mod h1:Z+t10G1wF7h8LgdY/EmRcQY8ptBD/nvofaL6FqlET6g= cloud.google.com/go/workflows v1.12.0 h1:cSUlx4PVV9O0vYCl+pHAUmu0996A7eN602d4wjjVHRs= cloud.google.com/go/workflows v1.12.0/go.mod h1:PYhSk2b6DhZ508tj8HXKaBh+OFe+xdl0dHF/tJdzPQM= +connectrpc.com/connect v1.19.2/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9 h1:VpgP7xuJadIUuKccphEpTJnWhS2jkQyMt6Y7pJCD7fY= gioui.org v0.0.0-20210308172011-57750fc8a0a6 h1:K72hopUosKG3ntOPNG4OzzbuhxGuVf06fa2la1/H/Ho= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= @@ -1123,6 +1124,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg= github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU= github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= +github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= @@ -1356,9 +1358,13 @@ github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Q github.com/pkg/sftp v1.13.7/go.mod h1:KMKI0t3T6hfA+lTR/ssZdunHo+uwq7ghoN09/FSu3DY= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.20.0/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= @@ -1369,7 +1375,6 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= -github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245 h1:K1Xf3bKttbF+koVGaX5xngRIZ5bVjbmPnaxE/dR08uY= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= @@ -1409,6 +1414,7 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc= github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM= @@ -1479,9 +1485,11 @@ go.temporal.io/api v1.43.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis go.temporal.io/api v1.43.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= go.temporal.io/api v1.57.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -1506,6 +1514,8 @@ golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ss golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= +golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= +golang.org/x/crypto v0.53.0/go.mod h1:DNLU434OwVakk9PzuwV8w62mAJpRJL3vsgcfp4Qnsio= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1555,6 +1565,7 @@ golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= +golang.org/x/mod v0.36.0/go.mod h1:moc6ELqsWcOw5Ef3xVprK5ul/MvtVvkIXLziUOICjUQ= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= @@ -1591,7 +1602,9 @@ golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ= golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= @@ -1625,7 +1638,9 @@ golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= +golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1635,6 +1650,7 @@ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210304124612-50617c2ba197/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1685,10 +1701,14 @@ golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457 h1:zf5N6UOrA487eEFacMePxjXAJctxKmyjKUsjA11Uzuk= golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0= golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2/go.mod h1:b7fPSJ0pKZ3ccUh8gnTONJxhn3c/PS6tyzQvyqw4iA8= +golang.org/x/telemetry v0.0.0-20260508192327-42602be52be6/go.mod h1:Eqhaxk/wZsWEH8CRxLwj6xzEJbz7k1EFGqx7nyCoabE= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= @@ -1715,6 +1735,8 @@ golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/term v0.29.0/go.mod h1:6bl4lRlvVuDgSf3179VpIxBF0o10JUpXWOnI7nErv7s= golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= +golang.org/x/term v0.43.0/go.mod h1:lrhlHNdQJHO+1qVYiHfFKVuVioJIheAc3fBSMFYEIsk= +golang.org/x/term v0.44.0/go.mod h1:7ze4MdzUzLXpSAoFP1H0bOI9aXDqveSvatT5vKcFh2Y= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= @@ -1728,6 +1750,7 @@ golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA= golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1773,6 +1796,8 @@ golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= +golang.org/x/tools v0.44.0/go.mod h1:KA0AfVErSdxRZIsOVipbv3rQhVXTnlU6UhKxHd1seDI= +golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -2016,6 +2041,7 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go. google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= @@ -2059,6 +2085,7 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= diff --git a/tests/go.mod b/tests/go.mod index 14350d3..1dc335c 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -36,7 +36,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/mailru/easyjson v0.9.2 // indirect - github.com/mattn/go-colorable v0.1.14 // indirect + github.com/mattn/go-colorable v0.1.15 // indirect github.com/mattn/go-isatty v0.0.22 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.3.1 // indirect @@ -44,9 +44,9 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.67.5 // indirect + github.com/prometheus/common v0.68.1 // indirect github.com/prometheus/procfs v0.20.1 // indirect - github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12 // indirect + github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12.0.20260610203904-09df89976edc // indirect github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2 // indirect github.com/roadrunner-server/errors v1.5.0 // indirect github.com/roadrunner-server/events v1.0.1 // indirect @@ -69,13 +69,12 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.28.0 // indirect - go.yaml.in/yaml/v2 v2.4.4 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/net v0.55.0 // indirect - golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.45.0 // indirect - golang.org/x/text v0.37.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect + golang.org/x/net v0.56.0 // indirect + golang.org/x/sync v0.21.0 // indirect + golang.org/x/sys v0.46.0 // indirect + golang.org/x/text v0.38.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260610202329-623566214e0c // indirect google.golang.org/grpc v1.81.1 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/tests/go.sum b/tests/go.sum index f9160ba..9ec49de 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -49,8 +49,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.9.2 h1:dX8U45hQsZpxd80nLvDGihsQ/OxlvTkVUXH2r/8cb2M= github.com/mailru/easyjson v0.9.2/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= -github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= -github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= +github.com/mattn/go-colorable v0.1.15 h1:+u9SLTRGnXv73cEsnsmoZBom+dMU88B2M0aDcWy0/jY= +github.com/mattn/go-colorable v0.1.15/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4= github.com/mattn/go-isatty v0.0.22/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -65,12 +65,12 @@ github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= -github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= -github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= +github.com/prometheus/common v0.68.1 h1:omjRRl4QP4komogpXuhfeOiisQg7xdy8VM1UY+pStaY= +github.com/prometheus/common v0.68.1/go.mod h1:ZzL3f6u94qUxh9p+tJTrF+FvBS1XXbbRAZCQkytAL0Y= github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc= github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= -github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12 h1:FcRcCvW9OfQvH45SFsI21VoHpOOov56OvOSnO4UKvXs= -github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12/go.mod h1:prGWJ2GoF5YD5PIG7Tb6VKulU3bWoFwr9DCwgxheb80= +github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12.0.20260610203904-09df89976edc h1:tJhOZ31bN0HVO4JTgkcjHK1MLu8Waee+rFnbx5AjXyQ= +github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12.0.20260610203904-09df89976edc/go.mod h1:prGWJ2GoF5YD5PIG7Tb6VKulU3bWoFwr9DCwgxheb80= github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2 h1:GqsZzWQ5jMXRF1O/b8IqFz9PLpS7Ui0K4OyACLql2MI= github.com/roadrunner-server/api-plugins/v6 v6.0.0-beta.2/go.mod h1:2v4yUK5Kvbvq8C3IkDoBkuamq9h+7i/JLjyf7k1j5JM= github.com/roadrunner-server/config/v6 v6.0.0-beta.3 h1:G0EUzJ6Yw4UnleM6BhnOBbYPXKDHRmCJiGhC3nXDBwI= @@ -149,20 +149,20 @@ go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ= go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= -golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8= -golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww= -golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= -golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/net v0.56.0 h1:Rw8j/hFzGvJUZwNBXnAtf5sVDVt+65SK2C7IxCxZt5o= +golang.org/x/net v0.56.0/go.mod h1:D3Ku6r+V6JROoZK144D2XfMHFcMq/0zSfLelVTCFKec= +golang.org/x/sync v0.21.0 h1:HLII4xRRTtCRkxYp4HNFF0Js/Og6q2i++KXbg0gHCwM= +golang.org/x/sync v0.21.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= -golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= -golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= +golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw= +golang.org/x/sys v0.46.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.38.0 h1:sXmwo9DwP3OK9EZ7PqAdaooSGozfl/3a6/xJcbzPRhE= +golang.org/x/text v0.38.0/go.mod h1:YXZt3QhHUKYT53r2lLKFIVi6Ao1jdzrTR/KQ09qyxF4= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa h1:mZHHdPZl0dbGHCflZgAq/Q468DWVFcU2whhB2KAo8fk= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260610202329-623566214e0c h1:oGW7p9KPQPfqJ90YnVa5YfemKjLxc+704Am9EgLZZVU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260610202329-623566214e0c/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= From fd668d6f9ed90ce83b625f23694325a842c53909 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 10 Jun 2026 22:56:28 +0200 Subject: [PATCH 3/6] chore: derive the rpc procedure name from the request spec --- rpc.go | 69 +++++++++++++++++++++++++++++----------------------------- 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/rpc.go b/rpc.go index 7c47d92..34419a2 100644 --- a/rpc.go +++ b/rpc.go @@ -30,11 +30,10 @@ type rpc struct { func forward[Req, Resp any]( ctx context.Context, r *rpc, - name string, req *connect.Request[Req], call func(v1Client.CentrifugoApiClient, context.Context, *Req, ...grpc.CallOption) (*Resp, error), ) (*connect.Response[Resp], error) { - r.log.Debug("got " + name + " request") + r.log.Debug("got api request", "procedure", req.Spec().Procedure) client := r.client.client() if client == nil { @@ -53,133 +52,133 @@ func forward[Req, Resp any]( } func (r *rpc) Batch(ctx context.Context, req *connect.Request[v1Client.BatchRequest]) (*connect.Response[v1Client.BatchResponse], error) { - return forward(ctx, r, "batch", req, v1Client.CentrifugoApiClient.Batch) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Batch) } func (r *rpc) Publish(ctx context.Context, req *connect.Request[v1Client.PublishRequest]) (*connect.Response[v1Client.PublishResponse], error) { - return forward(ctx, r, "publish", req, v1Client.CentrifugoApiClient.Publish) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Publish) } func (r *rpc) Broadcast(ctx context.Context, req *connect.Request[v1Client.BroadcastRequest]) (*connect.Response[v1Client.BroadcastResponse], error) { - return forward(ctx, r, "broadcast", req, v1Client.CentrifugoApiClient.Broadcast) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Broadcast) } func (r *rpc) Subscribe(ctx context.Context, req *connect.Request[v1Client.SubscribeRequest]) (*connect.Response[v1Client.SubscribeResponse], error) { - return forward(ctx, r, "subscribe", req, v1Client.CentrifugoApiClient.Subscribe) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Subscribe) } func (r *rpc) Unsubscribe(ctx context.Context, req *connect.Request[v1Client.UnsubscribeRequest]) (*connect.Response[v1Client.UnsubscribeResponse], error) { - return forward(ctx, r, "unsubscribe", req, v1Client.CentrifugoApiClient.Unsubscribe) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Unsubscribe) } func (r *rpc) Disconnect(ctx context.Context, req *connect.Request[v1Client.DisconnectRequest]) (*connect.Response[v1Client.DisconnectResponse], error) { - return forward(ctx, r, "disconnect", req, v1Client.CentrifugoApiClient.Disconnect) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Disconnect) } func (r *rpc) Presence(ctx context.Context, req *connect.Request[v1Client.PresenceRequest]) (*connect.Response[v1Client.PresenceResponse], error) { - return forward(ctx, r, "presence", req, v1Client.CentrifugoApiClient.Presence) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Presence) } func (r *rpc) PresenceStats(ctx context.Context, req *connect.Request[v1Client.PresenceStatsRequest]) (*connect.Response[v1Client.PresenceStatsResponse], error) { - return forward(ctx, r, "presence_stats", req, v1Client.CentrifugoApiClient.PresenceStats) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.PresenceStats) } func (r *rpc) History(ctx context.Context, req *connect.Request[v1Client.HistoryRequest]) (*connect.Response[v1Client.HistoryResponse], error) { - return forward(ctx, r, "history", req, v1Client.CentrifugoApiClient.History) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.History) } func (r *rpc) HistoryRemove(ctx context.Context, req *connect.Request[v1Client.HistoryRemoveRequest]) (*connect.Response[v1Client.HistoryRemoveResponse], error) { - return forward(ctx, r, "history_remove", req, v1Client.CentrifugoApiClient.HistoryRemove) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.HistoryRemove) } func (r *rpc) Info(ctx context.Context, req *connect.Request[v1Client.InfoRequest]) (*connect.Response[v1Client.InfoResponse], error) { - return forward(ctx, r, "info", req, v1Client.CentrifugoApiClient.Info) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Info) } func (r *rpc) RPC(ctx context.Context, req *connect.Request[v1Client.RPCRequest]) (*connect.Response[v1Client.RPCResponse], error) { - return forward(ctx, r, "rpc", req, v1Client.CentrifugoApiClient.RPC) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.RPC) } func (r *rpc) Refresh(ctx context.Context, req *connect.Request[v1Client.RefreshRequest]) (*connect.Response[v1Client.RefreshResponse], error) { - return forward(ctx, r, "refresh", req, v1Client.CentrifugoApiClient.Refresh) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Refresh) } func (r *rpc) Channels(ctx context.Context, req *connect.Request[v1Client.ChannelsRequest]) (*connect.Response[v1Client.ChannelsResponse], error) { - return forward(ctx, r, "channels", req, v1Client.CentrifugoApiClient.Channels) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Channels) } func (r *rpc) Connections(ctx context.Context, req *connect.Request[v1Client.ConnectionsRequest]) (*connect.Response[v1Client.ConnectionsResponse], error) { - return forward(ctx, r, "connections", req, v1Client.CentrifugoApiClient.Connections) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.Connections) } func (r *rpc) UpdateUserStatus(ctx context.Context, req *connect.Request[v1Client.UpdateUserStatusRequest]) (*connect.Response[v1Client.UpdateUserStatusResponse], error) { - return forward(ctx, r, "update_user_status", req, v1Client.CentrifugoApiClient.UpdateUserStatus) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.UpdateUserStatus) } func (r *rpc) GetUserStatus(ctx context.Context, req *connect.Request[v1Client.GetUserStatusRequest]) (*connect.Response[v1Client.GetUserStatusResponse], error) { - return forward(ctx, r, "get_user_status", req, v1Client.CentrifugoApiClient.GetUserStatus) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.GetUserStatus) } func (r *rpc) DeleteUserStatus(ctx context.Context, req *connect.Request[v1Client.DeleteUserStatusRequest]) (*connect.Response[v1Client.DeleteUserStatusResponse], error) { - return forward(ctx, r, "delete_user_status", req, v1Client.CentrifugoApiClient.DeleteUserStatus) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.DeleteUserStatus) } func (r *rpc) BlockUser(ctx context.Context, req *connect.Request[v1Client.BlockUserRequest]) (*connect.Response[v1Client.BlockUserResponse], error) { - return forward(ctx, r, "block_user", req, v1Client.CentrifugoApiClient.BlockUser) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.BlockUser) } func (r *rpc) UnblockUser(ctx context.Context, req *connect.Request[v1Client.UnblockUserRequest]) (*connect.Response[v1Client.UnblockUserResponse], error) { - return forward(ctx, r, "unblock_user", req, v1Client.CentrifugoApiClient.UnblockUser) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.UnblockUser) } func (r *rpc) RevokeToken(ctx context.Context, req *connect.Request[v1Client.RevokeTokenRequest]) (*connect.Response[v1Client.RevokeTokenResponse], error) { - return forward(ctx, r, "revoke_token", req, v1Client.CentrifugoApiClient.RevokeToken) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.RevokeToken) } func (r *rpc) InvalidateUserTokens(ctx context.Context, req *connect.Request[v1Client.InvalidateUserTokensRequest]) (*connect.Response[v1Client.InvalidateUserTokensResponse], error) { - return forward(ctx, r, "invalidate_user_tokens", req, v1Client.CentrifugoApiClient.InvalidateUserTokens) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.InvalidateUserTokens) } func (r *rpc) DeviceRegister(ctx context.Context, req *connect.Request[v1Client.DeviceRegisterRequest]) (*connect.Response[v1Client.DeviceRegisterResponse], error) { - return forward(ctx, r, "device_register", req, v1Client.CentrifugoApiClient.DeviceRegister) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.DeviceRegister) } func (r *rpc) DeviceUpdate(ctx context.Context, req *connect.Request[v1Client.DeviceUpdateRequest]) (*connect.Response[v1Client.DeviceUpdateResponse], error) { - return forward(ctx, r, "device_update", req, v1Client.CentrifugoApiClient.DeviceUpdate) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.DeviceUpdate) } func (r *rpc) DeviceRemove(ctx context.Context, req *connect.Request[v1Client.DeviceRemoveRequest]) (*connect.Response[v1Client.DeviceRemoveResponse], error) { - return forward(ctx, r, "device_remove", req, v1Client.CentrifugoApiClient.DeviceRemove) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.DeviceRemove) } func (r *rpc) DeviceList(ctx context.Context, req *connect.Request[v1Client.DeviceListRequest]) (*connect.Response[v1Client.DeviceListResponse], error) { - return forward(ctx, r, "device_list", req, v1Client.CentrifugoApiClient.DeviceList) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.DeviceList) } func (r *rpc) DeviceTopicList(ctx context.Context, req *connect.Request[v1Client.DeviceTopicListRequest]) (*connect.Response[v1Client.DeviceTopicListResponse], error) { - return forward(ctx, r, "device_topic_list", req, v1Client.CentrifugoApiClient.DeviceTopicList) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.DeviceTopicList) } func (r *rpc) DeviceTopicUpdate(ctx context.Context, req *connect.Request[v1Client.DeviceTopicUpdateRequest]) (*connect.Response[v1Client.DeviceTopicUpdateResponse], error) { - return forward(ctx, r, "device_topic_update", req, v1Client.CentrifugoApiClient.DeviceTopicUpdate) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.DeviceTopicUpdate) } func (r *rpc) UserTopicList(ctx context.Context, req *connect.Request[v1Client.UserTopicListRequest]) (*connect.Response[v1Client.UserTopicListResponse], error) { - return forward(ctx, r, "user_topic_list", req, v1Client.CentrifugoApiClient.UserTopicList) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.UserTopicList) } func (r *rpc) UserTopicUpdate(ctx context.Context, req *connect.Request[v1Client.UserTopicUpdateRequest]) (*connect.Response[v1Client.UserTopicUpdateResponse], error) { - return forward(ctx, r, "user_topic_update", req, v1Client.CentrifugoApiClient.UserTopicUpdate) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.UserTopicUpdate) } func (r *rpc) SendPushNotification(ctx context.Context, req *connect.Request[v1Client.SendPushNotificationRequest]) (*connect.Response[v1Client.SendPushNotificationResponse], error) { - return forward(ctx, r, "send_push_notification", req, v1Client.CentrifugoApiClient.SendPushNotification) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.SendPushNotification) } func (r *rpc) UpdatePushStatus(ctx context.Context, req *connect.Request[v1Client.UpdatePushStatusRequest]) (*connect.Response[v1Client.UpdatePushStatusResponse], error) { - return forward(ctx, r, "update_push_status", req, v1Client.CentrifugoApiClient.UpdatePushStatus) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.UpdatePushStatus) } func (r *rpc) CancelPush(ctx context.Context, req *connect.Request[v1Client.CancelPushRequest]) (*connect.Response[v1Client.CancelPushResponse], error) { - return forward(ctx, r, "cancel_push", req, v1Client.CentrifugoApiClient.CancelPush) + return forward(ctx, r, req, v1Client.CentrifugoApiClient.CancelPush) } From ffcb40e8a48995d0fc747ec279a2b243cb5fd9f1 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 10 Jun 2026 23:25:53 +0200 Subject: [PATCH 4/6] chore: sync go.work.sum --- go.work.sum | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/go.work.sum b/go.work.sum index e34c18a..6d0d8d1 100644 --- a/go.work.sum +++ b/go.work.sum @@ -969,6 +969,7 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802 h1:1BDTz0u9nC3//pOC github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.2/go.mod h1:itPGVDKf9cC/ov4MdvJ2QZ0khw4bfoo9jzwTJlaxy2k= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0/go.mod h1:obipzmGjfSjam60XLwGfqUkJsfiheAl+TUjG+4yzyPM= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.31.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1/go.mod h1:jyqM3eLpJ3IbIFDTKVz2rF9T/xWGW0rIriGwnz8l9Tk= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.48.1/go.mod h1:viRWSEhtMZqz1rhwmOVKkWl6SwmVowfL9O2YR5gI2PE= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= @@ -1038,6 +1039,7 @@ github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnTh github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5/go.mod h1:KdCmV+x/BuvyMxRnYBlmVaq4OLiKW6iRQfvC62cvdkI= +github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= @@ -1067,6 +1069,7 @@ github.com/envoyproxy/go-control-plane v0.12.1-0.20240621013728-1eb8caab5155/go. github.com/envoyproxy/go-control-plane v0.13.1/go.mod h1:X45hY0mufo6Fd0KW3rqsGvQMw58jvjymeCzBU3mWyHw= github.com/envoyproxy/go-control-plane v0.14.0/go.mod h1:NcS5X47pLl/hfqxU70yPwL9ZMkUlwlKxtAohpi2wBEU= github.com/envoyproxy/go-control-plane/envoy v1.36.0/go.mod h1:ty89S1YCCVruQAm9OtKeEkQLTb+Lkz0k8v9W0Oxsv98= +github.com/envoyproxy/go-control-plane/envoy v1.37.0/go.mod h1:DReE9MMrmecPy+YvQOAOHNYMALuowAnbjjEMkkWOi6A= github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4= github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= @@ -1078,6 +1081,7 @@ github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= +github.com/envoyproxy/protoc-gen-validate v1.3.3/go.mod h1:TsndJ/ngyIdQRhMcVVGDDHINPLWB7C82oDArY51KfB0= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= @@ -1101,6 +1105,7 @@ github.com/go-fonts/stix v0.1.0/go.mod h1:w/c1f0ldAUlJmLBvlbkvVXLAD+tAMqobIIQpmn github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1 h1:QbL/5oDUmRBzO9/Z7Seo6zf912W/a6Sr4Eu0G/3Jho0= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4 h1:WtGNWLvXpe6ZudgnXrq0barxBImvnnJoMEhXAzcbM0I= github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= +github.com/go-jose/go-jose/v4 v4.1.4/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2CSIqUrmQPqA0gdRIlnLEY0gK5JGjh37zN5U= @@ -1454,6 +1459,7 @@ go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/detectors/gcp v1.31.0/go.mod h1:tzQL6E1l+iV44YFTkcAeNQqzXUiekSYP9jjJjXwEd00= go.opentelemetry.io/contrib/detectors/gcp v1.32.0/go.mod h1:TVqo0Sda4Cv8gCIixd7LuLwW4EylumVWfhjZJjDD4DU= go.opentelemetry.io/contrib/detectors/gcp v1.39.0/go.mod h1:t/OGqzHBa5v6RHZwrDBJ2OirWc+4q/w2fTbLZwAKjTk= +go.opentelemetry.io/contrib/detectors/gcp v1.42.0/go.mod h1:W9zQ439utxymRrXsUOzZbFX4JhLxXU4+ZnCt8GG7yA8= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= @@ -2024,6 +2030,7 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240722135656-d784300faade/go. google.golang.org/genproto/googleapis/api v0.0.0-20241219192143-6b3ec007d9bb/go.mod h1:E5//3O5ZIG2l71Xnt+P/CYUY8Bxs8E7WMoZ9tlcMbAY= google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw= google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= +google.golang.org/genproto/googleapis/api v0.0.0-20260226221140-a57be14db171/go.mod h1:M5krXqk4GhBKvB596udGL3UyjL4I1+cTbK0orROM9ng= google.golang.org/genproto/googleapis/bytestream v0.0.0-20230530153820-e85fd2cbaebc h1:g3hIDl0jRNd9PPTs2uBzYuaD5mQuwOkZY0vSc0LR32o= google.golang.org/genproto/googleapis/bytestream v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:ylj+BE99M198VPbBh6A8d9n3w8fChvyLK3wwBOjXBFA= google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234015-3fc162c6f38a/go.mod h1:xURIpW9ES5+/GZhnV6beoEtxQrnkRGIfP5VQG2tCBLc= From 950e0405d15f4fa47284e8da55ebcc2d00138dee Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 10 Jun 2026 23:26:15 +0200 Subject: [PATCH 5/6] chore: align module versions with the workspace --- go.mod | 13 ++++++------- go.sum | 18 ++++++------------ 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 8f7bac4..7bc0ef5 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.67.5 // indirect + github.com/prometheus/common v0.68.1 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/roadrunner-server/events v1.0.1 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect @@ -41,11 +41,10 @@ require ( github.com/tklauser/go-sysconf v0.4.0 // indirect github.com/tklauser/numcpus v0.12.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.yaml.in/yaml/v2 v2.4.4 // indirect - golang.org/x/net v0.55.0 // indirect - golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.45.0 // indirect - golang.org/x/text v0.37.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect + golang.org/x/net v0.56.0 // indirect + golang.org/x/sync v0.21.0 // indirect + golang.org/x/sys v0.46.0 // indirect + golang.org/x/text v0.38.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260610202329-623566214e0c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 6e7e1b0..fda9622 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,7 @@ github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= -github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= -github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= +github.com/prometheus/common v0.68.1 h1:omjRRl4QP4komogpXuhfeOiisQg7xdy8VM1UY+pStaY= github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc= github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12.0.20260610203904-09df89976edc h1:tJhOZ31bN0HVO4JTgkcjHK1MLu8Waee+rFnbx5AjXyQ= @@ -79,20 +78,15 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ= go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ= -golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8= -golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww= -golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= -golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/net v0.56.0 h1:Rw8j/hFzGvJUZwNBXnAtf5sVDVt+65SK2C7IxCxZt5o= +golang.org/x/sync v0.21.0 h1:HLII4xRRTtCRkxYp4HNFF0Js/Og6q2i++KXbg0gHCwM= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= -golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= -golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= +golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw= +golang.org/x/text v0.38.0 h1:sXmwo9DwP3OK9EZ7PqAdaooSGozfl/3a6/xJcbzPRhE= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa h1:mZHHdPZl0dbGHCflZgAq/Q468DWVFcU2whhB2KAo8fk= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260610202329-623566214e0c h1:oGW7p9KPQPfqJ90YnVa5YfemKjLxc+704Am9EgLZZVU= google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= From 6a702736865cff06c3e5161798f91518c5de3e1c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 11 Jun 2026 09:11:25 +0200 Subject: [PATCH 6/6] chore: cover the connect forwarders and status mapping --- go.sum | 6 ++ rpc_forward_test.go | 206 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+) create mode 100644 rpc_forward_test.go diff --git a/go.sum b/go.sum index fda9622..bd56ba3 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,7 @@ github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UH github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= github.com/prometheus/common v0.68.1 h1:omjRRl4QP4komogpXuhfeOiisQg7xdy8VM1UY+pStaY= +github.com/prometheus/common v0.68.1/go.mod h1:ZzL3f6u94qUxh9p+tJTrF+FvBS1XXbbRAZCQkytAL0Y= github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc= github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= github.com/roadrunner-server/api-go/v6 v6.0.0-beta.12.0.20260610203904-09df89976edc h1:tJhOZ31bN0HVO4JTgkcjHK1MLu8Waee+rFnbx5AjXyQ= @@ -79,14 +80,19 @@ go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ= go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ= golang.org/x/net v0.56.0 h1:Rw8j/hFzGvJUZwNBXnAtf5sVDVt+65SK2C7IxCxZt5o= +golang.org/x/net v0.56.0/go.mod h1:D3Ku6r+V6JROoZK144D2XfMHFcMq/0zSfLelVTCFKec= golang.org/x/sync v0.21.0 h1:HLII4xRRTtCRkxYp4HNFF0Js/Og6q2i++KXbg0gHCwM= +golang.org/x/sync v0.21.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw= +golang.org/x/sys v0.46.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.38.0 h1:sXmwo9DwP3OK9EZ7PqAdaooSGozfl/3a6/xJcbzPRhE= +golang.org/x/text v0.38.0/go.mod h1:YXZt3QhHUKYT53r2lLKFIVi6Ao1jdzrTR/KQ09qyxF4= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= google.golang.org/genproto/googleapis/rpc v0.0.0-20260610202329-623566214e0c h1:oGW7p9KPQPfqJ90YnVa5YfemKjLxc+704Am9EgLZZVU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260610202329-623566214e0c/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= diff --git a/rpc_forward_test.go b/rpc_forward_test.go new file mode 100644 index 0000000..b220f55 --- /dev/null +++ b/rpc_forward_test.go @@ -0,0 +1,206 @@ +package centrifuge + +import ( + "context" + "net" + "testing" + + "connectrpc.com/connect" + v1Client "github.com/roadrunner-server/api-go/v6/centrifugo/api/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) + +// stubAPI implements only Publish; every other CentrifugoApi method responds +// with codes.Unimplemented through the embedded unimplemented server, which +// lets the tests cover both forward() outcomes. +type stubAPI struct { + v1Client.UnimplementedCentrifugoApiServer +} + +func (stubAPI) Publish(context.Context, *v1Client.PublishRequest) (*v1Client.PublishResponse, error) { + return &v1Client.PublishResponse{}, nil +} + +func newBufconnRPC(t *testing.T) *rpc { + t.Helper() + + lis := bufconn.Listen(1024 * 1024) + srv := grpc.NewServer() + v1Client.RegisterCentrifugoApiServer(srv, stubAPI{}) + + go func() { _ = srv.Serve(lis) }() + t.Cleanup(srv.Stop) + + conn, err := grpc.NewClient("passthrough:///bufnet", + grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { return lis.DialContext(ctx) }), + grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + t.Cleanup(func() { _ = conn.Close() }) + + return &rpc{client: &client{centrifugoClient: v1Client.NewCentrifugoApiClient(conn)}, log: testLogger()} +} + +func TestRPCForwardSuccess(t *testing.T) { + r := newBufconnRPC(t) + + resp, err := r.Publish(t.Context(), connect.NewRequest(&v1Client.PublishRequest{})) + require.NoError(t, err) + require.NotNil(t, resp.Msg) +} + +func TestRPCForwardMapsGRPCStatus(t *testing.T) { + r := newBufconnRPC(t) + + calls := map[string]func(ctx context.Context) error{ + "batch": func(ctx context.Context) error { + _, err := r.Batch(ctx, connect.NewRequest(&v1Client.BatchRequest{})) + return err + }, + "broadcast": func(ctx context.Context) error { + _, err := r.Broadcast(ctx, connect.NewRequest(&v1Client.BroadcastRequest{})) + return err + }, + "subscribe": func(ctx context.Context) error { + _, err := r.Subscribe(ctx, connect.NewRequest(&v1Client.SubscribeRequest{})) + return err + }, + "unsubscribe": func(ctx context.Context) error { + _, err := r.Unsubscribe(ctx, connect.NewRequest(&v1Client.UnsubscribeRequest{})) + return err + }, + "disconnect": func(ctx context.Context) error { + _, err := r.Disconnect(ctx, connect.NewRequest(&v1Client.DisconnectRequest{})) + return err + }, + "presence": func(ctx context.Context) error { + _, err := r.Presence(ctx, connect.NewRequest(&v1Client.PresenceRequest{})) + return err + }, + "presence_stats": func(ctx context.Context) error { + _, err := r.PresenceStats(ctx, connect.NewRequest(&v1Client.PresenceStatsRequest{})) + return err + }, + "history": func(ctx context.Context) error { + _, err := r.History(ctx, connect.NewRequest(&v1Client.HistoryRequest{})) + return err + }, + "history_remove": func(ctx context.Context) error { + _, err := r.HistoryRemove(ctx, connect.NewRequest(&v1Client.HistoryRemoveRequest{})) + return err + }, + "info": func(ctx context.Context) error { + _, err := r.Info(ctx, connect.NewRequest(&v1Client.InfoRequest{})) + return err + }, + "rpc": func(ctx context.Context) error { + _, err := r.RPC(ctx, connect.NewRequest(&v1Client.RPCRequest{})) + return err + }, + "refresh": func(ctx context.Context) error { + _, err := r.Refresh(ctx, connect.NewRequest(&v1Client.RefreshRequest{})) + return err + }, + "channels": func(ctx context.Context) error { + _, err := r.Channels(ctx, connect.NewRequest(&v1Client.ChannelsRequest{})) + return err + }, + "connections": func(ctx context.Context) error { + _, err := r.Connections(ctx, connect.NewRequest(&v1Client.ConnectionsRequest{})) + return err + }, + "update_user_status": func(ctx context.Context) error { + _, err := r.UpdateUserStatus(ctx, connect.NewRequest(&v1Client.UpdateUserStatusRequest{})) + return err + }, + "get_user_status": func(ctx context.Context) error { + _, err := r.GetUserStatus(ctx, connect.NewRequest(&v1Client.GetUserStatusRequest{})) + return err + }, + "delete_user_status": func(ctx context.Context) error { + _, err := r.DeleteUserStatus(ctx, connect.NewRequest(&v1Client.DeleteUserStatusRequest{})) + return err + }, + "block_user": func(ctx context.Context) error { + _, err := r.BlockUser(ctx, connect.NewRequest(&v1Client.BlockUserRequest{})) + return err + }, + "unblock_user": func(ctx context.Context) error { + _, err := r.UnblockUser(ctx, connect.NewRequest(&v1Client.UnblockUserRequest{})) + return err + }, + "revoke_token": func(ctx context.Context) error { + _, err := r.RevokeToken(ctx, connect.NewRequest(&v1Client.RevokeTokenRequest{})) + return err + }, + "invalidate_user_tokens": func(ctx context.Context) error { + _, err := r.InvalidateUserTokens(ctx, connect.NewRequest(&v1Client.InvalidateUserTokensRequest{})) + return err + }, + "device_register": func(ctx context.Context) error { + _, err := r.DeviceRegister(ctx, connect.NewRequest(&v1Client.DeviceRegisterRequest{})) + return err + }, + "device_update": func(ctx context.Context) error { + _, err := r.DeviceUpdate(ctx, connect.NewRequest(&v1Client.DeviceUpdateRequest{})) + return err + }, + "device_remove": func(ctx context.Context) error { + _, err := r.DeviceRemove(ctx, connect.NewRequest(&v1Client.DeviceRemoveRequest{})) + return err + }, + "device_list": func(ctx context.Context) error { + _, err := r.DeviceList(ctx, connect.NewRequest(&v1Client.DeviceListRequest{})) + return err + }, + "device_topic_list": func(ctx context.Context) error { + _, err := r.DeviceTopicList(ctx, connect.NewRequest(&v1Client.DeviceTopicListRequest{})) + return err + }, + "device_topic_update": func(ctx context.Context) error { + _, err := r.DeviceTopicUpdate(ctx, connect.NewRequest(&v1Client.DeviceTopicUpdateRequest{})) + return err + }, + "user_topic_list": func(ctx context.Context) error { + _, err := r.UserTopicList(ctx, connect.NewRequest(&v1Client.UserTopicListRequest{})) + return err + }, + "user_topic_update": func(ctx context.Context) error { + _, err := r.UserTopicUpdate(ctx, connect.NewRequest(&v1Client.UserTopicUpdateRequest{})) + return err + }, + "send_push_notification": func(ctx context.Context) error { + _, err := r.SendPushNotification(ctx, connect.NewRequest(&v1Client.SendPushNotificationRequest{})) + return err + }, + "update_push_status": func(ctx context.Context) error { + _, err := r.UpdatePushStatus(ctx, connect.NewRequest(&v1Client.UpdatePushStatusRequest{})) + return err + }, + "cancel_push": func(ctx context.Context) error { + _, err := r.CancelPush(ctx, connect.NewRequest(&v1Client.CancelPushRequest{})) + return err + }, + } + + for name, call := range calls { + t.Run(name, func(t *testing.T) { + err := call(t.Context()) + require.Error(t, err) + // the stub leaves the method unimplemented; the gRPC status code + // must come back as the matching connect code + assert.Equal(t, connect.CodeUnimplemented, connect.CodeOf(err)) + }) + } +} + +func TestPluginRPCHandler(t *testing.T) { + p := &Plugin{client: &client{}, log: testLogger()} + + path, handler := p.RPC() + require.NotEmpty(t, path) + require.NotNil(t, handler) +}