diff --git a/core/src/main/java/org/lflang/federated/extensions/CExtension.java b/core/src/main/java/org/lflang/federated/extensions/CExtension.java index 257ba1b323..5b3806b54e 100644 --- a/core/src/main/java/org/lflang/federated/extensions/CExtension.java +++ b/core/src/main/java/org/lflang/federated/extensions/CExtension.java @@ -489,11 +489,12 @@ public String getNetworkBufferType() { /** Put the C preamble in a `include/_federate.name + _preamble.h` file. */ protected final void writePreambleFile( FederateInstance federate, + List allFederates, FederationFileConfig fileConfig, RtiConfig rtiConfig, MessageReporter messageReporter) throws IOException { - String cPreamble = makePreamble(federate, rtiConfig, messageReporter); + String cPreamble = makePreamble(federate, allFederates, rtiConfig, messageReporter); String relPath = getPreamblePath(federate); Path fedPreamblePath = fileConfig.getSrcPath().resolve(relPath); Files.createDirectories(fedPreamblePath.getParent()); @@ -509,11 +510,12 @@ protected final void writePreambleFile( @Override public String generatePreamble( FederateInstance federate, + List allFederates, FederationFileConfig fileConfig, RtiConfig rtiConfig, MessageReporter messageReporter) throws IOException { - writePreambleFile(federate, fileConfig, rtiConfig, messageReporter); + writePreambleFile(federate, allFederates, fileConfig, rtiConfig, messageReporter); var includes = new CodeBuilder(); includes.pr( """ @@ -539,7 +541,10 @@ public String generatePreamble( /** Generate the preamble to setup federated execution in C. */ protected String makePreamble( - FederateInstance federate, RtiConfig rtiConfig, MessageReporter messageReporter) { + FederateInstance federate, + List allFederates, + RtiConfig rtiConfig, + MessageReporter messageReporter) { var code = new CodeBuilder(); @@ -609,7 +614,7 @@ protected String makePreamble( """ .formatted(numOfSTAAOffsets))); - code.pr(generateExecutablePreamble(federate, rtiConfig, messageReporter)); + code.pr(generateExecutablePreamble(federate, allFederates, rtiConfig, messageReporter)); code.pr(generateSTAAInitialization(federate)); @@ -663,12 +668,15 @@ private String generateInitializeTriggers( /** Generate code for an executed preamble. */ private String generateExecutablePreamble( - FederateInstance federate, RtiConfig rtiConfig, MessageReporter messageReporter) { + FederateInstance federate, + List allFederates, + RtiConfig rtiConfig, + MessageReporter messageReporter) { CodeBuilder code = new CodeBuilder(); code.pr(generateCodeForPhysicalActions(federate, messageReporter)); - code.pr(generateCodeToInitializeFederate(federate, rtiConfig, messageReporter)); + code.pr(generateCodeToInitializeFederate(federate, allFederates, rtiConfig, messageReporter)); return """ void _lf_executable_preamble(environment_t* env) { %s @@ -698,7 +706,10 @@ void staa_initialization() { * @return The generated code */ private String generateCodeToInitializeFederate( - FederateInstance federate, RtiConfig rtiConfig, MessageReporter messageReporter) { + FederateInstance federate, + List allFederates, + RtiConfig rtiConfig, + MessageReporter messageReporter) { CodeBuilder code = new CodeBuilder(); code.pr("// ***** Start initializing the federated execution. */"); code.pr( @@ -758,18 +769,30 @@ else if (globalSTP instanceof CodeExprImpl) // thread without requiring a mutex lock. var numberOfInboundConnections = federate.inboundP2PConnections.size(); var numberOfOutboundConnections = federate.outboundP2PConnections.size(); + var numberOfInboundConnectionsToTransients = + (int) federate.inboundP2PConnections.stream().filter(f -> f.isTransient).count(); + var numberOfOutboundConnectionsToTransients = + (int) federate.outboundP2PConnections.stream().filter(f -> f.isTransient).count(); code.pr( String.join( "\n", "_fed.number_of_inbound_p2p_connections = " + numberOfInboundConnections + ";", - "_fed.number_of_outbound_p2p_connections = " + numberOfOutboundConnections + ";")); + "_fed.number_of_outbound_p2p_connections = " + numberOfOutboundConnections + ";", + "_fed.number_of_inbound_p2p_transients = " + + numberOfInboundConnectionsToTransients + + ";", + "_fed.number_of_outbound_p2p_transients = " + + numberOfOutboundConnectionsToTransients + + ";")); + code.pr( String.join( "\n", "// Initialize the array of network abstractions for incoming connections to -1.", "for (int i = 0; i < NUMBER_OF_FEDERATES; i++) {", " _fed.net_for_inbound_p2p_connections[i] = NULL;", + " _fed.inbound_p2p_connection_is_transient[i] = false;", "}")); code.pr( String.join( @@ -778,6 +801,7 @@ else if (globalSTP instanceof CodeExprImpl) "for (int i = 0; i < NUMBER_OF_FEDERATES; i++) {", " _fed.net_for_outbound_p2p_connections[i] = NULL;", "}")); + var clockSyncOptions = federate.targetConfig.getOrDefault(ClockSyncOptionsProperty.INSTANCE); // If a test clock offset has been specified, insert code to set it here. if (clockSyncOptions.testOffset != null) { @@ -819,13 +843,21 @@ else if (globalSTP instanceof CodeExprImpl) "// This is done in a separate thread because this thread will call", "// lf_connect_to_federate for each outbound connection at the same", "// time that the new thread is listening for such connections for inbound", - "// connections. The thread will live until all connections have been established.", + "// connections. The thread will live until all connections have been established,", + "// or if outbound connections are transient, until the end of the execution.", "lf_thread_create(&_fed.inbound_p2p_handling_thread_id," + " lf_handle_p2p_connections_from_federates, env);")); } for (FederateInstance remoteFederate : federate.outboundP2PConnections) { - code.pr("lf_connect_to_federate(" + remoteFederate.id + ");"); + code.pr( + "lf_connect_to_federate(" + remoteFederate.id + ", " + remoteFederate.isTransient + ");"); + code.pr( + "_fed.outbound_p2p_connection_is_transient[" + + remoteFederate.id + + "] = " + + remoteFederate.isTransient + + ";"); } return code.getCode(); } diff --git a/core/src/main/java/org/lflang/federated/extensions/FedTargetExtension.java b/core/src/main/java/org/lflang/federated/extensions/FedTargetExtension.java index 770ddf6a97..bb2e6cd28d 100644 --- a/core/src/main/java/org/lflang/federated/extensions/FedTargetExtension.java +++ b/core/src/main/java/org/lflang/federated/extensions/FedTargetExtension.java @@ -115,12 +115,14 @@ default void annotateReaction(Reaction reaction) {} * Add preamble to the source to set up federated execution. * * @param federate The federate to which the generated setup code will correspond. + * @param allFederates The list of all federates in the federation. * @param fileConfig The federation file configuration. * @param rtiConfig The settings of the RTI. * @param messageReporter Used to report errors and warnings. */ String generatePreamble( FederateInstance federate, + List allFederates, FederationFileConfig fileConfig, RtiConfig rtiConfig, MessageReporter messageReporter) diff --git a/core/src/main/java/org/lflang/federated/extensions/PythonExtension.java b/core/src/main/java/org/lflang/federated/extensions/PythonExtension.java index 0371ea8794..9cfa7a0256 100644 --- a/core/src/main/java/org/lflang/federated/extensions/PythonExtension.java +++ b/core/src/main/java/org/lflang/federated/extensions/PythonExtension.java @@ -1,6 +1,7 @@ package org.lflang.federated.extensions; import java.io.IOException; +import java.util.List; import org.lflang.InferredType; import org.lflang.MessageReporter; import org.lflang.ast.ASTUtils; @@ -261,11 +262,12 @@ public void annotateReaction(Reaction reaction) { @Override public String generatePreamble( FederateInstance federate, + List allFederates, FederationFileConfig fileConfig, RtiConfig rtiConfig, MessageReporter messageReporter) throws IOException { - writePreambleFile(federate, fileConfig, rtiConfig, messageReporter); + writePreambleFile(federate, allFederates, fileConfig, rtiConfig, messageReporter); return ""; } } diff --git a/core/src/main/java/org/lflang/federated/extensions/TSExtension.java b/core/src/main/java/org/lflang/federated/extensions/TSExtension.java index c98b6852c3..e19f026c3e 100644 --- a/core/src/main/java/org/lflang/federated/extensions/TSExtension.java +++ b/core/src/main/java/org/lflang/federated/extensions/TSExtension.java @@ -162,6 +162,7 @@ public String getNetworkBufferType() { @Override public String generatePreamble( FederateInstance federate, + List allFederates, FederationFileConfig fileConfig, RtiConfig rtiConfig, MessageReporter messageReporter) { diff --git a/core/src/main/java/org/lflang/federated/generator/FedEmitter.java b/core/src/main/java/org/lflang/federated/generator/FedEmitter.java index cb03fb83b9..8f59909b84 100644 --- a/core/src/main/java/org/lflang/federated/generator/FedEmitter.java +++ b/core/src/main/java/org/lflang/federated/generator/FedEmitter.java @@ -37,7 +37,10 @@ public FedEmitter( /** Generate a .lf file for federate `federate`. */ Map generateFederate( - LFGeneratorContext context, FederateInstance federate, List federateNames) + LFGeneratorContext context, + FederateInstance federate, + List allFederates, + List federateNames) throws IOException { String fedName = federate.name; Files.createDirectories(fileConfig.getSrcPath()); @@ -57,7 +60,7 @@ Map generateFederate( context, federateNames, federate, fileConfig, messageReporter, rtiConfig), new FedImportEmitter().generateImports(federate, fileConfig), new FedPreambleEmitter() - .generatePreamble(federate, fileConfig, rtiConfig, messageReporter), + .generatePreamble(federate, allFederates, fileConfig, rtiConfig, messageReporter), new FedReactorEmitter().generateReactorDefinitions(federate), new FedMainEmitter() .generateMainReactor(federate, originalMainReactor, messageReporter)); diff --git a/core/src/main/java/org/lflang/federated/generator/FedGenerator.java b/core/src/main/java/org/lflang/federated/generator/FedGenerator.java index f9f804969b..1818c3a941 100644 --- a/core/src/main/java/org/lflang/federated/generator/FedGenerator.java +++ b/core/src/main/java/org/lflang/federated/generator/FedGenerator.java @@ -181,6 +181,7 @@ public boolean doGenerate(Resource resource, LFGeneratorContext context) throws fedEmitter.generateFederate( context, federate, + federates, federates.stream().map(fed -> fed.name).collect(Collectors.toList()))); } diff --git a/core/src/main/java/org/lflang/federated/generator/FedPreambleEmitter.java b/core/src/main/java/org/lflang/federated/generator/FedPreambleEmitter.java index 4edf10767c..930f397d84 100644 --- a/core/src/main/java/org/lflang/federated/generator/FedPreambleEmitter.java +++ b/core/src/main/java/org/lflang/federated/generator/FedPreambleEmitter.java @@ -3,6 +3,7 @@ import static org.lflang.ast.ASTUtils.toText; import java.io.IOException; +import java.util.List; import org.lflang.MessageReporter; import org.lflang.ast.ASTUtils; import org.lflang.federated.extensions.FedTargetExtensionFactory; @@ -26,6 +27,7 @@ public FedPreambleEmitter() {} */ String generatePreamble( FederateInstance federate, + List allFederates, FederationFileConfig fileConfig, RtiConfig rtiConfig, MessageReporter messageReporter) @@ -53,7 +55,8 @@ String generatePreamble( =}""" .formatted( FedTargetExtensionFactory.getExtension(federate.targetConfig.target) - .generatePreamble(federate, fileConfig, rtiConfig, messageReporter))); + .generatePreamble( + federate, allFederates, fileConfig, rtiConfig, messageReporter))); return preambleCode.getCode(); } diff --git a/core/src/main/resources/lib/c/reactor-c b/core/src/main/resources/lib/c/reactor-c index 251b1380a9..fd5c1573f9 160000 --- a/core/src/main/resources/lib/c/reactor-c +++ b/core/src/main/resources/lib/c/reactor-c @@ -1 +1 @@ -Subproject commit 251b1380a9ebaa789448f3ade5012c20703c3855 +Subproject commit fd5c1573f9b8ca3672d9b7cbc6f1369f4be90828 diff --git a/test/C/src/federated/transient/DecentralizedTransientDownstreamWithTimer.lf b/test/C/src/federated/transient/DecentralizedTransientDownstreamWithTimer.lf new file mode 100644 index 0000000000..b015537813 --- /dev/null +++ b/test/C/src/federated/transient/DecentralizedTransientDownstreamWithTimer.lf @@ -0,0 +1,159 @@ +/** + * This LF program tests if a transient federate correctly leaves then joins the federation. It also + * tests whether the transient's downstream executes as expected, that is it receives correct TAGs, + * regardless of the transient being absent or present. In this test: + * - the transient federate spontaneously leaves the federation after 2 reactions to input port + * `in`, + * - the downstream of the transient federate has only one transient as upstream. + */ +target C { + timeout: 3 s, + coordination: decentralized +} + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + lf_print("**** Launch command: %s", mid_launch_cmd); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Persistent federate, upstream of the transient. It reacts to its timer by sending increments to + * output port out. + */ +reactor Up(period: time = 500 ms) { + output out: int + timer t(0, period) + state count: int = 0 + + reaction(t) -> out {= + lf_set(out, self->count); + self->count++; + =} +} + +/** + * Transient federate that forwards whatever it receives from `Up` to `Down`. It reacts twice to + * input port `in`, then stops. It will execute twice during the lifetime of the federation. The + * second launch is done by `TransientExec` at logical time 1 s. Each time `Middle` joins, it + * notifies `Down`. + */ +reactor Middle(STP_offset: time = 100 ms) { + input in: int + output out: int + output join: int + state count: int = 0 + + // Middle notifies its downstream that he joined, but make sure first that the effective start + // tag is correct + reaction(startup) -> join {= + tag_t t = lf_tag_start_effective(); + if(t.time < lf_time_start()) { + lf_print_error_and_exit("Fatal error: the transient's effective start time is less than the federation start time"); + } + + lf_set(join, 0); + =} + + // Pass the input value to the output port and stop spontaneously after two reactions to in + reaction(in) -> out {= + self->count++; + lf_set(out, in->value); + + if (self->count == 2) { + lf_stop(); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down(STP_offset: time = 100 ms) { + timer t(0, 500 ms) + + input in: int + input join: int + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + =} + + reaction(join) {= + self->count_join++; + =} + + reaction(in) {= + self->count_in_mid_reactions++; + =} + + reaction(shutdown) {= + // Check that the TAG has been successfully issued to Down + if (self->count_timer < 5) { + lf_print_error_and_exit("Down federate's timer reacted %d times, while it had to react more than %d times.", + self->count_timer, 5); + } + + // Check that `Middle` have joined 2 times + if (self->count_join != 2) { + lf_print_error_and_exit("Transient federate did not join twice, but %d times!", self->count_join); + } + + // Check that `Middle` have reacted correctly + if (self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Transient federate Mid did not execute and pass values from up corretly! Expected >= 4, but had: %d.", + self->count_in_mid_reactions); + } + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up = new Up() + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.out -> down.in +} diff --git a/test/C/src/federated/transient/DecentralizedTransientDownstreamWithTwoUpstream.lf b/test/C/src/federated/transient/DecentralizedTransientDownstreamWithTwoUpstream.lf new file mode 100644 index 0000000000..1be6179192 --- /dev/null +++ b/test/C/src/federated/transient/DecentralizedTransientDownstreamWithTwoUpstream.lf @@ -0,0 +1,129 @@ +/** + * This LF program tests whether a transient federate corretly leaves then joins the federation. It + * also tests whether the transient's downstream executes as expected, that is it received correct + * TAGs, regardless of the transient being absent or present. In this test: + * - the transient federate spontaneously leaves the federation after 2 reactions to input port in, + * - the downstream of the transient federate has one persistent and one transient upstreams. + * + * In addition, the program tests if authentication works in case of a federation with transients, + * by adding `auth` target property. + */ +target C { + timeout: 3 s, + auth: true, + coordination: decentralized +} + +import Up from "DecentralizedTransientDownstreamWithTimer.lf" +import Middle from "DecentralizedTransientDownstreamWithTimer.lf" + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down(STP_offset: time = 100 ms) { + timer t(0, 500 ms) + + input in_mid: int + input in_up: int + input join: int + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + state count_in_up_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + =} + + reaction(join) {= + self->count_join++; + =} + + reaction(in_mid) {= + self->count_in_mid_reactions++; + =} + + reaction(in_up) {= + self->count_in_up_reactions++; + =} + + reaction(shutdown) {= + // Check that the TAG have been successfully issued to Down + if (self->count_timer < 5) { + lf_print_error_and_exit("Federate's timer reacted %d times, while it had to react more than %d times.", + self->count_timer, + 5); + } + if (self->count_in_up_reactions < 7) { + lf_print_error_and_exit("Federate's timer reacted %d times, while it had to react more than %d times.", + self->count_in_up_reactions, + 7); + } + + // Check that Middle have joined 2 times + if (self->count_join != 2) { + lf_print_error_and_exit("Transient federate did not join twice, but %d times!", self->count_join); + } + + // Check that Middle have reacted correctly + if (self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Transient federate Mid did not execute and pass values from up corretly! Expected >= 4, but had: %d.", + self->count_in_mid_reactions); + } + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up1 = new Up() + up2 = new Up(period = 300 msec) + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up1.out -> mid.in + mid.join -> down.join + mid.out -> down.in_mid + up2.out -> down.in_up +} diff --git a/test/C/src/federated/transient/DecentralizedTransientHotSwap.lf b/test/C/src/federated/transient/DecentralizedTransientHotSwap.lf new file mode 100644 index 0000000000..54df3ab170 --- /dev/null +++ b/test/C/src/federated/transient/DecentralizedTransientHotSwap.lf @@ -0,0 +1,98 @@ +/** + * This LF program is a variant of TransientDownstreamWithTimer that tests the Hot Swap mechanism. + * For this, it tests whether the transient's downstream executes as expected and if `mid` is + * stopped and the second instance joins as expected. In this test: + * - the transient federate DOES NOT spontaneously leave the federation. + * - the downstream of the transient federate has only one transient as upstream. + * - A persistent federate `TransientExec` launches `mid` after 1s to activate the hot mechanism + * swap. + */ +target C { + timeout: 3 s, + auth: true, + coordination: decentralized +} + +import Up from "DecentralizedTransientDownstreamWithTimer.lf" +import Down from "DecentralizedTransientDownstreamWithTimer.lf" + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Transient federate that forwards whatever it receives from `Up` to `Down`. It reacts twice to + * input port `in`, then stops. It will execute twice during the lifetime of the federation. The + * second launch is done by `TransientExec` at logical time 1 s. Each time `Middle` joins, it + * notifies `Down`. + */ +reactor Middle(STP_offset: time = 100 ms) { + input in: int + output out: int + output join: int + state count: int = 0 + + // Middle notifies its downstream that he joined, but make sure first that the effective start + // tag is correct + reaction(startup) -> join {= + tag_t t = lf_tag_start_effective(); + if(t.time < lf_time_start()) { + lf_print_error_and_exit("Fatal error: the transient's effective start time is less than the federation start time"); + } + + lf_set(join, 0); + =} + + // Pass the input value to the output port + reaction(in) -> out {= + self->count++; + lf_set(out, in->value); + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up = new Up() + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.out -> down.in +} diff --git a/test/C/src/federated/transient/DecentralizedTransientStatePersistence.lf b/test/C/src/federated/transient/DecentralizedTransientStatePersistence.lf new file mode 100644 index 0000000000..0ab4e11dc0 --- /dev/null +++ b/test/C/src/federated/transient/DecentralizedTransientStatePersistence.lf @@ -0,0 +1,195 @@ +/** + * This LF program showcases and tests the persistance of the internal state of a transient federate + * across executions. Using the hot swap mechanism, the transient federate `Middle` leaves and then + * joins. Whenever the state (of type `federate_state_t`) changes, it notifies `Persistence`. + * `Middle` notifies `Persistence` also when it joins. When `Middle` joins the second time or after, + * it receives the saved state and sets it. In this, the order of the reactions is important. + */ +target C { + timeout: 2900 ms, + coordination: decentralized +} + +preamble {= + #include + #include + // The internal federate state to be persistent across executions + typedef struct federate_state_t { + char state_char; + int state_count; + } federate_state_t; +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate %s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +reactor Persistence(STP_offset: time = 100 ms) { + state middle_state: federate_state_t = {'A', 0} + state middle_first_join: bool = true + + input in_from_middle: federate_state_t + input in_middle_join: bool + output out_to_middle: federate_state_t + + // Only send the previous state if it not the first time Middle joins + reaction(in_middle_join) -> out_to_middle {= + if (!self->middle_first_join) { + lf_set(out_to_middle, self->middle_state); + lf_print("Notifying Mid of the latest state: {%c,%d}", self->middle_state.state_char, + self->middle_state.state_count); + } + self->middle_first_join = false; + =} + + reaction(in_from_middle) {= + self->middle_state.state_char = in_from_middle->value.state_char; + self->middle_state.state_count = in_from_middle->value.state_count; + lf_print("Latest received state: {%c,%d}", self->middle_state.state_char, + self->middle_state.state_count); + =} +} + +/** + * Persistent federate, upstream of the transient. It reacts to its timer by sending increments to + * out output port. + */ +reactor Up(period: time = 500 ms) { + output out: int + timer t(0, period) + state count: int = 0 + + reaction(t) -> out {= + lf_set(out, self->count); + self->count++; + lf_print("Up timer sent %d", self->count); + =} +} + +/** + * Transient federate that forwards whatever it receives from Up to down. It reacts twice to in + * input ports, then stops. It will execute twice during the lifetime of the federation. The second + * launch is done by TransientExec at logical time 1 s. Each time Middle joins, it notifies Down. + */ +reactor Middle(STP_offset: time = 100 ms) { + input in: int + output out: int + output join: bool + state middle_state: federate_state_t = {'A', 0} + + output out_to_persistence: federate_state_t // State Persistence + input in_from_persistence: federate_state_t + + // Middle notifies its downstream that he joined + reaction(startup) -> join {= + lf_set(join, true); + =} + + reaction(in_from_persistence) {= + self->middle_state = in_from_persistence->value; + lf_print("Received the latest state of: {%c,%d} at " PRINTF_TIME ".", + self->middle_state.state_char, + self->middle_state.state_count, + lf_time_logical_elapsed()); + =} + + // When an input is received, the internal state is updated, and then sent to + // Persistance. + reaction(in) -> out, out_to_persistence {= + self->middle_state.state_char++; + self->middle_state.state_count += 2; + lf_set(out, self->middle_state.state_count); + lf_set(out_to_persistence, self->middle_state); + lf_print("Mid state is: {count='%c', count=%d}", + self->middle_state.state_char, + self->middle_state.state_count); + + if (self->middle_state.state_count == 4) { + lf_stop(); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down(STP_offset: time = 100 ms) { + timer t(0, 500 ms) + + input in: int + input join: bool + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + lf_print("Down timer count %d", self->count_timer); + =} + + reaction(join) {= + self->count_join++; + lf_print("Down count join %d", self->count_join); + =} + + reaction(in) {= + self->count_in_mid_reactions++; + lf_print("Down in %d", self->count_in_mid_reactions); + =} + + reaction(shutdown) {= + if(self->count_join == 2 && self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Mid Joined twice, but the state did not persist \ + across executions! state_count is %d, while is should be > then %d.", + self->count_in_mid_reactions, + 4); + } + =} +} + +federated reactor { + // Persistent downstream and upstream federates of the transient + up = new Up() + down = new Down() + persistence = new Persistence() + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.join -> persistence.in_middle_join + mid.out -> down.in + persistence.out_to_middle -> mid.in_from_persistence + mid.out_to_persistence -> persistence.in_from_middle +} diff --git a/test/C/src/federated/transient/DecentralizedTransietWithPhysicalConnection.lf b/test/C/src/federated/transient/DecentralizedTransietWithPhysicalConnection.lf new file mode 100644 index 0000000000..74e34a5f85 --- /dev/null +++ b/test/C/src/federated/transient/DecentralizedTransietWithPhysicalConnection.lf @@ -0,0 +1,168 @@ +/** + * This LF program tests if a transient federate correctly leaves then joins the federation. It also + * tests whether the transient's downstream executes as expected, that is it receives correct TAGs, + * regardless of the transient being absent or present. In this test: + * - the transient federate spontaneously leaves the federation after 2 reactions to input port + * `in`, + * - the downstream of the transient federate has only one transient as upstream. + */ +target C { + timeout: 3 s, + coordination: decentralized +} + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + lf_print("**** Launch command: %s", mid_launch_cmd); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Persistent federate, upstream of the transient. It reacts to its timer by sending increments to + * output port out. + */ +reactor Up(period: time = 500 ms) { + output out: int + output outp: int + timer t(0, period) + state count: int = 0 + + reaction(t) -> out, outp {= + lf_set(out, self->count); + lf_set(outp, self->count); + self->count++; + =} +} + +/** + * Transient federate that forwards whatever it receives from `Up` to `Down`. It reacts twice to + * input port `in`, then stops. It will execute twice during the lifetime of the federation. The + * second launch is done by `TransientExec` at logical time 1 s. Each time `Middle` joins, it + * notifies `Down`. + */ +reactor Middle(STP_offset: time = 100 ms) { + input in: int + input inp: int + output out: int + output join: int + state count: int = 0 + + // Middle notifies its downstream that he joined, but make sure first that the effective start + // tag is correct + reaction(startup) -> join {= + tag_t t = lf_tag_start_effective(); + if(t.time < lf_time_start()) { + lf_print_error_and_exit("Fatal error: the transient's effective start time is less than the federation start time"); + } + + lf_set(join, 0); + =} + + // Pass the input value to the output port and stop spontaneously after two reactions to in + reaction(in) -> out {= + self->count++; + lf_set(out, in->value); + + if (self->count == 2) { + lf_stop(); + } + =} + + reaction(inp) {= + lf_print("Transient federate received value %d on port inp at logical time " PRINTF_TAG ".", + inp->value, lf_tag().time, lf_tag().microstep); + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down(STP_offset: time = 100 ms) { + timer t(0, 500 ms) + + input in: int + input join: int + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + =} + + reaction(join) {= + self->count_join++; + =} + + reaction(in) {= + self->count_in_mid_reactions++; + =} + + reaction(shutdown) {= + // Check that the TAG has been successfully issued to Down + if (self->count_timer < 5) { + lf_print_error_and_exit("Down federate's timer reacted %d times, while it had to react more than %d times.", + self->count_timer, 5); + } + + // Check that `Middle` have joined 2 times + if (self->count_join != 2) { + lf_print_error_and_exit("Transient federate did not join twice, but %d times!", self->count_join); + } + + // Check that `Middle` have reacted correctly + if (self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Transient federate Mid did not execute and pass values from up corretly! Expected >= 4, but had: %d.", + self->count_in_mid_reactions); + } + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up = new Up() + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.out -> down.in + up.outp ~> mid.inp +} diff --git a/test/C/src/federated/transient/TransientStatePersistenceWithPhysicalConnection.lf b/test/C/src/federated/transient/TransientStatePersistenceWithPhysicalConnection.lf new file mode 100644 index 0000000000..a413cfa005 --- /dev/null +++ b/test/C/src/federated/transient/TransientStatePersistenceWithPhysicalConnection.lf @@ -0,0 +1,198 @@ +/** + * This LF program showcases and tests the persistance of the internal state of a transient federate + * across executions. Using the hot swap mechanism, the transient federate `Middle` leaves and then + * joins. Whenever the state (of type `federate_state_t`) changes, it notifies `Persistence`. + * `Middle` notifies `Persistence` also when it joins. When `Middle` joins the second time or after, + * it receives the saved state and sets it. In this, the order of the reactions is important. + */ +target C { + timeout: 2900 ms +} + +preamble {= + #include + #include + // The internal federate state to be persistent across executions + typedef struct federate_state_t { + char state_char; + int state_count; + } federate_state_t; +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(launch_time: time = 0, fed_instance_name: char* = "instance") { + timer t(launch_time, 0) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/bin/federate__%s -i %s", + LF_FED_PACKAGE_DIRECTORY, + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate %s at physical time " PRINTF_TIME ".", + self->fed_instance_name, lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +reactor Persistence { + state middle_state: federate_state_t = {'A', 0} + state middle_first_join: bool = true + + input in_from_middle: federate_state_t + input in_middle_join: bool + output out_to_middle: federate_state_t + + // Only send the previous state if it not the first time Middle joins + reaction(in_middle_join) -> out_to_middle {= + if (!self->middle_first_join) { + lf_set(out_to_middle, self->middle_state); + lf_print("Notifying Mid of the latest state: {%c,%d}", self->middle_state.state_char, + self->middle_state.state_count); + } + self->middle_first_join = false; + =} + + reaction(in_from_middle) {= + self->middle_state.state_char = in_from_middle->value.state_char; + self->middle_state.state_count = in_from_middle->value.state_count; + lf_print("Latest received state: {%c,%d}", self->middle_state.state_char, + self->middle_state.state_count); + =} +} + +/** + * Persistent federate, upstream of the transient. It reacts to its timer by sending increments to + * out output port. + */ +reactor Up(period: time = 500 ms) { + output out: int + timer t(0, period) + state count: int = 0 + + reaction(t) -> out {= + lf_set(out, self->count); + self->count++; + lf_print("Up timer sent %d", self->count); + =} +} + +/** + * Transient federate that forwards whatever it receives from Up to down. It reacts twice to in + * input ports, then stops. It will execute twice during the lifetime of the federation. The second + * launch is done by TransientExec at logical time 1 s. Each time Middle joins, it notifies Down. + */ +reactor Middle { + input in: int + output out: int + output join: bool + output outp: int + state middle_state: federate_state_t = {'A', 0} + + output out_to_persistence: federate_state_t // State Persistence + input in_from_persistence: federate_state_t + + // Middle notifies its downstream that he joined + reaction(startup) -> join {= + lf_set(join, true); + =} + + reaction(in_from_persistence) {= + self->middle_state = in_from_persistence->value; + lf_print("Received the latest state of: {%c,%d} at " PRINTF_TIME ".", + self->middle_state.state_char, + self->middle_state.state_count, + lf_time_logical_elapsed()); + =} + + // When an input is received, the internal state is updated, and then sent to + // Persistance. + reaction(in) -> out, out_to_persistence, outp {= + self->middle_state.state_char++; + self->middle_state.state_count += 2; + lf_set(out, self->middle_state.state_count); + lf_set(out_to_persistence, self->middle_state); + lf_set(outp, self->middle_state.state_count); + lf_print("Mid state is: {count='%c', count=%d}", + self->middle_state.state_char, + self->middle_state.state_count); + + if (self->middle_state.state_count == 4) { + lf_stop(); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down { + timer t(0, 500 ms) + + input in: int + input inp: int + input join: bool + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + lf_print("Down timer count %d", self->count_timer); + =} + + reaction(join) {= + self->count_join++; + lf_print("Down count join %d", self->count_join); + =} + + reaction(in) {= + self->count_in_mid_reactions++; + lf_print("Down in %d", self->count_in_mid_reactions); + =} + + reaction(shutdown) {= + if(self->count_join == 2 && self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Mid Joined twice, but the state did not persist \ + across executions! state_count is %d, while is should be > then %d.", + self->count_in_mid_reactions, + 4); + } + =} +} + +federated reactor { + // Persistent downstream and upstream federates of the transient + up = new Up() + down = new Down() + persistence = new Persistence() + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(launch_time = 1 s, fed_instance_name="mid") + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.join -> persistence.in_middle_join + mid.out -> down.in + mid.outp -> down.inp + persistence.out_to_middle -> mid.in_from_persistence + mid.out_to_persistence -> persistence.in_from_middle +}