From 6292dce432065158af877f1eae89172132b2d897 Mon Sep 17 00:00:00 2001 From: Dunsin Date: Sat, 25 Apr 2026 20:43:25 +0100 Subject: [PATCH] feat: emit IntentDisruptedEvent to the misbehaving client on batch failure --- .../swagger/ark/v1/service.openapi.json | 19 ++ .../openapi/swagger/ark/v1/types.openapi.json | 16 ++ api-spec/protobuf/ark/v1/service.proto | 1 + api-spec/protobuf/ark/v1/types.proto | 6 + .../protobuf/gen/ark/v1/indexer.pb.rgw.go | 4 +- api-spec/protobuf/gen/ark/v1/service.pb.go | 120 ++++++---- api-spec/protobuf/gen/ark/v1/types.pb.go | 155 ++++++++---- internal/core/application/ban.go | 56 ++++- internal/core/application/service.go | 6 +- internal/core/application/service_event.go | 7 + internal/core/domain/events_repo.go | 1 + .../interface/grpc/handlers/arkservice.go | 12 + internal/test/e2e/delegate_utils_test.go | 16 ++ internal/test/e2e/e2e_test.go | 226 ++++++++++++++++++ pkg/client-lib/batch_session_handler.go | 12 + pkg/client-lib/client/client.go | 6 + pkg/client-lib/client/grpc/types.go | 9 + 17 files changed, 573 insertions(+), 99 deletions(-) diff --git a/api-spec/openapi/swagger/ark/v1/service.openapi.json b/api-spec/openapi/swagger/ark/v1/service.openapi.json index 0782a48ee..59e099c30 100644 --- a/api-spec/openapi/swagger/ark/v1/service.openapi.json +++ b/api-spec/openapi/swagger/ark/v1/service.openapi.json @@ -856,6 +856,9 @@ "heartbeat": { "$ref": "#/components/schemas/Heartbeat" }, + "intentDisrupted": { + "$ref": "#/components/schemas/IntentDisruptedEvent" + }, "streamStarted": { "$ref": "#/components/schemas/StreamStartedEvent" }, @@ -1054,6 +1057,22 @@ } } }, + "IntentDisruptedEvent": { + "title": "IntentDisruptedEvent", + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "round id" + }, + "intentId": { + "type": "string" + }, + "reason": { + "type": "string" + } + } + }, "IntentFeeInfo": { "title": "IntentFeeInfo", "type": "object", diff --git a/api-spec/openapi/swagger/ark/v1/types.openapi.json b/api-spec/openapi/swagger/ark/v1/types.openapi.json index 382760334..93a644e5e 100644 --- a/api-spec/openapi/swagger/ark/v1/types.openapi.json +++ b/api-spec/openapi/swagger/ark/v1/types.openapi.json @@ -149,6 +149,22 @@ } } }, + "IntentDisruptedEvent": { + "title": "IntentDisruptedEvent", + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "round id" + }, + "intentId": { + "type": "string" + }, + "reason": { + "type": "string" + } + } + }, "IntentFeeInfo": { "title": "IntentFeeInfo", "type": "object", diff --git a/api-spec/protobuf/ark/v1/service.proto b/api-spec/protobuf/ark/v1/service.proto index 31a7f179f..bf10576c8 100644 --- a/api-spec/protobuf/ark/v1/service.proto +++ b/api-spec/protobuf/ark/v1/service.proto @@ -264,6 +264,7 @@ message GetEventStreamResponse { TreeNoncesEvent tree_nonces = 9; Heartbeat heartbeat = 10; StreamStartedEvent stream_started = 11; + IntentDisruptedEvent intent_disrupted = 12; } } diff --git a/api-spec/protobuf/ark/v1/types.proto b/api-spec/protobuf/ark/v1/types.proto index 01596acaa..5d7aabdf2 100644 --- a/api-spec/protobuf/ark/v1/types.proto +++ b/api-spec/protobuf/ark/v1/types.proto @@ -114,6 +114,12 @@ message BatchFailedEvent { string reason = 2; } +message IntentDisruptedEvent { + string id = 1; // round id + string intent_id = 2; + string reason = 3; +} + message TreeSigningStartedEvent { string id = 1; repeated string cosigners_pubkeys = 2; diff --git a/api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.go b/api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.go index 3ee797b0f..6a67b4ec6 100644 --- a/api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.go +++ b/api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.go @@ -125,7 +125,7 @@ func request_IndexerService_GetConnectors_0(ctx context.Context, marshaler gatew var ( query_params_IndexerService_GetVtxoTree_0 = gateway.QueryParameterParseOptions{ - Filter: trie.New("vout", "batch_outpoint.txid", "batch_outpoint.vout", "txid"), + Filter: trie.New("batch_outpoint.txid", "batch_outpoint.vout", "txid", "vout"), } ) @@ -173,7 +173,7 @@ func request_IndexerService_GetVtxoTree_0(ctx context.Context, marshaler gateway var ( query_params_IndexerService_GetVtxoTreeLeaves_0 = gateway.QueryParameterParseOptions{ - Filter: trie.New("batch_outpoint.vout", "batch_outpoint.txid", "txid", "vout"), + Filter: trie.New("batch_outpoint.txid", "batch_outpoint.vout", "txid", "vout"), } ) diff --git a/api-spec/protobuf/gen/ark/v1/service.pb.go b/api-spec/protobuf/gen/ark/v1/service.pb.go index 03c074882..51b9e4635 100644 --- a/api-spec/protobuf/gen/ark/v1/service.pb.go +++ b/api-spec/protobuf/gen/ark/v1/service.pb.go @@ -943,6 +943,7 @@ type GetEventStreamResponse struct { // *GetEventStreamResponse_TreeNonces // *GetEventStreamResponse_Heartbeat // *GetEventStreamResponse_StreamStarted + // *GetEventStreamResponse_IntentDisrupted Event isGetEventStreamResponse_Event `protobuf_oneof:"event"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -1084,6 +1085,15 @@ func (x *GetEventStreamResponse) GetStreamStarted() *StreamStartedEvent { return nil } +func (x *GetEventStreamResponse) GetIntentDisrupted() *IntentDisruptedEvent { + if x != nil { + if x, ok := x.Event.(*GetEventStreamResponse_IntentDisrupted); ok { + return x.IntentDisrupted + } + } + return nil +} + type isGetEventStreamResponse_Event interface { isGetEventStreamResponse_Event() } @@ -1132,6 +1142,10 @@ type GetEventStreamResponse_StreamStarted struct { StreamStarted *StreamStartedEvent `protobuf:"bytes,11,opt,name=stream_started,json=streamStarted,proto3,oneof"` } +type GetEventStreamResponse_IntentDisrupted struct { + IntentDisrupted *IntentDisruptedEvent `protobuf:"bytes,12,opt,name=intent_disrupted,json=intentDisrupted,proto3,oneof"` +} + func (*GetEventStreamResponse_BatchStarted) isGetEventStreamResponse_Event() {} func (*GetEventStreamResponse_BatchFinalization) isGetEventStreamResponse_Event() {} @@ -1154,6 +1168,8 @@ func (*GetEventStreamResponse_Heartbeat) isGetEventStreamResponse_Event() {} func (*GetEventStreamResponse_StreamStarted) isGetEventStreamResponse_Event() {} +func (*GetEventStreamResponse_IntentDisrupted) isGetEventStreamResponse_Event() {} + type ModifyTopics struct { state protoimpl.MessageState `protogen:"open.v1"` AddTopics []string `protobuf:"bytes,1,rep,name=add_topics,json=addTopics,proto3" json:"add_topics,omitempty"` @@ -2069,7 +2085,7 @@ const file_ark_v1_service_proto_rawDesc = "" + "\x14signed_commitment_tx\x18\x02 \x01(\tR\x12signedCommitmentTx\" \n" + "\x1eSubmitSignedForfeitTxsResponse\"/\n" + "\x15GetEventStreamRequest\x12\x16\n" + - "\x06topics\x18\x01 \x03(\tR\x06topics\"\x94\x06\n" + + "\x06topics\x18\x01 \x03(\tR\x06topics\"\xdf\x06\n" + "\x16GetEventStreamResponse\x12@\n" + "\rbatch_started\x18\x01 \x01(\v2\x19.ark.v1.BatchStartedEventH\x00R\fbatchStarted\x12O\n" + "\x12batch_finalization\x18\x02 \x01(\v2\x1e.ark.v1.BatchFinalizationEventH\x00R\x11batchFinalization\x12F\n" + @@ -2083,7 +2099,8 @@ const file_ark_v1_service_proto_rawDesc = "" + "treeNonces\x121\n" + "\theartbeat\x18\n" + " \x01(\v2\x11.ark.v1.HeartbeatH\x00R\theartbeat\x12C\n" + - "\x0estream_started\x18\v \x01(\v2\x1a.ark.v1.StreamStartedEventH\x00R\rstreamStartedB\a\n" + + "\x0estream_started\x18\v \x01(\v2\x1a.ark.v1.StreamStartedEventH\x00R\rstreamStarted\x12I\n" + + "\x10intent_disrupted\x18\f \x01(\v2\x1c.ark.v1.IntentDisruptedEventH\x00R\x0fintentDisruptedB\a\n" + "\x05event\"R\n" + "\fModifyTopics\x12\x1d\n" + "\n" + @@ -2222,8 +2239,9 @@ var file_ark_v1_service_proto_goTypes = []any{ (*TreeNoncesEvent)(nil), // 47: ark.v1.TreeNoncesEvent (*Heartbeat)(nil), // 48: ark.v1.Heartbeat (*StreamStartedEvent)(nil), // 49: ark.v1.StreamStartedEvent - (*PendingTx)(nil), // 50: ark.v1.PendingTx - (*TxNotification)(nil), // 51: ark.v1.TxNotification + (*IntentDisruptedEvent)(nil), // 50: ark.v1.IntentDisruptedEvent + (*PendingTx)(nil), // 51: ark.v1.PendingTx + (*TxNotification)(nil), // 52: ark.v1.TxNotification } var file_ark_v1_service_proto_depIdxs = []int32{ 35, // 0: ark.v1.GetInfoResponse.fees:type_name -> ark.v1.FeeInfo @@ -2246,52 +2264,53 @@ var file_ark_v1_service_proto_depIdxs = []int32{ 47, // 17: ark.v1.GetEventStreamResponse.tree_nonces:type_name -> ark.v1.TreeNoncesEvent 48, // 18: ark.v1.GetEventStreamResponse.heartbeat:type_name -> ark.v1.Heartbeat 49, // 19: ark.v1.GetEventStreamResponse.stream_started:type_name -> ark.v1.StreamStartedEvent - 18, // 20: ark.v1.UpdateStreamTopicsRequest.modify:type_name -> ark.v1.ModifyTopics - 19, // 21: ark.v1.UpdateStreamTopicsRequest.overwrite:type_name -> ark.v1.OverwriteTopics - 38, // 22: ark.v1.GetPendingTxRequest.intent:type_name -> ark.v1.Intent - 50, // 23: ark.v1.GetPendingTxResponse.pending_txs:type_name -> ark.v1.PendingTx - 51, // 24: ark.v1.GetTransactionsStreamResponse.commitment_tx:type_name -> ark.v1.TxNotification - 51, // 25: ark.v1.GetTransactionsStreamResponse.ark_tx:type_name -> ark.v1.TxNotification - 48, // 26: ark.v1.GetTransactionsStreamResponse.heartbeat:type_name -> ark.v1.Heartbeat - 51, // 27: ark.v1.GetTransactionsStreamResponse.sweep_tx:type_name -> ark.v1.TxNotification - 38, // 28: ark.v1.GetIntentRequest.intent:type_name -> ark.v1.Intent - 38, // 29: ark.v1.GetIntentResponse.intent:type_name -> ark.v1.Intent - 38, // 30: ark.v1.GetIntentResponse.intents:type_name -> ark.v1.Intent - 0, // 31: ark.v1.ArkService.GetInfo:input_type -> ark.v1.GetInfoRequest - 2, // 32: ark.v1.ArkService.RegisterIntent:input_type -> ark.v1.RegisterIntentRequest - 4, // 33: ark.v1.ArkService.EstimateIntentFee:input_type -> ark.v1.EstimateIntentFeeRequest - 6, // 34: ark.v1.ArkService.DeleteIntent:input_type -> ark.v1.DeleteIntentRequest - 8, // 35: ark.v1.ArkService.ConfirmRegistration:input_type -> ark.v1.ConfirmRegistrationRequest - 10, // 36: ark.v1.ArkService.SubmitTreeNonces:input_type -> ark.v1.SubmitTreeNoncesRequest - 12, // 37: ark.v1.ArkService.SubmitTreeSignatures:input_type -> ark.v1.SubmitTreeSignaturesRequest - 14, // 38: ark.v1.ArkService.SubmitSignedForfeitTxs:input_type -> ark.v1.SubmitSignedForfeitTxsRequest - 16, // 39: ark.v1.ArkService.GetEventStream:input_type -> ark.v1.GetEventStreamRequest - 20, // 40: ark.v1.ArkService.UpdateStreamTopics:input_type -> ark.v1.UpdateStreamTopicsRequest - 22, // 41: ark.v1.ArkService.SubmitTx:input_type -> ark.v1.SubmitTxRequest - 24, // 42: ark.v1.ArkService.FinalizeTx:input_type -> ark.v1.FinalizeTxRequest - 26, // 43: ark.v1.ArkService.GetPendingTx:input_type -> ark.v1.GetPendingTxRequest - 28, // 44: ark.v1.ArkService.GetTransactionsStream:input_type -> ark.v1.GetTransactionsStreamRequest - 30, // 45: ark.v1.ArkService.GetIntent:input_type -> ark.v1.GetIntentRequest - 1, // 46: ark.v1.ArkService.GetInfo:output_type -> ark.v1.GetInfoResponse - 3, // 47: ark.v1.ArkService.RegisterIntent:output_type -> ark.v1.RegisterIntentResponse - 5, // 48: ark.v1.ArkService.EstimateIntentFee:output_type -> ark.v1.EstimateIntentFeeResponse - 7, // 49: ark.v1.ArkService.DeleteIntent:output_type -> ark.v1.DeleteIntentResponse - 9, // 50: ark.v1.ArkService.ConfirmRegistration:output_type -> ark.v1.ConfirmRegistrationResponse - 11, // 51: ark.v1.ArkService.SubmitTreeNonces:output_type -> ark.v1.SubmitTreeNoncesResponse - 13, // 52: ark.v1.ArkService.SubmitTreeSignatures:output_type -> ark.v1.SubmitTreeSignaturesResponse - 15, // 53: ark.v1.ArkService.SubmitSignedForfeitTxs:output_type -> ark.v1.SubmitSignedForfeitTxsResponse - 17, // 54: ark.v1.ArkService.GetEventStream:output_type -> ark.v1.GetEventStreamResponse - 21, // 55: ark.v1.ArkService.UpdateStreamTopics:output_type -> ark.v1.UpdateStreamTopicsResponse - 23, // 56: ark.v1.ArkService.SubmitTx:output_type -> ark.v1.SubmitTxResponse - 25, // 57: ark.v1.ArkService.FinalizeTx:output_type -> ark.v1.FinalizeTxResponse - 27, // 58: ark.v1.ArkService.GetPendingTx:output_type -> ark.v1.GetPendingTxResponse - 29, // 59: ark.v1.ArkService.GetTransactionsStream:output_type -> ark.v1.GetTransactionsStreamResponse - 31, // 60: ark.v1.ArkService.GetIntent:output_type -> ark.v1.GetIntentResponse - 46, // [46:61] is the sub-list for method output_type - 31, // [31:46] is the sub-list for method input_type - 31, // [31:31] is the sub-list for extension type_name - 31, // [31:31] is the sub-list for extension extendee - 0, // [0:31] is the sub-list for field type_name + 50, // 20: ark.v1.GetEventStreamResponse.intent_disrupted:type_name -> ark.v1.IntentDisruptedEvent + 18, // 21: ark.v1.UpdateStreamTopicsRequest.modify:type_name -> ark.v1.ModifyTopics + 19, // 22: ark.v1.UpdateStreamTopicsRequest.overwrite:type_name -> ark.v1.OverwriteTopics + 38, // 23: ark.v1.GetPendingTxRequest.intent:type_name -> ark.v1.Intent + 51, // 24: ark.v1.GetPendingTxResponse.pending_txs:type_name -> ark.v1.PendingTx + 52, // 25: ark.v1.GetTransactionsStreamResponse.commitment_tx:type_name -> ark.v1.TxNotification + 52, // 26: ark.v1.GetTransactionsStreamResponse.ark_tx:type_name -> ark.v1.TxNotification + 48, // 27: ark.v1.GetTransactionsStreamResponse.heartbeat:type_name -> ark.v1.Heartbeat + 52, // 28: ark.v1.GetTransactionsStreamResponse.sweep_tx:type_name -> ark.v1.TxNotification + 38, // 29: ark.v1.GetIntentRequest.intent:type_name -> ark.v1.Intent + 38, // 30: ark.v1.GetIntentResponse.intent:type_name -> ark.v1.Intent + 38, // 31: ark.v1.GetIntentResponse.intents:type_name -> ark.v1.Intent + 0, // 32: ark.v1.ArkService.GetInfo:input_type -> ark.v1.GetInfoRequest + 2, // 33: ark.v1.ArkService.RegisterIntent:input_type -> ark.v1.RegisterIntentRequest + 4, // 34: ark.v1.ArkService.EstimateIntentFee:input_type -> ark.v1.EstimateIntentFeeRequest + 6, // 35: ark.v1.ArkService.DeleteIntent:input_type -> ark.v1.DeleteIntentRequest + 8, // 36: ark.v1.ArkService.ConfirmRegistration:input_type -> ark.v1.ConfirmRegistrationRequest + 10, // 37: ark.v1.ArkService.SubmitTreeNonces:input_type -> ark.v1.SubmitTreeNoncesRequest + 12, // 38: ark.v1.ArkService.SubmitTreeSignatures:input_type -> ark.v1.SubmitTreeSignaturesRequest + 14, // 39: ark.v1.ArkService.SubmitSignedForfeitTxs:input_type -> ark.v1.SubmitSignedForfeitTxsRequest + 16, // 40: ark.v1.ArkService.GetEventStream:input_type -> ark.v1.GetEventStreamRequest + 20, // 41: ark.v1.ArkService.UpdateStreamTopics:input_type -> ark.v1.UpdateStreamTopicsRequest + 22, // 42: ark.v1.ArkService.SubmitTx:input_type -> ark.v1.SubmitTxRequest + 24, // 43: ark.v1.ArkService.FinalizeTx:input_type -> ark.v1.FinalizeTxRequest + 26, // 44: ark.v1.ArkService.GetPendingTx:input_type -> ark.v1.GetPendingTxRequest + 28, // 45: ark.v1.ArkService.GetTransactionsStream:input_type -> ark.v1.GetTransactionsStreamRequest + 30, // 46: ark.v1.ArkService.GetIntent:input_type -> ark.v1.GetIntentRequest + 1, // 47: ark.v1.ArkService.GetInfo:output_type -> ark.v1.GetInfoResponse + 3, // 48: ark.v1.ArkService.RegisterIntent:output_type -> ark.v1.RegisterIntentResponse + 5, // 49: ark.v1.ArkService.EstimateIntentFee:output_type -> ark.v1.EstimateIntentFeeResponse + 7, // 50: ark.v1.ArkService.DeleteIntent:output_type -> ark.v1.DeleteIntentResponse + 9, // 51: ark.v1.ArkService.ConfirmRegistration:output_type -> ark.v1.ConfirmRegistrationResponse + 11, // 52: ark.v1.ArkService.SubmitTreeNonces:output_type -> ark.v1.SubmitTreeNoncesResponse + 13, // 53: ark.v1.ArkService.SubmitTreeSignatures:output_type -> ark.v1.SubmitTreeSignaturesResponse + 15, // 54: ark.v1.ArkService.SubmitSignedForfeitTxs:output_type -> ark.v1.SubmitSignedForfeitTxsResponse + 17, // 55: ark.v1.ArkService.GetEventStream:output_type -> ark.v1.GetEventStreamResponse + 21, // 56: ark.v1.ArkService.UpdateStreamTopics:output_type -> ark.v1.UpdateStreamTopicsResponse + 23, // 57: ark.v1.ArkService.SubmitTx:output_type -> ark.v1.SubmitTxResponse + 25, // 58: ark.v1.ArkService.FinalizeTx:output_type -> ark.v1.FinalizeTxResponse + 27, // 59: ark.v1.ArkService.GetPendingTx:output_type -> ark.v1.GetPendingTxResponse + 29, // 60: ark.v1.ArkService.GetTransactionsStream:output_type -> ark.v1.GetTransactionsStreamResponse + 31, // 61: ark.v1.ArkService.GetIntent:output_type -> ark.v1.GetIntentResponse + 47, // [47:62] is the sub-list for method output_type + 32, // [32:47] is the sub-list for method input_type + 32, // [32:32] is the sub-list for extension type_name + 32, // [32:32] is the sub-list for extension extendee + 0, // [0:32] is the sub-list for field type_name } func init() { file_ark_v1_service_proto_init() } @@ -2312,6 +2331,7 @@ func file_ark_v1_service_proto_init() { (*GetEventStreamResponse_TreeNonces)(nil), (*GetEventStreamResponse_Heartbeat)(nil), (*GetEventStreamResponse_StreamStarted)(nil), + (*GetEventStreamResponse_IntentDisrupted)(nil), } file_ark_v1_service_proto_msgTypes[20].OneofWrappers = []any{ (*UpdateStreamTopicsRequest_Modify)(nil), diff --git a/api-spec/protobuf/gen/ark/v1/types.pb.go b/api-spec/protobuf/gen/ark/v1/types.pb.go index f1a2c2615..f0091d770 100644 --- a/api-spec/protobuf/gen/ark/v1/types.pb.go +++ b/api-spec/protobuf/gen/ark/v1/types.pb.go @@ -1082,6 +1082,66 @@ func (x *BatchFailedEvent) GetReason() string { return "" } +type IntentDisruptedEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // round id + IntentId string `protobuf:"bytes,2,opt,name=intent_id,json=intentId,proto3" json:"intent_id,omitempty"` + Reason string `protobuf:"bytes,3,opt,name=reason,proto3" json:"reason,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *IntentDisruptedEvent) Reset() { + *x = IntentDisruptedEvent{} + mi := &file_ark_v1_types_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *IntentDisruptedEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IntentDisruptedEvent) ProtoMessage() {} + +func (x *IntentDisruptedEvent) ProtoReflect() protoreflect.Message { + mi := &file_ark_v1_types_proto_msgTypes[17] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IntentDisruptedEvent.ProtoReflect.Descriptor instead. +func (*IntentDisruptedEvent) Descriptor() ([]byte, []int) { + return file_ark_v1_types_proto_rawDescGZIP(), []int{17} +} + +func (x *IntentDisruptedEvent) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *IntentDisruptedEvent) GetIntentId() string { + if x != nil { + return x.IntentId + } + return "" +} + +func (x *IntentDisruptedEvent) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + type TreeSigningStartedEvent struct { state protoimpl.MessageState `protogen:"open.v1"` Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` @@ -1093,7 +1153,7 @@ type TreeSigningStartedEvent struct { func (x *TreeSigningStartedEvent) Reset() { *x = TreeSigningStartedEvent{} - mi := &file_ark_v1_types_proto_msgTypes[17] + mi := &file_ark_v1_types_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1105,7 +1165,7 @@ func (x *TreeSigningStartedEvent) String() string { func (*TreeSigningStartedEvent) ProtoMessage() {} func (x *TreeSigningStartedEvent) ProtoReflect() protoreflect.Message { - mi := &file_ark_v1_types_proto_msgTypes[17] + mi := &file_ark_v1_types_proto_msgTypes[18] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1118,7 +1178,7 @@ func (x *TreeSigningStartedEvent) ProtoReflect() protoreflect.Message { // Deprecated: Use TreeSigningStartedEvent.ProtoReflect.Descriptor instead. func (*TreeSigningStartedEvent) Descriptor() ([]byte, []int) { - return file_ark_v1_types_proto_rawDescGZIP(), []int{17} + return file_ark_v1_types_proto_rawDescGZIP(), []int{18} } func (x *TreeSigningStartedEvent) GetId() string { @@ -1152,7 +1212,7 @@ type TreeNoncesAggregatedEvent struct { func (x *TreeNoncesAggregatedEvent) Reset() { *x = TreeNoncesAggregatedEvent{} - mi := &file_ark_v1_types_proto_msgTypes[18] + mi := &file_ark_v1_types_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1164,7 +1224,7 @@ func (x *TreeNoncesAggregatedEvent) String() string { func (*TreeNoncesAggregatedEvent) ProtoMessage() {} func (x *TreeNoncesAggregatedEvent) ProtoReflect() protoreflect.Message { - mi := &file_ark_v1_types_proto_msgTypes[18] + mi := &file_ark_v1_types_proto_msgTypes[19] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1177,7 +1237,7 @@ func (x *TreeNoncesAggregatedEvent) ProtoReflect() protoreflect.Message { // Deprecated: Use TreeNoncesAggregatedEvent.ProtoReflect.Descriptor instead. func (*TreeNoncesAggregatedEvent) Descriptor() ([]byte, []int) { - return file_ark_v1_types_proto_rawDescGZIP(), []int{18} + return file_ark_v1_types_proto_rawDescGZIP(), []int{19} } func (x *TreeNoncesAggregatedEvent) GetId() string { @@ -1206,7 +1266,7 @@ type TreeNoncesEvent struct { func (x *TreeNoncesEvent) Reset() { *x = TreeNoncesEvent{} - mi := &file_ark_v1_types_proto_msgTypes[19] + mi := &file_ark_v1_types_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1218,7 +1278,7 @@ func (x *TreeNoncesEvent) String() string { func (*TreeNoncesEvent) ProtoMessage() {} func (x *TreeNoncesEvent) ProtoReflect() protoreflect.Message { - mi := &file_ark_v1_types_proto_msgTypes[19] + mi := &file_ark_v1_types_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1231,7 +1291,7 @@ func (x *TreeNoncesEvent) ProtoReflect() protoreflect.Message { // Deprecated: Use TreeNoncesEvent.ProtoReflect.Descriptor instead. func (*TreeNoncesEvent) Descriptor() ([]byte, []int) { - return file_ark_v1_types_proto_rawDescGZIP(), []int{19} + return file_ark_v1_types_proto_rawDescGZIP(), []int{20} } func (x *TreeNoncesEvent) GetId() string { @@ -1276,7 +1336,7 @@ type TreeTxEvent struct { func (x *TreeTxEvent) Reset() { *x = TreeTxEvent{} - mi := &file_ark_v1_types_proto_msgTypes[20] + mi := &file_ark_v1_types_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1288,7 +1348,7 @@ func (x *TreeTxEvent) String() string { func (*TreeTxEvent) ProtoMessage() {} func (x *TreeTxEvent) ProtoReflect() protoreflect.Message { - mi := &file_ark_v1_types_proto_msgTypes[20] + mi := &file_ark_v1_types_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1301,7 +1361,7 @@ func (x *TreeTxEvent) ProtoReflect() protoreflect.Message { // Deprecated: Use TreeTxEvent.ProtoReflect.Descriptor instead. func (*TreeTxEvent) Descriptor() ([]byte, []int) { - return file_ark_v1_types_proto_rawDescGZIP(), []int{20} + return file_ark_v1_types_proto_rawDescGZIP(), []int{21} } func (x *TreeTxEvent) GetId() string { @@ -1359,7 +1419,7 @@ type TreeSignatureEvent struct { func (x *TreeSignatureEvent) Reset() { *x = TreeSignatureEvent{} - mi := &file_ark_v1_types_proto_msgTypes[21] + mi := &file_ark_v1_types_proto_msgTypes[22] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1371,7 +1431,7 @@ func (x *TreeSignatureEvent) String() string { func (*TreeSignatureEvent) ProtoMessage() {} func (x *TreeSignatureEvent) ProtoReflect() protoreflect.Message { - mi := &file_ark_v1_types_proto_msgTypes[21] + mi := &file_ark_v1_types_proto_msgTypes[22] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1384,7 +1444,7 @@ func (x *TreeSignatureEvent) ProtoReflect() protoreflect.Message { // Deprecated: Use TreeSignatureEvent.ProtoReflect.Descriptor instead. func (*TreeSignatureEvent) Descriptor() ([]byte, []int) { - return file_ark_v1_types_proto_rawDescGZIP(), []int{21} + return file_ark_v1_types_proto_rawDescGZIP(), []int{22} } func (x *TreeSignatureEvent) GetId() string { @@ -1430,7 +1490,7 @@ type Heartbeat struct { func (x *Heartbeat) Reset() { *x = Heartbeat{} - mi := &file_ark_v1_types_proto_msgTypes[22] + mi := &file_ark_v1_types_proto_msgTypes[23] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1442,7 +1502,7 @@ func (x *Heartbeat) String() string { func (*Heartbeat) ProtoMessage() {} func (x *Heartbeat) ProtoReflect() protoreflect.Message { - mi := &file_ark_v1_types_proto_msgTypes[22] + mi := &file_ark_v1_types_proto_msgTypes[23] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1455,7 +1515,7 @@ func (x *Heartbeat) ProtoReflect() protoreflect.Message { // Deprecated: Use Heartbeat.ProtoReflect.Descriptor instead. func (*Heartbeat) Descriptor() ([]byte, []int) { - return file_ark_v1_types_proto_rawDescGZIP(), []int{22} + return file_ark_v1_types_proto_rawDescGZIP(), []int{23} } type StreamStartedEvent struct { @@ -1467,7 +1527,7 @@ type StreamStartedEvent struct { func (x *StreamStartedEvent) Reset() { *x = StreamStartedEvent{} - mi := &file_ark_v1_types_proto_msgTypes[23] + mi := &file_ark_v1_types_proto_msgTypes[24] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1479,7 +1539,7 @@ func (x *StreamStartedEvent) String() string { func (*StreamStartedEvent) ProtoMessage() {} func (x *StreamStartedEvent) ProtoReflect() protoreflect.Message { - mi := &file_ark_v1_types_proto_msgTypes[23] + mi := &file_ark_v1_types_proto_msgTypes[24] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1492,7 +1552,7 @@ func (x *StreamStartedEvent) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamStartedEvent.ProtoReflect.Descriptor instead. func (*StreamStartedEvent) Descriptor() ([]byte, []int) { - return file_ark_v1_types_proto_rawDescGZIP(), []int{23} + return file_ark_v1_types_proto_rawDescGZIP(), []int{24} } func (x *StreamStartedEvent) GetId() string { @@ -1514,7 +1574,7 @@ type ErrorDetails struct { func (x *ErrorDetails) Reset() { *x = ErrorDetails{} - mi := &file_ark_v1_types_proto_msgTypes[24] + mi := &file_ark_v1_types_proto_msgTypes[25] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1526,7 +1586,7 @@ func (x *ErrorDetails) String() string { func (*ErrorDetails) ProtoMessage() {} func (x *ErrorDetails) ProtoReflect() protoreflect.Message { - mi := &file_ark_v1_types_proto_msgTypes[24] + mi := &file_ark_v1_types_proto_msgTypes[25] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1539,7 +1599,7 @@ func (x *ErrorDetails) ProtoReflect() protoreflect.Message { // Deprecated: Use ErrorDetails.ProtoReflect.Descriptor instead. func (*ErrorDetails) Descriptor() ([]byte, []int) { - return file_ark_v1_types_proto_rawDescGZIP(), []int{24} + return file_ark_v1_types_proto_rawDescGZIP(), []int{25} } func (x *ErrorDetails) GetCode() int32 { @@ -1661,7 +1721,11 @@ const file_ark_v1_types_proto_rawDesc = "" + "\x0fcommitment_txid\x18\x02 \x01(\tR\x0ecommitmentTxid\":\n" + "\x10BatchFailedEvent\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12\x16\n" + - "\x06reason\x18\x02 \x01(\tR\x06reason\"\x8c\x01\n" + + "\x06reason\x18\x02 \x01(\tR\x06reason\"[\n" + + "\x14IntentDisruptedEvent\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x1b\n" + + "\tintent_id\x18\x02 \x01(\tR\bintentId\x12\x16\n" + + "\x06reason\x18\x03 \x01(\tR\x06reason\"\x8c\x01\n" + "\x17TreeSigningStartedEvent\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12+\n" + "\x11cosigners_pubkeys\x18\x02 \x03(\tR\x10cosignersPubkeys\x124\n" + @@ -1726,7 +1790,7 @@ func file_ark_v1_types_proto_rawDescGZIP() []byte { return file_ark_v1_types_proto_rawDescData } -var file_ark_v1_types_proto_msgTypes = make([]protoimpl.MessageInfo, 30) +var file_ark_v1_types_proto_msgTypes = make([]protoimpl.MessageInfo, 31) var file_ark_v1_types_proto_goTypes = []any{ (*Outpoint)(nil), // 0: ark.v1.Outpoint (*Input)(nil), // 1: ark.v1.Input @@ -1745,19 +1809,20 @@ var file_ark_v1_types_proto_goTypes = []any{ (*BatchFinalizationEvent)(nil), // 14: ark.v1.BatchFinalizationEvent (*BatchFinalizedEvent)(nil), // 15: ark.v1.BatchFinalizedEvent (*BatchFailedEvent)(nil), // 16: ark.v1.BatchFailedEvent - (*TreeSigningStartedEvent)(nil), // 17: ark.v1.TreeSigningStartedEvent - (*TreeNoncesAggregatedEvent)(nil), // 18: ark.v1.TreeNoncesAggregatedEvent - (*TreeNoncesEvent)(nil), // 19: ark.v1.TreeNoncesEvent - (*TreeTxEvent)(nil), // 20: ark.v1.TreeTxEvent - (*TreeSignatureEvent)(nil), // 21: ark.v1.TreeSignatureEvent - (*Heartbeat)(nil), // 22: ark.v1.Heartbeat - (*StreamStartedEvent)(nil), // 23: ark.v1.StreamStartedEvent - (*ErrorDetails)(nil), // 24: ark.v1.ErrorDetails - nil, // 25: ark.v1.TxNotification.CheckpointTxsEntry - nil, // 26: ark.v1.TreeNoncesAggregatedEvent.TreeNoncesEntry - nil, // 27: ark.v1.TreeNoncesEvent.NoncesEntry - nil, // 28: ark.v1.TreeTxEvent.ChildrenEntry - nil, // 29: ark.v1.ErrorDetails.MetadataEntry + (*IntentDisruptedEvent)(nil), // 17: ark.v1.IntentDisruptedEvent + (*TreeSigningStartedEvent)(nil), // 18: ark.v1.TreeSigningStartedEvent + (*TreeNoncesAggregatedEvent)(nil), // 19: ark.v1.TreeNoncesAggregatedEvent + (*TreeNoncesEvent)(nil), // 20: ark.v1.TreeNoncesEvent + (*TreeTxEvent)(nil), // 21: ark.v1.TreeTxEvent + (*TreeSignatureEvent)(nil), // 22: ark.v1.TreeSignatureEvent + (*Heartbeat)(nil), // 23: ark.v1.Heartbeat + (*StreamStartedEvent)(nil), // 24: ark.v1.StreamStartedEvent + (*ErrorDetails)(nil), // 25: ark.v1.ErrorDetails + nil, // 26: ark.v1.TxNotification.CheckpointTxsEntry + nil, // 27: ark.v1.TreeNoncesAggregatedEvent.TreeNoncesEntry + nil, // 28: ark.v1.TreeNoncesEvent.NoncesEntry + nil, // 29: ark.v1.TreeTxEvent.ChildrenEntry + nil, // 30: ark.v1.ErrorDetails.MetadataEntry } var file_ark_v1_types_proto_depIdxs = []int32{ 0, // 0: ark.v1.Input.outpoint:type_name -> ark.v1.Outpoint @@ -1766,14 +1831,14 @@ var file_ark_v1_types_proto_depIdxs = []int32{ 3, // 3: ark.v1.Vtxo.assets:type_name -> ark.v1.Asset 2, // 4: ark.v1.TxNotification.spent_vtxos:type_name -> ark.v1.Vtxo 2, // 5: ark.v1.TxNotification.spendable_vtxos:type_name -> ark.v1.Vtxo - 25, // 6: ark.v1.TxNotification.checkpoint_txs:type_name -> ark.v1.TxNotification.CheckpointTxsEntry + 26, // 6: ark.v1.TxNotification.checkpoint_txs:type_name -> ark.v1.TxNotification.CheckpointTxsEntry 0, // 7: ark.v1.TxNotification.swept_vtxos:type_name -> ark.v1.Outpoint 9, // 8: ark.v1.ScheduledSession.fees:type_name -> ark.v1.FeeInfo 10, // 9: ark.v1.FeeInfo.intent_fee:type_name -> ark.v1.IntentFeeInfo - 26, // 10: ark.v1.TreeNoncesAggregatedEvent.tree_nonces:type_name -> ark.v1.TreeNoncesAggregatedEvent.TreeNoncesEntry - 27, // 11: ark.v1.TreeNoncesEvent.nonces:type_name -> ark.v1.TreeNoncesEvent.NoncesEntry - 28, // 12: ark.v1.TreeTxEvent.children:type_name -> ark.v1.TreeTxEvent.ChildrenEntry - 29, // 13: ark.v1.ErrorDetails.metadata:type_name -> ark.v1.ErrorDetails.MetadataEntry + 27, // 10: ark.v1.TreeNoncesAggregatedEvent.tree_nonces:type_name -> ark.v1.TreeNoncesAggregatedEvent.TreeNoncesEntry + 28, // 11: ark.v1.TreeNoncesEvent.nonces:type_name -> ark.v1.TreeNoncesEvent.NoncesEntry + 29, // 12: ark.v1.TreeTxEvent.children:type_name -> ark.v1.TreeTxEvent.ChildrenEntry + 30, // 13: ark.v1.ErrorDetails.metadata:type_name -> ark.v1.ErrorDetails.MetadataEntry 4, // 14: ark.v1.TxNotification.CheckpointTxsEntry.value:type_name -> ark.v1.TxData 15, // [15:15] is the sub-list for method output_type 15, // [15:15] is the sub-list for method input_type @@ -1793,7 +1858,7 @@ func file_ark_v1_types_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_ark_v1_types_proto_rawDesc), len(file_ark_v1_types_proto_rawDesc)), NumEnums: 0, - NumMessages: 30, + NumMessages: 31, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/core/application/ban.go b/internal/core/application/ban.go index 5fb3a7ee2..7b108c575 100644 --- a/internal/core/application/ban.go +++ b/internal/core/application/ban.go @@ -52,6 +52,7 @@ func (s *service) banCosignerInputs( registeredIntents []ports.TimedIntent, ) { convictions := make([]domain.Conviction, 0) + notifiedIntents := make(map[string]struct{}) for cosignerPublicKey, crime := range toBan { for _, intent := range registeredIntents { @@ -92,6 +93,21 @@ func (s *service) banCosignerInputs( domain.NewScriptConviction(script, crime, &s.banDuration), ) } + if _, already := notifiedIntents[intent.Id]; !already { + notifiedIntents[intent.Id] = struct{}{} + + topics := intentTopics(intent) + s.eventsCh <- []domain.Event{IntentDisrupted{ + RoundEvent: domain.RoundEvent{ + Id: crime.RoundID, + Type: domain.EventTypeIntentDisrupted, + }, + IntentId: intent.Id, + Reason: crime.Reason, + Topic: topics, + }} + + } } } @@ -148,7 +164,7 @@ func (s *service) banSignaturesCollectionTimeout( } func (s *service) banForfeitCollectionTimeout( - ctx context.Context, roundId string, + ctx context.Context, roundId string, registeredIntents []ports.TimedIntent, ) { unsignedVtxoKeys, err := s.cache.ForfeitTxs().GetUnsignedInputs(ctx) if err != nil { @@ -184,4 +200,42 @@ func (s *service) banForfeitCollectionTimeout( if err := s.repoManager.Convictions().Add(ctx, convictions...); err != nil { log.WithError(err).Warn("failed to ban vtxos") } + // build a set of unsigned outpoints for fast lookup + unsignedOutpoints := make(map[string]struct{}) + for _, vtxo := range vtxos { + unsignedOutpoints[vtxo.Outpoint.String()] = struct{}{} + } + + // find which intents own those unsigned vtxos and notify them + for _, intent := range registeredIntents { + for _, input := range intent.Inputs { + if _, found := unsignedOutpoints[input.Outpoint.String()]; !found { + continue + } + + // this intent has at least one unsigned vtxo — it caused the failure + s.eventsCh <- []domain.Event{IntentDisrupted{ + RoundEvent: domain.RoundEvent{ + Id: roundId, + Type: domain.EventTypeIntentDisrupted, + }, + IntentId: intent.Id, + Reason: "missing forfeit signature", + Topic: intentTopics(intent), + }} + break + } + } + +} + +func intentTopics(intent ports.TimedIntent) []string { + topics := make([]string, 0, len(intent.Inputs)+len(intent.BoardingInputs)) + for _, input := range intent.Inputs { + topics = append(topics, input.Outpoint.String()) + } + for _, boardingInput := range intent.BoardingInputs { + topics = append(topics, boardingInput.String()) + } + return topics } diff --git a/internal/core/application/service.go b/internal/core/application/service.go index 281b30c72..62faf8727 100644 --- a/internal/core/application/service.go +++ b/internal/core/application/service.go @@ -3158,7 +3158,11 @@ func (s *service) finalizeRound(roundId string, roundTiming roundTiming) { return } if !allForfeitTxsSigned { - go s.banForfeitCollectionTimeout(ctx, roundId) + registeredIntents, err := s.cache.Intents().GetSelectedIntents(ctx) + if err != nil { + log.WithError(err).Warn("failed to get selected intents for forfeit ban") + } + go s.banForfeitCollectionTimeout(ctx, roundId, registeredIntents) changes = round.Fail(errors.INTERNAL_ERROR.New("missing forfeit transactions")) return diff --git a/internal/core/application/service_event.go b/internal/core/application/service_event.go index a0e03264b..a12774297 100644 --- a/internal/core/application/service_event.go +++ b/internal/core/application/service_event.go @@ -63,6 +63,13 @@ type RoundFailed struct { Topic []string } +type IntentDisrupted struct { + domain.RoundEvent + IntentId string + Reason string + Topic []string +} + // implement domain.RoundEvent interface func (r RoundSigningStarted) GetTopic() string { return domain.RoundTopic } func (r TreeNoncesAggregated) GetTopic() string { return domain.RoundTopic } diff --git a/internal/core/domain/events_repo.go b/internal/core/domain/events_repo.go index 5f57a64fe..2104ee3d2 100644 --- a/internal/core/domain/events_repo.go +++ b/internal/core/domain/events_repo.go @@ -14,6 +14,7 @@ const ( EventTypeRoundFinalized EventTypeRoundFailed EventTypeBatchSwept + EventTypeIntentDisrupted ) const ( diff --git a/internal/interface/grpc/handlers/arkservice.go b/internal/interface/grpc/handlers/arkservice.go index e4e9a4996..34360873c 100644 --- a/internal/interface/grpc/handlers/arkservice.go +++ b/internal/interface/grpc/handlers/arkservice.go @@ -523,6 +523,18 @@ func (h *handler) listenToEvents() { }, } + evs = append(evs, eventWithTopics{event: ev, topics: e.Topic}) + case application.IntentDisrupted: + ev := &arkv1.GetEventStreamResponse{ + Event: &arkv1.GetEventStreamResponse_IntentDisrupted{ + IntentDisrupted: &arkv1.IntentDisruptedEvent{ + Id: e.Id, + IntentId: e.IntentId, + Reason: e.Reason, + }, + }, + } + evs = append(evs, eventWithTopics{event: ev, topics: e.Topic}) case application.BatchStarted: hashes := make([]string, 0, len(e.IntentIdsHashes)) diff --git a/internal/test/e2e/delegate_utils_test.go b/internal/test/e2e/delegate_utils_test.go index afe56ac10..49eb89274 100644 --- a/internal/test/e2e/delegate_utils_test.go +++ b/internal/test/e2e/delegate_utils_test.go @@ -74,6 +74,11 @@ func (h *delegateBatchEventsHandler) OnBatchFailed( return nil } +func (h *delegateBatchEventsHandler) OnIntentDisrupted(ctx context.Context, event client.IntentDisruptedEvent, +) error { + return fmt.Errorf("intent disrupted: %s", event.Reason) +} + func (h *delegateBatchEventsHandler) OnTreeTxEvent( ctx context.Context, event client.TreeTxEvent, ) error { @@ -229,6 +234,7 @@ type customBatchEventsHandler struct { onBatchFinalization func(ctx context.Context, event client.BatchFinalizationEvent, vtxoTree *tree.TxTree, connectorTree *tree.TxTree) ([]string, error) onBatchFinalized func(ctx context.Context, event client.BatchFinalizedEvent) error onBatchFailed func(ctx context.Context, event client.BatchFailedEvent) error + onIntentDisrupted func(ctx context.Context, event client.IntentDisruptedEvent) error onTreeTxEvent func(ctx context.Context, event client.TreeTxEvent) error onTreeSignatureEvent func(ctx context.Context, event client.TreeSignatureEvent) error onTreeSigningStarted func(ctx context.Context, event client.TreeSigningStartedEvent, vtxoTree *tree.TxTree) (bool, error) @@ -287,6 +293,16 @@ func (h *customBatchEventsHandler) OnBatchFailed( return errors.New(event.Reason) } +func (h *customBatchEventsHandler) OnIntentDisrupted( + ctx context.Context, + event client.IntentDisruptedEvent, +) error { + if h.onIntentDisrupted != nil { + return h.onIntentDisrupted(ctx, event) + } + return fmt.Errorf("intent disrupted: %s", event.Reason) +} + func (h *customBatchEventsHandler) OnTreeTxEvent( ctx context.Context, event client.TreeTxEvent, diff --git a/internal/test/e2e/e2e_test.go b/internal/test/e2e/e2e_test.go index 9fa5d6ae2..1a1f6008f 100644 --- a/internal/test/e2e/e2e_test.go +++ b/internal/test/e2e/e2e_test.go @@ -3996,6 +3996,7 @@ func TestBan(t *testing.T) { require.NoError(t, err) defer close() + var disruptedEvent *client.IntentDisruptedEvent handlers := &customBatchEventsHandler{ onBatchStarted: func(ctx context.Context, event client.BatchStartedEvent) (bool, time.Duration, error) { buf := sha256.Sum256([]byte(intentId)) @@ -4011,10 +4012,19 @@ func TestBan(t *testing.T) { onTreeSigningStarted: func(ctx context.Context, event client.TreeSigningStartedEvent, vtxoTree *tree.TxTree) (bool, error) { return true, nil // just skip, do not submit nonces }, + onBatchFailed: func(ctx context.Context, event client.BatchFailedEvent) error { + return nil // keep listening, IntentDisrupted arrives shortly after + }, + onIntentDisrupted: func(ctx context.Context, event client.IntentDisruptedEvent) error { + disruptedEvent = &event + return fmt.Errorf("intent disrupted: %s", event.Reason) + }, } _, _, _, _, _, err = arksdk.JoinBatchSession(t.Context(), stream, handlers) require.Error(t, err) + require.NotNil(t, disruptedEvent) + require.Equal(t, intentId, disruptedEvent.IntentId) // next settle should fail because the nonce has not been submitted _, err = alice.Settle(t.Context()) @@ -4073,6 +4083,7 @@ func TestBan(t *testing.T) { defer close() var batchExpiry arklib.RelativeLocktime + var disruptedEvent *client.IntentDisruptedEvent handlers := &customBatchEventsHandler{ onBatchStarted: func(ctx context.Context, event client.BatchStartedEvent) (bool, time.Duration, error) { buf := sha256.Sum256([]byte(intentId)) @@ -4148,10 +4159,19 @@ func TestBan(t *testing.T) { onTreeNoncesAggregated: func(ctx context.Context, event client.TreeNoncesAggregatedEvent) (bool, error) { return false, nil // skip sending signatures }, + onBatchFailed: func(ctx context.Context, event client.BatchFailedEvent) error { + return nil // keep listening, IntentDisrupted arrives shortly after + }, + onIntentDisrupted: func(ctx context.Context, event client.IntentDisruptedEvent) error { + disruptedEvent = &event + return fmt.Errorf("intent disrupted: %s", event.Reason) + }, } _, _, _, _, _, err = arksdk.JoinBatchSession(t.Context(), stream, handlers) require.Error(t, err) + require.NotNil(t, disruptedEvent) + require.Equal(t, intentId, disruptedEvent.IntentId) // next settle should fail because the signature has not been submitted _, err = alice.Settle(t.Context()) @@ -4208,6 +4228,7 @@ func TestBan(t *testing.T) { require.NoError(t, err) defer close() + var disruptedEvent *client.IntentDisruptedEvent handlers := &customBatchEventsHandler{ onBatchStarted: func(ctx context.Context, event client.BatchStartedEvent) (bool, time.Duration, error) { buf := sha256.Sum256([]byte(intentId)) @@ -4280,10 +4301,19 @@ func TestBan(t *testing.T) { ) return err == nil, err }, + onBatchFailed: func(ctx context.Context, event client.BatchFailedEvent) error { + return nil // keep listening, IntentDisrupted arrives shortly after + }, + onIntentDisrupted: func(ctx context.Context, event client.IntentDisruptedEvent) error { + disruptedEvent = &event + return fmt.Errorf("intent disrupted: %s", event.Reason) + }, } _, _, _, _, _, err = arksdk.JoinBatchSession(t.Context(), stream, handlers) require.Error(t, err) + require.NotNil(t, disruptedEvent) + require.Equal(t, intentId, disruptedEvent.IntentId) // next settle should fail because the signature was invalid _, err = alice.Settle(t.Context()) @@ -4341,6 +4371,7 @@ func TestBan(t *testing.T) { defer close() var batchExpiry arklib.RelativeLocktime + var disruptedEvent *client.IntentDisruptedEvent handlers := &customBatchEventsHandler{ onBatchStarted: func(ctx context.Context, event client.BatchStartedEvent) (bool, time.Duration, error) { buf := sha256.Sum256([]byte(intentId)) @@ -4432,10 +4463,19 @@ func TestBan(t *testing.T) { onBatchFinalization: func(ctx context.Context, event client.BatchFinalizationEvent, vtxoTree, connectorTree *tree.TxTree) ([]string, error) { return nil, nil // do not submit forfeit txs }, + onBatchFailed: func(ctx context.Context, event client.BatchFailedEvent) error { + return nil // keep listening, IntentDisrupted arrives shortly after + }, + onIntentDisrupted: func(ctx context.Context, event client.IntentDisruptedEvent) error { + disruptedEvent = &event + return fmt.Errorf("intent disrupted: %s", event.Reason) + }, } _, _, _, _, _, err = arksdk.JoinBatchSession(t.Context(), stream, handlers) require.Error(t, err) + require.NotNil(t, disruptedEvent) + require.Equal(t, intentId, disruptedEvent.IntentId) // next settle should fail because the forfeit txs have not been submitted _, err = alice.Settle(t.Context()) @@ -4495,6 +4535,7 @@ func TestBan(t *testing.T) { info, err := grpcAlice.GetInfo(t.Context()) require.NoError(t, err) var batchExpiry arklib.RelativeLocktime + var disruptedEvent *client.IntentDisruptedEvent handlers := &customBatchEventsHandler{ onBatchStarted: func(ctx context.Context, event client.BatchStartedEvent) (bool, time.Duration, error) { @@ -4638,10 +4679,19 @@ func TestBan(t *testing.T) { } return []string{signedForfeitTx}, nil }, + onBatchFailed: func(ctx context.Context, event client.BatchFailedEvent) error { + return nil // keep listening, IntentDisrupted arrives shortly after + }, + onIntentDisrupted: func(ctx context.Context, event client.IntentDisruptedEvent) error { + disruptedEvent = &event + return fmt.Errorf("intent disrupted: %s", event.Reason) + }, } _, _, _, _, _, err = arksdk.JoinBatchSession(t.Context(), stream, handlers) require.Error(t, err) + require.NotNil(t, disruptedEvent) + require.Equal(t, intentId, disruptedEvent.IntentId) // next settle should fail because the forfeit txs have not been submitted _, err = alice.Settle(t.Context()) @@ -4716,6 +4766,7 @@ func TestBan(t *testing.T) { defer close() var batchExpiry arklib.RelativeLocktime + var disruptedEvent *client.IntentDisruptedEvent handlers := &customBatchEventsHandler{ onBatchStarted: func(ctx context.Context, event client.BatchStartedEvent) (bool, time.Duration, error) { buf := sha256.Sum256([]byte(intentId)) @@ -4834,15 +4885,190 @@ func TestBan(t *testing.T) { } return []string{signedCommitmentTx}, nil }, + onBatchFailed: func(ctx context.Context, event client.BatchFailedEvent) error { + return nil // keep listening, IntentDisrupted arrives shortly after + }, + onIntentDisrupted: func(ctx context.Context, event client.IntentDisruptedEvent) error { + disruptedEvent = &event + return fmt.Errorf("intent disrupted: %s", event.Reason) + }, } _, _, _, _, _, err = arksdk.JoinBatchSession(t.Context(), stream, handlers) require.Error(t, err) + require.NotNil(t, disruptedEvent) + require.Equal(t, intentId, disruptedEvent.IntentId) // next settle should fail because the forfeit txs have not been submitted _, err = alice.Settle(t.Context()) require.Error(t, err) }) + + t.Run("bystander does not receive intent disrupted", func(t *testing.T) { + // Alice misbehaves (skips nonce submission), Bob cooperates. + // IntentDisrupted must reach Alice but NOT Bob. + alice, grpcAlice := setupArkSDKWithTransport(t) + defer alice.Stop() + defer grpcAlice.Close() + + bob, grpcBob := setupArkSDKWithTransport(t) + defer bob.Stop() + defer grpcBob.Close() + + _, aliceAddr, _, err := alice.Receive(t.Context()) + require.NoError(t, err) + faucetOffchain(t, alice, 0.001) + + _, bobAddr, _, err := bob.Receive(t.Context()) + require.NoError(t, err) + faucetOffchain(t, bob, 0.001) + + aliceVtxos, _, err := alice.ListVtxos(t.Context()) + require.NoError(t, err) + require.NotEmpty(t, aliceVtxos) + aliceVtxo := aliceVtxos[0] + + bobVtxos, _, err := bob.ListVtxos(t.Context()) + require.NoError(t, err) + require.NotEmpty(t, bobVtxos) + bobVtxo := bobVtxos[0] + + // Alice's signer session — she will skip nonce submission to trigger a ban. + secKey, err := btcec.NewPrivateKey() + require.NoError(t, err) + signerSession := tree.NewTreeSignerSession(secKey) + + // Bob's signer session — he cooperates and properly submits nonces. + bobSecKey, err := btcec.NewPrivateKey() + require.NoError(t, err) + bobSignerSession := tree.NewTreeSignerSession(bobSecKey) + var bobBatchExpiry arklib.RelativeLocktime + + aliceIntentId, err := alice.RegisterIntent( + t.Context(), + []types.Vtxo{aliceVtxo}, + []types.Utxo{}, + nil, + []types.Receiver{{Amount: aliceVtxo.Amount, To: aliceAddr.Address}}, + []string{signerSession.GetPublicKey()}, + ) + require.NoError(t, err) + + bobIntentId, err := bob.RegisterIntent( + t.Context(), + []types.Vtxo{bobVtxo}, + []types.Utxo{}, + nil, + []types.Receiver{{Amount: bobVtxo.Amount, To: bobAddr.Address}}, + []string{bobSignerSession.GetPublicKey()}, + ) + require.NoError(t, err) + + aliceTopics := arksdk.GetEventStreamTopics( + []types.Outpoint{aliceVtxo.Outpoint}, []tree.SignerSession{signerSession}, + ) + aliceStream, aliceClose, err := grpcAlice.GetEventStream(t.Context(), aliceTopics) + require.NoError(t, err) + defer aliceClose() + + bobTopics := arksdk.GetEventStreamTopics( + []types.Outpoint{bobVtxo.Outpoint}, []tree.SignerSession{bobSignerSession}, + ) + bobStream, bobClose, err := grpcBob.GetEventStream(t.Context(), bobTopics) + require.NoError(t, err) + defer bobClose() + + var aliceDisruptedEvent *client.IntentDisruptedEvent + var bobDisruptedEvent *client.IntentDisruptedEvent + + wg := sync.WaitGroup{} + wg.Add(2) + + var aliceErr, bobErr error + + go func() { + defer wg.Done() + handlers := &customBatchEventsHandler{ + onBatchStarted: func(ctx context.Context, event client.BatchStartedEvent) (bool, time.Duration, error) { + buf := sha256.Sum256([]byte(aliceIntentId)) + hashedIntentId := hex.EncodeToString(buf[:]) + if slices.Contains(event.HashedIntentIds, hashedIntentId) { + err := grpcAlice.ConfirmRegistration(ctx, aliceIntentId) + return false, time.Duration(event.BatchExpiry) * time.Second, err + } + return true, -1, nil + }, + // onTreeSigningStarted not set: default returns (false, nil), skipping nonce submission + onBatchFailed: func(ctx context.Context, event client.BatchFailedEvent) error { + return nil // keep waiting for IntentDisrupted + }, + onIntentDisrupted: func(ctx context.Context, event client.IntentDisruptedEvent) error { + aliceDisruptedEvent = &event + return fmt.Errorf("intent disrupted: %s", event.Reason) + }, + } + _, _, _, _, _, aliceErr = arksdk.JoinBatchSession(t.Context(), aliceStream, handlers) + }() + + go func() { + defer wg.Done() + handlers := &customBatchEventsHandler{ + onBatchStarted: func(ctx context.Context, event client.BatchStartedEvent) (bool, time.Duration, error) { + buf := sha256.Sum256([]byte(bobIntentId)) + hashedIntentId := hex.EncodeToString(buf[:]) + if slices.Contains(event.HashedIntentIds, hashedIntentId) { + err := grpcBob.ConfirmRegistration(ctx, bobIntentId) + bobBatchExpiry = getBatchExpiryLocktime(uint32(event.BatchExpiry)) + return false, time.Duration(event.BatchExpiry) * time.Second, err + } + return true, -1, nil + }, + onTreeSigningStarted: func(ctx context.Context, event client.TreeSigningStartedEvent, vtxoTree *tree.TxTree) (bool, error) { + if !slices.Contains(event.CosignersPubkeys, bobSignerSession.GetPublicKey()) { + return true, nil + } + sweepClosure := script.CSVMultisigClosure{ + MultisigClosure: script.MultisigClosure{PubKeys: []*btcec.PublicKey{bobSecKey.PubKey()}}, + Locktime: bobBatchExpiry, + } + sweepScript, err := sweepClosure.Script() + if err != nil { + return false, err + } + commitmentTx, err := psbt.NewFromRawBytes(strings.NewReader(event.UnsignedCommitmentTx), true) + if err != nil { + return false, err + } + batchOutputAmount := commitmentTx.UnsignedTx.TxOut[0].Value + sweepTapLeaf := txscript.NewBaseTapLeaf(sweepScript) + sweepTapTree := txscript.AssembleTaprootScriptTree(sweepTapLeaf) + root := sweepTapTree.RootNode.TapHash() + if err := bobSignerSession.Init(root.CloneBytes(), batchOutputAmount, vtxoTree); err != nil { + return false, err + } + nonces, err := bobSignerSession.GetNonces() + if err != nil { + return false, err + } + return false, grpcBob.SubmitTreeNonces(ctx, event.Id, bobSignerSession.GetPublicKey(), nonces) + }, + onIntentDisrupted: func(ctx context.Context, event client.IntentDisruptedEvent) error { + bobDisruptedEvent = &event + return fmt.Errorf("intent disrupted: %s", event.Reason) + }, + } + _, _, _, _, _, bobErr = arksdk.JoinBatchSession(t.Context(), bobStream, handlers) + }() + + wg.Wait() + + require.Error(t, aliceErr) + require.NotNil(t, aliceDisruptedEvent) + require.Equal(t, aliceIntentId, aliceDisruptedEvent.IntentId) + + require.Nil(t, bobDisruptedEvent) + _ = bobErr + }) } // TestFee tests the fee calculation for the onboarding and settlement of the funds. diff --git a/pkg/client-lib/batch_session_handler.go b/pkg/client-lib/batch_session_handler.go index 2e8fea8bd..68863a4c5 100644 --- a/pkg/client-lib/batch_session_handler.go +++ b/pkg/client-lib/batch_session_handler.go @@ -55,6 +55,7 @@ type BatchEventsHandler interface { ) (bool, time.Duration, error) OnBatchFinalized(ctx context.Context, event client.BatchFinalizedEvent) error OnBatchFailed(ctx context.Context, event client.BatchFailedEvent) error + OnIntentDisrupted(ctx context.Context, event client.IntentDisruptedEvent) error OnTreeTxEvent(ctx context.Context, event client.TreeTxEvent) error OnTreeSignatureEvent(ctx context.Context, event client.TreeSignatureEvent) error OnTreeSigningStarted( @@ -179,6 +180,13 @@ func JoinBatchSession( return "", "", -1, nil, nil, err } continue + case client.IntentDisruptedEvent: + e := event.(client.IntentDisruptedEvent) + err := eventsHandler.OnIntentDisrupted(ctx, e) + if err == nil { + err = fmt.Errorf("intent disrupted: %s", e.Reason) + } + return "", "", -1, nil, nil, err // we received a tree tx event msg, let's update the vtxo/connector tree. case client.TreeTxEvent: if step != batchStarted && step != treeNoncesAggregated { @@ -436,6 +444,10 @@ func (h *defaultBatchEventsHandler) OnBatchFailed( return fmt.Errorf("batch failed: %s", event.Reason) } +func (h *defaultBatchEventsHandler) OnIntentDisrupted(ctx context.Context, event client.IntentDisruptedEvent) error { + return fmt.Errorf("intent disrupted: %s", event.Reason) +} + func (h *defaultBatchEventsHandler) OnTreeTxEvent( ctx context.Context, event client.TreeTxEvent, ) error { diff --git a/pkg/client-lib/client/client.go b/pkg/client-lib/client/client.go index 1741bfba4..49988cb5e 100644 --- a/pkg/client-lib/client/client.go +++ b/pkg/client-lib/client/client.go @@ -109,6 +109,12 @@ type BatchFailedEvent struct { Reason string } +type IntentDisruptedEvent struct { + Id string + IntentId string + Reason string +} + type TreeSigningStartedEvent struct { Id string UnsignedCommitmentTx string diff --git a/pkg/client-lib/client/grpc/types.go b/pkg/client-lib/client/grpc/types.go index 1f2da84ca..065d848f5 100644 --- a/pkg/client-lib/client/grpc/types.go +++ b/pkg/client-lib/client/grpc/types.go @@ -14,6 +14,7 @@ import ( // wrapper for GetEventStreamResponse type eventResponse interface { GetBatchFailed() *arkv1.BatchFailedEvent + GetIntentDisrupted() *arkv1.IntentDisruptedEvent GetBatchStarted() *arkv1.BatchStartedEvent GetBatchFinalization() *arkv1.BatchFinalizationEvent GetBatchFinalized() *arkv1.BatchFinalizedEvent @@ -42,6 +43,14 @@ func (e event) toBatchEvent() (any, error) { }, nil } + if ee := e.GetIntentDisrupted(); ee != nil { + return client.IntentDisruptedEvent{ + Id: ee.GetId(), + IntentId: ee.GetIntentId(), + Reason: ee.GetReason(), + }, nil + } + if ee := e.GetBatchStarted(); ee != nil { return client.BatchStartedEvent{ Id: ee.GetId(),