diff --git a/testcontainers/pom.xml b/testcontainers/pom.xml index 9669f10c..ef86938b 100644 --- a/testcontainers/pom.xml +++ b/testcontainers/pom.xml @@ -18,7 +18,7 @@ ${project.parent.basedir}/LICENSE_HEADER.txt 17 - 0.6.1 + 5.1.4 @@ -104,13 +104,6 @@ - - - kr.motd.maven - os-maven-plugin - 1.7.0 - - org.jsonschema2pojo @@ -146,25 +139,45 @@ - org.xolstice.maven.plugins + io.github.ascopes protobuf-maven-plugin ${protobuf-plugin.version} - ${project.basedir}/src/test/proto - - com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} - - grpc-java - - io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} - + + ${protobuf.version} + + + + io.grpc + protoc-gen-grpc-java + ${grpc.version} + + + generate-test-tqe3 - test-compile - test-compile-custom + generate-test + + + ${project.basedir}/src/test/proto/tqe3 + + ${project.build.directory}/generated-test-sources/protobuf/tqe3 + + + + generate-test-tqe2 + + generate-test + + + + ${project.basedir}/src/test/proto/tqe2 + + ${project.build.directory}/generated-test-sources/protobuf/tqe2 + diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQEClusterImpl.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/AbstractTQECluster.java similarity index 51% rename from testcontainers/src/main/java/org/testcontainers/containers/tqe/TQEClusterImpl.java rename to testcontainers/src/main/java/org/testcontainers/containers/tqe/AbstractTQECluster.java index bcc85aa9..5015eaaf 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQEClusterImpl.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/AbstractTQECluster.java @@ -10,6 +10,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,20 +20,32 @@ import org.testcontainers.containers.tqe.configuration.TQEConfigurator; import org.testcontainers.lifecycle.Startable; -public class TQEClusterImpl implements TQECluster { +/** + * Abstract base class for TQE cluster implementations that provides common functionality for + * different TQE versions (2.x, 3.x, etc.). + */ +public abstract class AbstractTQECluster implements TQECluster { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTQECluster.class); - private static final Logger LOGGER = LoggerFactory.getLogger(TQEClusterImpl.class); + private static final ExecutorService STARTUP_EXECUTOR = + Executors.newCachedThreadPool( + r -> { + var t = new Thread(r, "tqe-cluster-startup"); + t.setDaemon(true); + return t; + }); - private final TQEConfigurator configurator; + protected final TQEConfigurator configurator; - private boolean isClosed; + private volatile boolean isClosed; - public TQEClusterImpl(TQEConfigurator configurator) { + public AbstractTQECluster(TQEConfigurator configurator) { this.configurator = configurator; } @Override - public synchronized void start() { + public final synchronized void start() { if (this.isClosed) { throw new IllegalStateException("Container is already closed. Please create new container"); } @@ -47,18 +61,43 @@ public synchronized void start() { @Override public synchronized void stop() { - if (this.configurator != null) { - try { - this.configurator.close(); - this.isClosed = true; - } catch (Exception e) { - throw new RuntimeException(e); - } + try { + this.configurator.close(); + this.isClosed = true; + } catch (Exception e) { + throw new RuntimeException(e); } } - private static void startParallel( + /** + * Starts Tarantool cluster nodes and configures the cluster. Default implementation starts all + * queue containers in parallel and then configures the cluster. Subclasses can override to change + * startup order. + */ + protected void startTarantoolCluster() { + startParallel(this.configurator.queue(), this.configurator); + if (!this.configurator.isConfigured()) { + this.configurator.configure(); + } + } + + /** + * Starts gRPC endpoint containers. Default implementation starts all gRPC containers in parallel. + * Subclasses can override to change startup order. + */ + protected void startGrpcEndpoints() { + startParallel(this.configurator.grpc(), this.configurator); + } + + /** + * Common utility method to start containers in parallel. + * + * @param containers map of container names to container instances + * @param configurator the TQE configurator + */ + protected static void startParallel( Map containers, TQEConfigurator configurator) { + final List> futures = new ArrayList<>(containers.size()); final CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); @@ -70,12 +109,18 @@ private static void startParallel( try { c.start(); } catch (Exception e) { - LOGGER.error("Error starting TQE container [container_name='{}']", name, e); + LOGGER.error( + "Error starting TQE container [cluster='{}', container_name='{}']: {}", + configurator.clusterName(), + name, + e.getMessage(), + e); errors.add(e); } - }))); + }, + STARTUP_EXECUTOR))); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {})).join(); + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join(); if (!errors.isEmpty()) { throw new ContainerLaunchException( @@ -85,24 +130,6 @@ private static void startParallel( } } - private void startTarantoolCluster() { - startParallel(this.configurator.queue(), this.configurator); - if (this.configurator.isConfigured()) { - LOGGER.warn( - "TQE cluster [name = {}, queue = {}, grpc = {}] already configured", - this.configurator.clusterName(), - this.configurator.queue().size(), - this.configurator.grpc().size()); - return; - } - - this.configurator.configure(); - } - - private void startGrpcEndpoints() { - startParallel(this.configurator.grpc(), this.configurator); - } - @Override public String clusterName() { return this.configurator.clusterName(); diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainer.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainer.java index 74883e51..ba66b3bd 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainer.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainer.java @@ -50,7 +50,9 @@ public interface GrpcContainer> enum GrpcRole { CONSUMER("consumer"), - PRODUCER("producer"); + PRODUCER("producer"), + + PUBLISHER("publisher"); private final String role; diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainerImpl.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainerImpl.java index aba179a4..b20ce2a4 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainerImpl.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainerImpl.java @@ -163,7 +163,7 @@ private static void validateConfigPath(Path configPath) { if (configPath == null || !Files.exists(configPath) || !Files.isRegularFile(configPath) - || configPath.endsWith(".yml")) { + || !configPath.toString().endsWith(".yml")) { LOGGER.error( "Invalid config file. Config path is null or not exists or not regular or not having" + " '.yml' extension: {}", @@ -227,7 +227,11 @@ private static Set resolveRoles(GrpcConfiguration config, Path configP final Set roles = new LinkedHashSet<>(); if (isPublisher.isPresent() && isPublisher.get()) { roles.add(GrpcRole.PRODUCER); - LOGGER.trace("Publisher role is enabled for: {}", configPath); + roles.add(GrpcRole.PUBLISHER); + LOGGER.trace( + "Publisher/Producer role is enabled for: {} (both PRODUCER and PUBLISHER for TQE2/TQE3" + + " compatibility)", + configPath); } final Optional isConsumer = config.getConsumer().flatMap(ConsumerConfig::getEnabled); diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQE2ClusterImpl.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQE2ClusterImpl.java new file mode 100644 index 00000000..14b1788d --- /dev/null +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQE2ClusterImpl.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.tqe; + +import org.testcontainers.containers.tqe.configuration.TQEConfigurator; + +/** + * Implementation of TQE cluster interface specifically designed for TQE 2.x compatibility. Uses the + * default startup order: queue → configure → grpc. + */ +public class TQE2ClusterImpl extends AbstractTQECluster { + + public TQE2ClusterImpl(TQEConfigurator configurator) { + super(configurator); + } +} diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQE3ClusterImpl.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQE3ClusterImpl.java new file mode 100644 index 00000000..ee77c256 --- /dev/null +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQE3ClusterImpl.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.tqe; + +import org.testcontainers.containers.tqe.configuration.TQEConfigurator; + +/** + * Implementation of TQE cluster interface specifically designed for TQE 3.x compatibility. Unlike + * TQE 2.x, TQE 3.x configures the cluster between starting queue and gRPC containers: queue → + * configure → grpc. + */ +public class TQE3ClusterImpl extends AbstractTQECluster { + + public TQE3ClusterImpl(TQEConfigurator configurator) { + super(configurator); + } + + @Override + protected void startTarantoolCluster() { + startParallel(this.configurator.queue(), this.configurator); + } + + @Override + protected void startGrpcEndpoints() { + if (!this.configurator.isConfigured()) { + this.configurator.configure(); + } + startParallel(this.configurator.grpc(), this.configurator); + } +} diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/FileTQEConfigurator.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/FileTQEConfigurator.java index 49defe6e..d6dcd27c 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/FileTQEConfigurator.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/FileTQEConfigurator.java @@ -47,8 +47,8 @@ import io.tarantool.autogen.iproto.listen.Listen; /** - * Base implementation of {@link TQEConfigurator} that configure TQE cluster using configuration - * files. All other implementations should extend this class. + * Configures TQE cluster using configuration files. Supports both TQE 2.x and TQE 3.x via factory + * methods {@link #tqe2Builder} and {@link #tqe3Builder}. */ public class FileTQEConfigurator implements TQEConfigurator { @@ -64,11 +64,12 @@ public class FileTQEConfigurator implements TQEConfigurator { /* Constants /********************************************************** */ + private static final String TQE2_ROUTER_ROLE = "app.roles.api"; + private static final String TQE3_ROUTER_ROLE = "roles.tqe-router"; + private static final String CONFIGURATOR_ERROR_MSG = "An error occurred when configuring the TQE cluster. See logs for details."; - private static final String TQE_ROUTER_ROLE = "roles.tqe-router"; - /* /********************************************************** /* Parameter extractors @@ -97,6 +98,8 @@ public class FileTQEConfigurator implements TQEConfigurator { private final Network network; + private final String routerRole; + private boolean configured; private FileTQEConfigurator( @@ -105,11 +108,13 @@ private FileTQEConfigurator( Collection grpcConfigs, String clusterName, Duration startupTimeout, - Duration bootstrapTimeout) { + Duration bootstrapTimeout, + String routerRole) { this.queueConfig = queueConfig; this.grpcConfigs = grpcConfigs; this.clusterName = clusterName; this.bootstrapTimeout = bootstrapTimeout; + this.routerRole = routerRole; this.routerNames = new LinkedHashSet<>(1); this.image = image; this.startupTimeout = startupTimeout; @@ -156,9 +161,9 @@ private Map> initQueue( final Map> nodes = new LinkedHashMap<>(instances.size()); this.routerNames.addAll( - ConfigurationUtils.findInstancesWithRole(this.queueParsedConfig, TQE_ROUTER_ROLE)); + ConfigurationUtils.findInstancesWithRole(this.queueParsedConfig, this.routerRole)); if (this.routerNames.isEmpty()) { - LOGGER.error("At least one container must have the 'router' and '{}' roles", TQE_ROUTER_ROLE); + LOGGER.error("At least one container must have the 'router' and '{}' roles", this.routerRole); throw new ContainerLaunchException(CONFIGURATOR_ERROR_MSG); } @@ -429,29 +434,58 @@ public synchronized boolean isConfigured() { @Override public synchronized void close() { - queue().values().parallelStream().forEach(Startable::close); - grpc().values().parallelStream().forEach(Startable::close); - if (this.network != null) { - this.network.close(); - } + this.queue.values().parallelStream().forEach(Startable::close); + this.grpc.values().parallelStream().forEach(Startable::close); + this.network.close(); } - public static Builder builder( + /** + * Creates a builder pre-configured for TQE 2.x (router role: {@value #TQE2_ROUTER_ROLE}). + * + * @param image Docker image name + * @param queueConfig path to queue configuration file + * @param grpcConfigs paths to gRPC configuration files + * @return builder with TQE 2.x router role pre-set + */ + public static Builder tqe2Builder( + DockerImageName image, Path queueConfig, Collection grpcConfigs) { + return builder(image, queueConfig, grpcConfigs).withRouterRole(TQE2_ROUTER_ROLE); + } + + /** + * Creates a builder pre-configured for TQE 3.x (router role: {@value #TQE3_ROUTER_ROLE}). + * + * @param image Docker image name + * @param queueConfig path to queue configuration file + * @param grpcConfigs paths to gRPC configuration files + * @return builder with TQE 3.x router role pre-set + */ + public static Builder tqe3Builder( + DockerImageName image, Path queueConfig, Collection grpcConfigs) { + return builder(image, queueConfig, grpcConfigs).withRouterRole(TQE3_ROUTER_ROLE); + } + + private static Builder builder( DockerImageName image, Path queueConfig, Collection grpcConfigs) { return new Builder(image, queueConfig, grpcConfigs); } + /** + * Builder for {@link FileTQEConfigurator}. Use factory methods {@link #tqe2Builder} or {@link + * #tqe3Builder} to obtain a pre-configured builder. + */ public static class Builder { private static final String DEFAULT_CLUSTER_NAME_PREFIX = "tqe-test"; + private final DockerImageName image; private final Path queueConfig; private final Collection grpcConfigs; - private final DockerImageName image; private String clusterName; private Duration startupTimeout; private Duration bootstrapTimeout; + private String routerRole; - public Builder(DockerImageName image, Path queueConfig, Collection grpcConfigs) { + private Builder(DockerImageName image, Path queueConfig, Collection grpcConfigs) { if (queueConfig == null || !Files.isRegularFile(queueConfig)) { LOGGER.error("Queue config file is invalid (null or not regular): {})", queueConfig); throw new IllegalArgumentException(CONFIGURATOR_ERROR_MSG); @@ -474,18 +508,13 @@ public Builder(DockerImageName image, Path queueConfig, Collection grpcCon this.grpcConfigs = new LinkedHashSet<>(grpcConfigs); } + public Builder withRouterRole(String routerRole) { + this.routerRole = Objects.requireNonNull(routerRole, "routerRole must not be null"); + return this; + } + public Builder withClusterName(String clusterName) { - final String defaultClusterName; - if (clusterName == null || clusterName.trim().isEmpty()) { - defaultClusterName = DEFAULT_CLUSTER_NAME_PREFIX + "-" + UUID.randomUUID(); - LOGGER.warn( - "Cluster name is invalid (null or blank): {}. Set default cluster name: '{}'", - clusterName, - defaultClusterName); - } else { - defaultClusterName = clusterName; - } - this.clusterName = defaultClusterName; + this.clusterName = clusterName; return this; } @@ -508,13 +537,19 @@ public FileTQEConfigurator build() { this.startupTimeout == null ? DEFAULT_STARTUP_TIMEOUT : this.startupTimeout; final Duration bootstrapTimeout = this.bootstrapTimeout == null ? DEFAULT_BOOTSTRAP_TIMEOUT : this.bootstrapTimeout; + if (this.routerRole == null) { + throw new IllegalStateException( + "routerRole must be set. Use tqe2Builder() or tqe3Builder() factory method, " + + "or call withRouterRole() explicitly."); + } return new FileTQEConfigurator( this.image, this.queueConfig, this.grpcConfigs, clusterName, startupTimeout, - bootstrapTimeout); + bootstrapTimeout, + this.routerRole); } } } diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/GrpcConfiguration.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/GrpcConfiguration.java index 0594fd3d..4604e40f 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/GrpcConfiguration.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/GrpcConfiguration.java @@ -8,6 +8,7 @@ import java.util.Optional; import java.util.Set; +import com.fasterxml.jackson.annotation.JsonAlias; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -73,6 +74,7 @@ public class GrpcConfiguration { private final Boolean daemon; @JsonProperty("producer") + @JsonAlias("publisher") private final ProducerConfig producer; @JsonProperty("consumer") diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/CommonTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/CommonTest.java deleted file mode 100644 index 134d35e6..00000000 --- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/CommonTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY - * All Rights Reserved. - */ - -package org.testcontainers.containers.integration.tqe; - -import java.net.URISyntaxException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Objects; - -import org.junit.jupiter.api.io.TempDir; -import org.testcontainers.utility.DockerImageName; - -public abstract class CommonTest { - - @TempDir protected static Path TEST_TEMP_DIR; - - protected static final DockerImageName IMAGE_NAME = - DockerImageName.parse( - System.getenv().getOrDefault("TARANTOOL_REGISTRY", "") - + "tarantool/message-queue-ee:v3.5.0"); - - protected static final Path SIMPLE_GRPC_CONFIG; - protected static final Path SIMPLE_QUEUE_CONFIG; - - static { - try { - SIMPLE_GRPC_CONFIG = - Paths.get( - Objects.requireNonNull( - CommonTest.class - .getClassLoader() - .getResource("tqe/simple-config/simple-grpc.yml")) - .toURI()); - SIMPLE_QUEUE_CONFIG = - Paths.get( - Objects.requireNonNull( - CommonTest.class - .getClassLoader() - .getResource("tqe/simple-config/simple-queue.yml")) - .toURI()); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } -} diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/FileTQEConfiguratorTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/FileTQEConfiguratorTest.java index 25fb4c07..1cbaab55 100644 --- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/FileTQEConfiguratorTest.java +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/FileTQEConfiguratorTest.java @@ -13,31 +13,31 @@ import java.util.stream.Stream; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.testcontainers.containers.ContainerLaunchException; import org.testcontainers.containers.tqe.configuration.FileTQEConfigurator; import org.testcontainers.containers.tqe.configuration.TQEConfigurator; import org.testcontainers.lifecycle.Startable; -class FileTQEConfiguratorTest extends CommonTest { +class FileTQEConfiguratorTest { - @Test - void simpleConfiguration() { + @ParameterizedTest + @EnumSource(TQEVersion.class) + void simpleConfiguration(TQEVersion version) throws Exception { try (TQEConfigurator configurator = - FileTQEConfigurator.builder(IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(SIMPLE_GRPC_CONFIG)) - .build()) { + version.configuratorBuilder(version.queueConfig(), Set.of(version.grpcConfig())).build()) { configurator.queue().values().parallelStream().forEach(Startable::start); - configurator.configure(); + if (version.requiresConfigure()) { + configurator.configure(); + } configurator.grpc().values().parallelStream().forEach(Startable::start); - } catch (Exception e) { - throw new RuntimeException(e); } } - public static Stream dataForTestInvalidQueueConfigShouldThrow() { + public static Stream dataForTestInvalidQueueConfig() { return Stream.of( // router have no required roles """ @@ -169,44 +169,53 @@ public static Stream dataForTestInvalidQueueConfigShouldThrow() { } @ParameterizedTest - @MethodSource("dataForTestInvalidQueueConfigShouldThrow") + @MethodSource("dataForTestInvalidQueueConfig") void testInvalidQueueConfig(String invalidQueueConfig) throws IOException { - final Path invalidConfigPath = TEST_TEMP_DIR.resolve(UUID.randomUUID().toString()); + final Path invalidConfigPath = + TQETestHelper.TEST_TEMP_DIR.resolve(UUID.randomUUID().toString()); Files.writeString(invalidConfigPath, invalidQueueConfig); - Assertions.assertThrows( - ContainerLaunchException.class, - () -> { - try (FileTQEConfigurator c = - FileTQEConfigurator.builder(IMAGE_NAME, invalidConfigPath, Set.of(SIMPLE_GRPC_CONFIG)) - .build()) {} - }); + for (TQEVersion version : TQEVersion.values()) { + Assertions.assertThrows( + ContainerLaunchException.class, + () -> { + try (FileTQEConfigurator c = + version + .configuratorBuilder(invalidConfigPath, Set.of(version.grpcConfig())) + .build()) {} + }); + } } public static Stream dataForTestInvalidConfigsPaths() { - return Stream.of( - // invalid grpc configs - // null - Arguments.of(SIMPLE_QUEUE_CONFIG, null), - // empty - Arguments.of(SIMPLE_QUEUE_CONFIG, Set.of()), - // non regular - Arguments.of(SIMPLE_QUEUE_CONFIG, Set.of(TEST_TEMP_DIR)), - - // invalid queue config - Arguments.of(null, Set.of(SIMPLE_GRPC_CONFIG)), - Arguments.of(TEST_TEMP_DIR, Set.of(SIMPLE_GRPC_CONFIG))); + return TQEVersion.all() + .flatMap( + version -> { + Path qc = version.queueConfig(); + Path gc = version.grpcConfig(); + return Stream.of( + // invalid grpc configs + // null + Arguments.of(version, qc, null), + // empty + Arguments.of(version, qc, Set.of()), + // non regular + Arguments.of(version, qc, Set.of(TQETestHelper.TEST_TEMP_DIR)), + // invalid queue config + Arguments.of(version, (Path) null, Set.of(gc)), + Arguments.of(version, TQETestHelper.TEST_TEMP_DIR, Set.of(gc))); + }); } @ParameterizedTest @MethodSource("dataForTestInvalidConfigsPaths") - void testInvalidConfigsPaths(Path invalidGrpcConfig, Set invalidQueueConfigs) { + void testInvalidConfigsPaths( + TQEVersion version, Path invalidGrpcConfig, Set invalidQueueConfigs) { Assertions.assertThrows( IllegalArgumentException.class, () -> { try (FileTQEConfigurator c = - FileTQEConfigurator.builder(IMAGE_NAME, invalidGrpcConfig, invalidQueueConfigs) - .build()) {} + version.configuratorBuilder(invalidGrpcConfig, invalidQueueConfigs).build()) {} }); } } diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/GrpcTestStrategy.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/GrpcTestStrategy.java new file mode 100644 index 00000000..60585cc7 --- /dev/null +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/GrpcTestStrategy.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.integration.tqe; + +import java.util.List; +import java.util.Set; + +import io.grpc.ManagedChannel; +import org.testcontainers.containers.utils.pojo.User; + +/** + * Encapsulates version-specific gRPC API for publishing and subscribing. TQE 2.x uses {@code + * PublisherServiceGrpc} + unidirectional streaming. TQE 3.x uses {@code ProducerGrpc} + + * bidirectional streaming. + */ +interface GrpcTestStrategy { + + /** + * Publishes a batch of users to the given queue over the channel. Synchronous: throws on failure. + */ + void publish(ManagedChannel channel, List users, String queue) throws Exception; + + /** + * Starts a subscription on the given queue. Messages are delivered asynchronously and added to + * {@code result} as they arrive. This call only kicks off the subscription; callers should + * retry/poll until {@code result} reaches the expected size. + */ + void subscribe(ManagedChannel channel, String queue, Set result); +} diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQE2GrpcTestStrategy.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQE2GrpcTestStrategy.java new file mode 100644 index 00000000..d6a04daa --- /dev/null +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQE2GrpcTestStrategy.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.integration.tqe; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import org.testcontainers.containers.utils.pojo.User; +import tarantool.queue_ee.v2.Consumer.SubscriptionNotifications; +import tarantool.queue_ee.v2.Consumer.SubscriptionRequest; +import tarantool.queue_ee.v2.ConsumerServiceGrpc; +import tarantool.queue_ee.v2.ConsumerServiceGrpc.ConsumerServiceStub; +import tarantool.queue_ee.v2.Publisher.BatchRequestMessage; +import tarantool.queue_ee.v2.Publisher.PublishBatchRequest; +import tarantool.queue_ee.v2.PublisherServiceGrpc; +import tarantool.queue_ee.v2.PublisherServiceGrpc.PublisherServiceBlockingStub; + +/** TQE 2.x gRPC API: {@code publishBatch} + unidirectional server-streaming subscribe. */ +final class TQE2GrpcTestStrategy implements GrpcTestStrategy { + + static final TQE2GrpcTestStrategy INSTANCE = new TQE2GrpcTestStrategy(); + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Override + public void publish(ManagedChannel channel, List users, String queue) throws Exception { + PublisherServiceBlockingStub pService = PublisherServiceGrpc.newBlockingStub(channel); + PublishBatchRequest.Builder requestBuilder = PublishBatchRequest.newBuilder(); + for (User user : users) { + requestBuilder.addMessages( + BatchRequestMessage.newBuilder() + .setPayload(ByteString.copyFrom(MAPPER.writeValueAsBytes(user)))); + } + pService.publishBatch(requestBuilder.setQueue(queue).build()); + } + + @Override + public void subscribe(ManagedChannel channel, String queue, Set result) { + ConsumerServiceStub cService = ConsumerServiceGrpc.newStub(channel); + cService.subscribe( + SubscriptionRequest.newBuilder().setCursor("").setQueue(queue).build(), + new StreamObserver<>() { + @Override + public void onNext(SubscriptionNotifications value) { + value.getNotificationsList().stream() + .map( + n -> { + try { + return MAPPER.readValue( + n.getMessage().getPayload().toByteArray(), User.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach(result::add); + } + + @Override + public void onError(Throwable t) { + throw new RuntimeException("Stream observer received error", t); + } + + @Override + public void onCompleted() {} + }); + } +} diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQE3GrpcTestStrategy.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQE3GrpcTestStrategy.java new file mode 100644 index 00000000..02e0a8c2 --- /dev/null +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQE3GrpcTestStrategy.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.integration.tqe; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import org.testcontainers.containers.utils.pojo.User; +import tarantool.queue_ee.Consumer.SubscriptionRequest; +import tarantool.queue_ee.Consumer.SubscriptionStreamRequest; +import tarantool.queue_ee.Consumer.SubscriptionStreamResponse; +import tarantool.queue_ee.ConsumerServiceGrpc; +import tarantool.queue_ee.ConsumerServiceGrpc.ConsumerServiceStub; +import tarantool.queue_ee.ProducerGrpc; +import tarantool.queue_ee.ProducerGrpc.ProducerBlockingStub; +import tarantool.queue_ee.ProducerOuterClass.ProduceMessage; +import tarantool.queue_ee.ProducerOuterClass.ProduceRequest; + +/** TQE 3.x gRPC API: {@code produce} + bidirectional client-streaming subscribe. */ +final class TQE3GrpcTestStrategy implements GrpcTestStrategy { + + static final TQE3GrpcTestStrategy INSTANCE = new TQE3GrpcTestStrategy(); + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Override + public void publish(ManagedChannel channel, List users, String queue) throws Exception { + ProducerBlockingStub producer = ProducerGrpc.newBlockingStub(channel); + ProduceRequest.Builder requestBuilder = ProduceRequest.newBuilder().setQueue(queue); + for (User user : users) { + requestBuilder.addMessages( + ProduceMessage.newBuilder() + .setPayload(ByteString.copyFrom(MAPPER.writeValueAsBytes(user)))); + } + producer.produce(requestBuilder.build()); + } + + @Override + public void subscribe(ManagedChannel channel, String queue, Set result) { + ConsumerServiceStub consumer = ConsumerServiceGrpc.newStub(channel); + StreamObserver requestsStream = + consumer.subscribe( + new StreamObserver<>() { + @Override + public void onNext(SubscriptionStreamResponse response) { + response.getNotifications().getNotificationsList().stream() + .map( + n -> { + try { + return MAPPER.readValue( + n.getMessage().getPayload().toByteArray(), User.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach(result::add); + } + + @Override + public void onError(Throwable t) { + throw new RuntimeException("Stream observer received error", t); + } + + @Override + public void onCompleted() {} + }); + requestsStream.onNext( + SubscriptionStreamRequest.newBuilder() + .setSubscribeRequest(SubscriptionRequest.newBuilder().setCursor("").setQueue(queue)) + .build()); + } +} diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterFixture.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterFixture.java new file mode 100644 index 00000000..3334d3fd --- /dev/null +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterFixture.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.integration.tqe; + +import java.net.InetSocketAddress; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import org.testcontainers.containers.tqe.GrpcContainer; +import org.testcontainers.containers.tqe.GrpcContainer.GrpcRole; +import org.testcontainers.containers.tqe.TQECluster; +import org.testcontainers.containers.tqe.configuration.TQEConfigurator; + +/** + * Encapsulates the lifecycle of a {@link TQECluster} for a single test: builds the configurator, + * creates the cluster, starts it, and exposes helpers for resolving gRPC channels by role. {@link + * #close()} stops the cluster. + */ +final class TQEClusterFixture implements AutoCloseable { + + private final TQEVersion version; + private final TQEConfigurator configurator; + private final TQECluster cluster; + + TQEClusterFixture(TQEVersion version) { + this.version = version; + this.configurator = + version.configuratorBuilder(version.queueConfig(), Set.of(version.grpcConfig())).build(); + this.cluster = version.createCluster(configurator); + this.cluster.start(); + } + + TQEVersion version() { + return version; + } + + ManagedChannel createPublisherChannel() { + return createReadyChannel(findByRole(version.producerRole())); + } + + ManagedChannel createConsumerChannel() { + return createReadyChannel(findByRole(GrpcRole.CONSUMER)); + } + + void restart(long delayBefore, TimeUnit unitBefore, long delayAfter, TimeUnit unitAfter) + throws InterruptedException { + this.cluster.restart(delayBefore, unitBefore, delayAfter, unitAfter); + } + + @Override + public void close() { + this.cluster.stop(); + } + + private GrpcContainer findByRole(GrpcRole role) { + return this.cluster.grpc().values().stream() + .filter(g -> g.roles().contains(role)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "No gRPC container with role " + + role + + " in cluster " + + cluster.clusterName())); + } + + private static ManagedChannel createReadyChannel(GrpcContainer grpc) { + InetSocketAddress address = + grpc.grpcAddresses().stream() + .findFirst() + .orElseThrow( + () -> new IllegalStateException("No gRPC address on container " + grpc.node())); + return TQETestHelper.createReadyChannel(address); + } +} diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java deleted file mode 100644 index 7633380c..00000000 --- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java +++ /dev/null @@ -1,467 +0,0 @@ -/* - * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY - * All Rights Reserved. - */ - -package org.testcontainers.containers.integration.tqe; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.Arrays; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -import com.google.protobuf.ByteString; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.stub.StreamObserver; -import org.instancio.Instancio; -import org.instancio.Select; -import org.instancio.generators.Generators; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.rnorth.ducttape.unreliables.Unreliables; -import org.testcontainers.containers.ContainerLaunchException; -import org.testcontainers.containers.tqe.GrpcContainer; -import org.testcontainers.containers.tqe.GrpcContainer.GrpcRole; -import org.testcontainers.containers.tqe.TQECluster; -import org.testcontainers.containers.tqe.TQEClusterImpl; -import org.testcontainers.containers.tqe.configuration.FileTQEConfigurator; -import org.testcontainers.containers.tqe.configuration.TQEConfigurator; -import org.testcontainers.containers.utils.pojo.User; -import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; -import tarantool.queue_ee.Consumer.SubscriptionRequest; -import tarantool.queue_ee.Consumer.SubscriptionStreamRequest; -import tarantool.queue_ee.Consumer.SubscriptionStreamResponse; -import tarantool.queue_ee.ConsumerServiceGrpc; -import tarantool.queue_ee.ConsumerServiceGrpc.ConsumerServiceStub; -import tarantool.queue_ee.ProducerGrpc; -import tarantool.queue_ee.ProducerGrpc.ProducerBlockingStub; -import tarantool.queue_ee.ProducerOuterClass.ProduceMessage; -import tarantool.queue_ee.ProducerOuterClass.ProduceRequest; - -class TQEClusterImplTest extends CommonTest { - - @RepeatedTest(10) - void testMultiplyRestart() throws Exception { - try (TQEConfigurator configurator = - FileTQEConfigurator.builder(IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(SIMPLE_GRPC_CONFIG)) - .build(); - TQECluster cluster = new TQEClusterImpl(configurator)) { - cluster.start(); - } - } - - @Test - void testRestartMethod() throws Exception { - try (TQEConfigurator configurator = - FileTQEConfigurator.builder(IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(SIMPLE_GRPC_CONFIG)) - .build(); - TQECluster cluster = new TQEClusterImpl(configurator)) { - cluster.start(); - cluster.restart(1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS); - } - } - - public static Stream dataForTestInvalidQueueConfigShouldThrow() { - final List invalidConfigs = - Arrays.asList( - // no required test-super user - """ - # Credentials - credentials: - users: - admin: - password: 'secret-cluster-cookie' - roles: [ super ] - replicator: - password: 'secret' - roles: [ replication ] - storage: - roles: [ sharding ] - password: storage - - # advertise configs for all nodes - iproto: - advertise: - peer: - login: replicator - sharding: - login: storage - password: storage - - roles: [ roles.metrics-export ] - # queues configs - roles_cfg: - app.roles.queue: - queues: - - name: test - deduplication_mode: keep_latest - disabled_filters_by: [ sharding_key ] - roles.metrics-export: - http: - - listen: 8081 - endpoints: - - format: prometheus - path: '/metrics' - - groups: - routers: - replicasets: - r-1: - sharding: - roles: [ router ] - roles: [ app.roles.api ] - instances: - router: - iproto: - listen: - - uri: router:3301 - storages: - replicasets: - shard-1: - replication: - failover: manual - sharding: - roles: [ storage ] - leader: master - instances: - master: - iproto: - listen: - - uri: master:3301 - net_msg_max: 768 - """, - // no consumer storage to connect from grpc - """ - # Credentials - credentials: - users: - test-super: - password: 'test' - roles: [ super ] - admin: - password: 'secret-cluster-cookie' - roles: [ super ] - replicator: - password: 'secret' - roles: [ replication ] - storage: - roles: [ sharding ] - password: storage - - # advertise configs for all nodes - iproto: - advertise: - peer: - login: replicator - sharding: - login: storage - password: storage - - roles: [ roles.metrics-export ] - # queues configs - roles_cfg: - app.roles.queue: - queues: - - name: test - deduplication_mode: keep_latest - disabled_filters_by: [ sharding_key ] - roles.metrics-export: - http: - - listen: 8081 - endpoints: - - format: prometheus - path: '/metrics' - - groups: - routers: - replicasets: - r-1: - sharding: - roles: [ router ] - roles: [ app.roles.api ] - instances: - router: - iproto: - listen: - - uri: router:3301 - """); - - return invalidConfigs.stream() - .map( - s -> { - final Path testConfigPath = TEST_TEMP_DIR.resolve(UUID.randomUUID().toString()); - try { - Files.writeString(testConfigPath, s); - return Arguments.of(testConfigPath); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - - @ParameterizedTest - @MethodSource("dataForTestInvalidQueueConfigShouldThrow") - void testInvalidQueueConfigShouldThrow(Path queueConfig) { - Assertions.assertThrows( - ContainerLaunchException.class, - () -> { - try (TQEConfigurator configurator = - FileTQEConfigurator.builder(IMAGE_NAME, queueConfig, Set.of(SIMPLE_GRPC_CONFIG)) - .withStartupTimeout(Duration.ofSeconds(5)) - .build(); - TQECluster cluster = new TQEClusterImpl(configurator)) { - cluster.start(); - } - }); - } - - public static Stream dataForTestInvalidGrpcConfig() { - final List invalidGrpcConfigs = - Arrays.asList( - """ - core_port: 1111 - grpc_listen: - - uri: 'tcp://0.0.0.0:18182' - - producer: - enabled: true - tarantool: - user: test-super - pass: test - connections: - routers: - - "unknown:3301" - - consumer: - enabled: true - tarantool: - user: test-super - pass: test - connections: - storage: - - "master:3301" - """, - // no consumers and producers - """ - core_port: 1111 - grpc_listen: - - uri: 'tcp://0.0.0.0:18182' - - producer: - enabled: false - tarantool: - user: test-super - pass: test - connections: - routers: - - "router:3301" - - consumer: - enabled: false - tarantool: - user: test-super - pass: test - connections: - storage: - - "master:3301" - """, - // no core_port parameter - """ - grpc_listen: - - uri: 'tcp://0.0.0.0:18182' - - producer: - enabled: true - tarantool: - user: test-super - pass: test - connections: - routers: - - "router:3301" - - consumer: - enabled: true - tarantool: - user: test-super - pass: test - connections: - storage: - - "master:3301" - """, - // no listen.uri parameter - """ - core_port: 1111 - - producer: - enabled: true - tarantool: - user: test-super - pass: test - connections: - routers: - - "router:3301" - - consumer: - enabled: true - tarantool: - user: test-super - pass: test - connections: - storage: - - "master:3301" - """); - - return invalidGrpcConfigs.stream() - .map( - s -> { - final Path testConfigPath = TEST_TEMP_DIR.resolve(UUID.randomUUID() + ".yml"); - try { - Files.writeString(testConfigPath, s); - return testConfigPath; - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - - @ParameterizedTest - @MethodSource("dataForTestInvalidGrpcConfig") - void testInvalidGrpcConfig(Path grpcConfig) { - Assertions.assertThrows( - ContainerLaunchException.class, - () -> { - try (TQEConfigurator configurator = - FileTQEConfigurator.builder(IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(grpcConfig)) - .withStartupTimeout(Duration.ofSeconds(5)) - .build(); - TQECluster cluster = new TQEClusterImpl(configurator)) { - cluster.start(); - } - }); - } - - @RepeatedTest(10) - void testPublishAndConsumeData() { - Assertions.assertDoesNotThrow( - () -> { - final ObjectMapper MAPPER = new ObjectMapper(); - - try (TQEConfigurator configurator = - FileTQEConfigurator.builder( - IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(SIMPLE_GRPC_CONFIG)) - .build(); - TQECluster cluster = new TQEClusterImpl(configurator)) { - cluster.start(); - - final String queueName = "test"; - - final List> producers = - cluster.grpc().values().stream() - .filter(g -> g.roles().contains(GrpcRole.PRODUCER)) - .toList(); - final List> consumers = - cluster.grpc().values().stream() - .filter(g -> g.roles().contains(GrpcRole.CONSUMER)) - .toList(); - - Assertions.assertFalse(producers.isEmpty()); - Assertions.assertFalse(consumers.isEmpty()); - - final Set grpcAddresses = producers.get(0).grpcAddresses(); - final Set consumerAddresses = consumers.get(0).grpcAddresses(); - - final Optional publisherAddress = grpcAddresses.stream().findFirst(); - Assertions.assertTrue(publisherAddress.isPresent()); - final Optional consumerAddress = - consumerAddresses.stream().findFirst(); - Assertions.assertTrue(consumerAddress.isPresent()); - - final ManagedChannel producerChannel = - ManagedChannelBuilder.forAddress( - publisherAddress.get().getHostName(), publisherAddress.get().getPort()) - .usePlaintext() - .build(); - - final ManagedChannel consumerChannel = - ManagedChannelBuilder.forAddress( - consumerAddress.get().getHostName(), consumerAddress.get().getPort()) - .usePlaintext() - .build(); - - final ProducerBlockingStub producer = ProducerGrpc.newBlockingStub(producerChannel); - final ConsumerServiceStub consumer = ConsumerServiceGrpc.newStub(consumerChannel); - - final List users = - Instancio.ofList(User.class) - .size(100) - .generate( - Select.field(User::getName), - g -> g.string().alphaNumeric().allowEmpty().nullable()) - .generate(Select.field(User::getAge), Generators::ints) - .create(); - - final ProduceRequest.Builder requestBuilder = - ProduceRequest.newBuilder().setQueue(queueName); - for (User user : users) { - requestBuilder.addMessages( - ProduceMessage.newBuilder() - .setPayload(ByteString.copyFrom(MAPPER.writeValueAsBytes(user)))); - } - final ProduceRequest produceRequest = requestBuilder.build(); - producer.produce(produceRequest); - - final Set result = new CopyOnWriteArraySet<>(); - StreamObserver requestsStream = - consumer.subscribe( - new StreamObserver() { - @Override - public void onNext(SubscriptionStreamResponse response) { - response.getNotifications().getNotificationsList().stream() - .map( - n -> { - try { - return MAPPER.readValue( - n.getMessage().getPayload().toByteArray(), User.class); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .forEach(result::add); - } - - @Override - public void onError(Throwable t) {} - - @Override - public void onCompleted() {} - }); - requestsStream.onNext( - SubscriptionStreamRequest.newBuilder() - .setSubscribeRequest( - SubscriptionRequest.newBuilder().setCursor("").setQueue(queueName)) - .build()); - - Unreliables.retryUntilTrue( - 5, TimeUnit.SECONDS, () -> new LinkedHashSet<>(users).size() == result.size()); - Assertions.assertEquals(new LinkedHashSet<>(users), result); - consumerChannel.shutdownNow(); - producerChannel.shutdownNow(); - } - }); - } -} diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java new file mode 100644 index 00000000..f27e7c8c --- /dev/null +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.integration.tqe; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import org.instancio.Instancio; +import org.instancio.Select; +import org.instancio.generators.Generators; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.utils.pojo.User; + +class TQEClusterIntegrationTest { + + private static final String QUEUE_NAME = "test"; + private static final int USERS_COUNT = 100; + private static final int RETRY_TIMEOUT_SECONDS = 60; + + @ParameterizedTest + @EnumSource(TQEVersion.class) + @DisplayName("publish → subscribe round-trip across TQE versions") + void testPublishAndConsumeData(TQEVersion version) { + Assertions.assertDoesNotThrow( + () -> { + try (TQEClusterFixture fx = new TQEClusterFixture(version)) { + ManagedChannel publisherChannel = fx.createPublisherChannel(); + ManagedChannel consumerChannel = fx.createConsumerChannel(); + try { + final List users = generateUsers(); + + Unreliables.retryUntilSuccess( + RETRY_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + () -> { + version.strategy().publish(publisherChannel, users, QUEUE_NAME); + return true; + }); + + final Set result = new CopyOnWriteArraySet<>(); + Unreliables.retryUntilSuccess( + RETRY_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + () -> { + version.strategy().subscribe(consumerChannel, QUEUE_NAME, result); + return true; + }); + + Unreliables.retryUntilTrue( + RETRY_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + () -> new LinkedHashSet<>(users).size() == result.size()); + Assertions.assertEquals(new LinkedHashSet<>(users), result); + } finally { + consumerChannel.shutdownNow(); + publisherChannel.shutdownNow(); + } + } + }); + } + + private static List generateUsers() { + return Instancio.ofList(User.class) + .size(USERS_COUNT) + .generate( + Select.field(User::getName), g -> g.string().alphaNumeric().allowEmpty().nullable()) + .generate(Select.field(User::getAge), Generators::ints) + .create(); + } +} diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterTest.java new file mode 100644 index 00000000..1bea5e2b --- /dev/null +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterTest.java @@ -0,0 +1,340 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.integration.tqe; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.testcontainers.containers.ContainerLaunchException; +import org.testcontainers.containers.tqe.TQECluster; +import org.testcontainers.containers.tqe.configuration.TQEConfigurator; + +class TQEClusterTest { + + @ParameterizedTest + @EnumSource(TQEVersion.class) + void testStartupAndShutdown(TQEVersion version) { + try (TQEClusterFixture fx = new TQEClusterFixture(version)) { + // Fixture starts the cluster in its constructor; close() stops it. + } + } + + @ParameterizedTest + @EnumSource(TQEVersion.class) + void testRestartMethod(TQEVersion version) throws Exception { + try (TQEClusterFixture fx = new TQEClusterFixture(version)) { + fx.restart(1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS); + } + } + + public static Stream dataForTestInvalidQueueConfig() { + final List invalidConfigs = + Arrays.asList( + // no required test-super user + """ + # Credentials + credentials: + users: + admin: + password: 'secret-cluster-cookie' + roles: [ super ] + replicator: + password: 'secret' + roles: [ replication ] + storage: + roles: [ sharding ] + password: storage + + # advertise configs for all nodes + iproto: + advertise: + peer: + login: replicator + sharding: + login: storage + password: storage + + roles: [ roles.metrics-export ] + # queues configs + roles_cfg: + app.roles.queue: + queues: + - name: test + deduplication_mode: keep_latest + disabled_filters_by: [ sharding_key ] + roles.metrics-export: + http: + - listen: 8081 + endpoints: + - format: prometheus + path: '/metrics' + + groups: + routers: + replicasets: + r-1: + sharding: + roles: [ router ] + roles: [ app.roles.api ] + instances: + router: + iproto: + listen: + - uri: router:3301 + storages: + replicasets: + shard-1: + replication: + failover: manual + sharding: + roles: [ storage ] + leader: master + instances: + master: + iproto: + listen: + - uri: master:3301 + net_msg_max: 768 + """, + // no consumer storage to connect from grpc + """ + # Credentials + credentials: + users: + test-super: + password: 'test' + roles: [ super ] + admin: + password: 'secret-cluster-cookie' + roles: [ super ] + replicator: + password: 'secret' + roles: [ replication ] + storage: + roles: [ sharding ] + password: storage + + # advertise configs for all nodes + iproto: + advertise: + peer: + login: replicator + sharding: + login: storage + password: storage + + roles: [ roles.metrics-export ] + # queues configs + roles_cfg: + app.roles.queue: + queues: + - name: test + deduplication_mode: keep_latest + disabled_filters_by: [ sharding_key ] + roles.metrics-export: + http: + - listen: 8081 + endpoints: + - format: prometheus + path: '/metrics' + + groups: + routers: + replicasets: + r-1: + sharding: + roles: [ router ] + roles: [ app.roles.api ] + instances: + router: + iproto: + listen: + - uri: router:3301 + """); + + return TQEVersion.all() + .flatMap( + version -> + invalidConfigs.stream() + .map( + s -> { + final Path testConfigPath = + TQETestHelper.TEST_TEMP_DIR.resolve(UUID.randomUUID().toString()); + try { + Files.writeString(testConfigPath, s); + return Arguments.of(version, testConfigPath); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + @ParameterizedTest + @MethodSource("dataForTestInvalidQueueConfig") + void testInvalidQueueConfig(TQEVersion version, Path queueConfig) { + Assertions.assertThrows( + ContainerLaunchException.class, + () -> { + try (TQEConfigurator configurator = + version + .configuratorBuilder(queueConfig, Set.of(version.grpcConfig())) + .withStartupTimeout(Duration.ofSeconds(5)) + .build(); + TQECluster cluster = version.createCluster(configurator)) { + cluster.start(); + } + }); + } + + public static Stream dataForTestInvalidGrpcConfig() { + return TQEVersion.all() + .flatMap( + version -> { + final List invalidGrpcConfigs = + Arrays.asList( + // unknown host + """ + core_port: 1111 + grpc_listen: + - uri: 'tcp://0.0.0.0:18182' + + %s: + enabled: true + tarantool: + user: test-super + pass: test + connections: + routers: + - "unknown:3301" + + consumer: + enabled: true + tarantool: + user: test-super + pass: test + connections: + storage: + - "master:3301" + """ + .formatted(version.producerRoleName()), + // no consumers and producers + """ + core_port: 1111 + grpc_listen: + - uri: 'tcp://0.0.0.0:18182' + + %s: + enabled: false + tarantool: + user: test-super + pass: test + connections: + routers: + - "router:3301" + + consumer: + enabled: false + tarantool: + user: test-super + pass: test + connections: + storage: + - "master:3301" + """ + .formatted(version.producerRoleName()), + // no core_port parameter + """ + grpc_listen: + - uri: 'tcp://0.0.0.0:18182' + + %s: + enabled: true + tarantool: + user: test-super + pass: test + connections: + routers: + - "router:3301" + + consumer: + enabled: true + tarantool: + user: test-super + pass: test + connections: + storage: + - "master:3301" + """ + .formatted(version.producerRoleName()), + // no listen.uri parameter + """ + core_port: 1111 + + %s: + enabled: true + tarantool: + user: test-super + pass: test + connections: + routers: + - "router:3301" + + consumer: + enabled: true + tarantool: + user: test-super + pass: test + connections: + storage: + - "master:3301" + """ + .formatted(version.producerRoleName())); + + return invalidGrpcConfigs.stream() + .map( + s -> { + final Path testConfigPath = + TQETestHelper.TEST_TEMP_DIR.resolve(UUID.randomUUID() + ".yml"); + try { + Files.writeString(testConfigPath, s); + return Arguments.of(version, testConfigPath); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }); + } + + @ParameterizedTest + @MethodSource("dataForTestInvalidGrpcConfig") + void testInvalidGrpcConfig(TQEVersion version, Path grpcConfig) { + Assertions.assertThrows( + ContainerLaunchException.class, + () -> { + try (TQEConfigurator configurator = + version + .configuratorBuilder(version.queueConfig(), Set.of(grpcConfig)) + .withStartupTimeout(Duration.ofSeconds(5)) + .build(); + TQECluster cluster = version.createCluster(configurator)) { + cluster.start(); + } + }); + } +} diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQETestHelper.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQETestHelper.java new file mode 100644 index 00000000..88b96e5e --- /dev/null +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQETestHelper.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.integration.tqe; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.rnorth.ducttape.unreliables.Unreliables; + +final class TQETestHelper { + + static final Path TEST_TEMP_DIR; + + static { + try { + TEST_TEMP_DIR = Files.createTempDirectory("tqe-test-"); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private TQETestHelper() {} + + static Path loadConfig(String resourcePath) { + try { + return Paths.get( + Objects.requireNonNull(TQETestHelper.class.getClassLoader().getResource(resourcePath)) + .toURI()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + static ManagedChannel createReadyChannel(InetSocketAddress address) { + return Unreliables.retryUntilSuccess( + 60, + TimeUnit.SECONDS, + () -> { + ManagedChannel ch = + ManagedChannelBuilder.forAddress(address.getHostName(), address.getPort()) + .usePlaintext() + .maxInboundMessageSize(16 * 1024 * 1024) + .keepAliveTime(30, TimeUnit.SECONDS) + .keepAliveTimeout(5, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .build(); + + ch.getState(true); + Unreliables.retryUntilTrue( + 5, + TimeUnit.SECONDS, + () -> { + io.grpc.ConnectivityState state = ch.getState(false); + if (state == io.grpc.ConnectivityState.READY) { + return true; + } + ch.resetConnectBackoff(); + Thread.sleep(100); + return false; + }); + return ch; + }); + } +} diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEVersion.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEVersion.java new file mode 100644 index 00000000..15c22df4 --- /dev/null +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEVersion.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package org.testcontainers.containers.integration.tqe; + +import java.nio.file.Path; +import java.util.Set; +import java.util.stream.Stream; + +import org.testcontainers.containers.tqe.GrpcContainer.GrpcRole; +import org.testcontainers.containers.tqe.TQE2ClusterImpl; +import org.testcontainers.containers.tqe.TQE3ClusterImpl; +import org.testcontainers.containers.tqe.TQECluster; +import org.testcontainers.containers.tqe.configuration.FileTQEConfigurator; +import org.testcontainers.containers.tqe.configuration.TQEConfigurator; +import org.testcontainers.utility.DockerImageName; + +/** + * Encapsulates all version-specific aspects of a TQE test: image, configs, gRPC role names, + * builder/cluster factories, and the gRPC strategy. + * + *

Adding a new TQE version (e.g. TQE 4.x) is an Open/Closed-friendly change: add a new constant, + * override the abstract methods — no test code needs to change. + */ +enum TQEVersion { + TQE2("TQE 2.x", "publisher", GrpcRole.PUBLISHER, false, TQE2GrpcTestStrategy.INSTANCE) { + @Override + public DockerImageName imageName() { + return IMAGE_TQE2; + } + + @Override + public Path queueConfig() { + return QUEUE_CONFIG_TQE2; + } + + @Override + public Path grpcConfig() { + return GRPC_CONFIG_TQE2; + } + + @Override + public FileTQEConfigurator.Builder configuratorBuilder(Path queue, Set grpc) { + return FileTQEConfigurator.tqe2Builder(imageName(), queue, grpc); + } + + @Override + public TQECluster createCluster(TQEConfigurator configurator) { + return new TQE2ClusterImpl(configurator); + } + }, + + TQE3("TQE 3.x", "producer", GrpcRole.PRODUCER, true, TQE3GrpcTestStrategy.INSTANCE) { + @Override + public DockerImageName imageName() { + return IMAGE_TQE3; + } + + @Override + public Path queueConfig() { + return QUEUE_CONFIG_TQE3; + } + + @Override + public Path grpcConfig() { + return GRPC_CONFIG_TQE3; + } + + @Override + public FileTQEConfigurator.Builder configuratorBuilder(Path queue, Set grpc) { + return FileTQEConfigurator.tqe3Builder(imageName(), queue, grpc); + } + + @Override + public TQECluster createCluster(TQEConfigurator configurator) { + return new TQE3ClusterImpl(configurator); + } + }; + + private static final DockerImageName IMAGE_TQE2 = + DockerImageName.parse( + System.getenv().getOrDefault("TARANTOOL_REGISTRY", "") + + "tarantool/message-queue-ee:2.5.3"); + + private static final DockerImageName IMAGE_TQE3 = + DockerImageName.parse( + System.getenv().getOrDefault("TARANTOOL_REGISTRY", "") + + "tarantool/message-queue-ee:v3.5.0"); + + private static final Path QUEUE_CONFIG_TQE2 = + TQETestHelper.loadConfig("tqe2/simple-config/simple-queue.yml"); + private static final Path GRPC_CONFIG_TQE2 = + TQETestHelper.loadConfig("tqe2/simple-config/simple-grpc.yml"); + private static final Path QUEUE_CONFIG_TQE3 = + TQETestHelper.loadConfig("tqe3/simple-config/simple-queue.yml"); + private static final Path GRPC_CONFIG_TQE3 = + TQETestHelper.loadConfig("tqe3/simple-config/simple-grpc.yml"); + + private final String displayName; + private final String producerRoleName; + private final GrpcRole producerRole; + private final boolean requiresConfigure; + private final GrpcTestStrategy strategy; + + TQEVersion( + String displayName, + String producerRoleName, + GrpcRole producerRole, + boolean requiresConfigure, + GrpcTestStrategy strategy) { + this.displayName = displayName; + this.producerRoleName = producerRoleName; + this.producerRole = producerRole; + this.requiresConfigure = requiresConfigure; + this.strategy = strategy; + } + + public String displayName() { + return displayName; + } + + public String producerRoleName() { + return producerRoleName; + } + + public GrpcRole producerRole() { + return producerRole; + } + + /** + * Whether manual orchestration of {@code configurator.configure()} is required between starting + * the queue and the gRPC containers. TQE 2.x auto-configures inside {@code + * startTarantoolCluster()}; TQE 3.x defers it to {@code startGrpcEndpoints()}. Tests that drive + * the configurator directly (without a cluster) use this flag. + */ + public boolean requiresConfigure() { + return requiresConfigure; + } + + public GrpcTestStrategy strategy() { + return strategy; + } + + public abstract DockerImageName imageName(); + + public abstract Path queueConfig(); + + public abstract Path grpcConfig(); + + public abstract FileTQEConfigurator.Builder configuratorBuilder(Path queue, Set grpc); + + public abstract TQECluster createCluster(TQEConfigurator configurator); + + static Stream all() { + return Stream.of(values()); + } + + @Override + public String toString() { + return displayName; + } +} diff --git a/testcontainers/src/test/proto/tqe2/messages/message.proto b/testcontainers/src/test/proto/tqe2/messages/message.proto new file mode 100644 index 00000000..595f4740 --- /dev/null +++ b/testcontainers/src/test/proto/tqe2/messages/message.proto @@ -0,0 +1,53 @@ +syntax = "proto3"; + +package tarantool.queue_ee; + +option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v2/server/protocol"; +option java_package = "tarantool.queue_ee.v2"; +option java_outer_classname = "Message"; + +// Пара ключ-значение +message Pair { + // Ключ пары + string key = 1; + + // Значение пары + string value = 2; +} + +// Сообщение в очереди +message QueueMessage { + // Идентификатор сообщения + // Заполняется автоматически при записи сообщения в очередь + uint64 id = 1; + + // Название очереди в которую необходимо опубликовать сообщение + string queue = 2; + + // Ключ маршрутизации сообщения (тип сообщения) + // необходим для фильтрации сообщений из очереди на консьюмерах + optional string routing_key = 3; + + // Ключ шардирования + // необходим для распределения данных в системе + optional string sharding_key = 4; + + // Ключ дедупликации + // необходим для проверки повторных сообщений, + // если не указан, то проверка не производится + optional string deduplication_key = 5; + + // Произвольные данные в бинарном формате, содержит тело сообщения + bytes payload = 6; + + // Произвольные данные в бинарном формате, + // содержит дополнительные для сообщения данные, + // необходимые для отладки и трассировки + map metadata = 7 [deprecated = true]; + + // Время вставки сообщения в очередь в наносекундах + int64 timestamp = 8; + + // Произвольные данные в формате списка из пар ключ-значения + repeated Pair metadata_pairs = 9; +} diff --git a/testcontainers/src/test/proto/tqe2/services/consumer.proto b/testcontainers/src/test/proto/tqe2/services/consumer.proto new file mode 100644 index 00000000..dc24e94f --- /dev/null +++ b/testcontainers/src/test/proto/tqe2/services/consumer.proto @@ -0,0 +1,58 @@ +syntax = "proto3"; + +package tarantool.queue_ee; + +import "messages/message.proto"; + +option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v2/server/protocol"; +option java_package = "tarantool.queue_ee.v2"; +option java_outer_classname = "Consumer"; + +// Сервер подписок на сообщения брокера очередей +service ConsumerService { + // Подписка на сообщения с фильтром + rpc Subscribe(SubscriptionRequest) returns (stream SubscriptionNotifications); +} + +// Запрос на подписку +message SubscriptionRequest { + // Название очереди + string queue = 1; + + // Ключ маршрутизации сообщения (тип сообщения) + // необходим для фильтрации сообщений из очереди + // Если не указан, то подписка происходит на все типы сообщений в очереди + optional string routing_key = 2; + + // Опциональная строка указатель на последнее полученное сообщение. + // Необходимо для возможности получения истории сообщений + // или восстановления работы консьюмера после сбоя + // Значение не указано - подписка с текущего момента + // Значение пустая строка - подписка с начала очереди + // Значение указано - подписка с указанного сообщения в очереди + optional string cursor = 3; + + // Ключ шардирования + // необходим для распределения данных в системе + // Если не указан, то подписка происходит на все типы сообщений в очереди + optional string sharding_key = 4; + + // Ключи шардирования позволяют производить фильтрацию по нескольким ключам + // шардирования в рамках одной подписки + repeated string sharding_keys = 5; +} + +// Сообщение в стриме подписки +message SubscriptionNotifications { + // Новые сообщения в очереди с курсорами + repeated SubscriptionNotification notifications = 1; +} + +// Уведомление клиента о новых сообщение в очереди +message SubscriptionNotification { + // Строка-указатель сообщения + string cursor = 1; + + // Сообщение + QueueMessage message = 2; +} diff --git a/testcontainers/src/test/proto/tqe2/services/publisher.proto b/testcontainers/src/test/proto/tqe2/services/publisher.proto new file mode 100644 index 00000000..949e50f2 --- /dev/null +++ b/testcontainers/src/test/proto/tqe2/services/publisher.proto @@ -0,0 +1,263 @@ +syntax = "proto3"; + +package tarantool.queue_ee; + +import "messages/message.proto"; + +option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v2/server/protocol"; +option java_package = "tarantool.queue_ee.v2"; +option java_outer_classname = "Publisher"; + +// Сервер публикации сообщений брокера очередей +service PublisherService { + // Публикация сообщения в очередь + rpc Publish(PublishRequest) returns (PublishResponse); + // Публикация сообщений в очередь через двусторонний стрим + rpc PublishStream(stream PublishStreamRequest) returns (stream PublishStreamResponse); + + // Публикация набора сообщений в очередь + rpc PublishBatch(PublishBatchRequest) returns (PublishBatchResponse); + // Публикация набора сообщений в очередь через двусторонний стрим + rpc PublishBatchStream( + stream PublishBatchStreamRequest + ) returns ( + stream PublishBatchStreamResponse + ); + + // Публикация сообщения на указанные шарды очереди + rpc Broadcast(BroadcastRequest) returns (BroadcastResponse); +} + +// Режим дедупликации сообщения +enum Deduplication { + DEDUPLICATION_UNSPECIFIED = 0; + DEDUPLICATION_BASIC = 1; + DEDUPLICATION_EXTENDED = 2; + DEDUPLICATION_KEEP_FIRST = 3; + DEDUPLICATION_KEEP_LATEST = 4; +} + +// Запрос на публикацию сообщения в очередь +message PublishRequest { + // Название очереди в которой необходимо опубликовать сообщение + string queue = 1; + + // Ключ маршрутизации сообщения (тип сообщения) + // необходим для фильтрации сообщений из очереди на консьюмерах + optional string routing_key = 2; + + // Ключ шардирования + // необходим для распределения данных в системе + optional string sharding_key = 3; + + // Ключ дедупликации + // необходим для проверки повторных сообщений, + // если не указан, то проверка не производится + optional string deduplication_key = 4; + + // Произвольные данные в бинарном формате, содержит тело сообщения + bytes payload = 5; + + // Произвольные данные в бинарном формате, + // содержит дополнительные для сообщения данные, + // необходимые для отладки и трассировки + map metadata = 6 [deprecated = true]; + + // Произвольные данные в формате списка из пар ключ-значения + repeated Pair metadata_pairs = 7; + + // Режим дедупликации сообщения + Deduplication deduplication = 8; +} + +// Запрос на публикация набора сообщений в очередь +message PublishBatchRequest { + // Название очереди в которой необходимо опубликовать сообщения + string queue = 1; + + // Ключ шардирования + // необходим для распределения данных в системе + optional string sharding_key = 2; + + // Набор сообщений + repeated BatchRequestMessage messages = 3; + + // Содержит дополнительные данные необходимые для отладки и трассировки + map metadata = 4 [deprecated = true]; + + // Произвольные данные в формате списка из пар ключ-значения + repeated Pair metadata_pairs = 5; + + // Режим дедупликации сообщения + Deduplication deduplication = 8; +} + +// Набор сообщений +message BatchRequestMessage { + // Ключ маршрутизации сообщения (тип сообщения) + // необходим для фильтрации сообщений из очереди на консьюмерах + optional string routing_key = 1; + + // Ключ дедупликации + // Необходим для проверки повторных сообщений, + // если не указан, то проверка не производится + optional string deduplication_key = 2; + + // Произвольные данные в бинарном формате, содержит тело сообщения + bytes payload = 3; + + // Произвольные данные в бинарном формате, + // содержит дополнительные для сообщения данные, + // необходимые для отладки и трассировки + map metadata = 4 [deprecated = true]; + + // Произвольные данные в формате списка из пар ключ-значения + repeated Pair metadata_pairs = 5; +} + +// Ответ на публикацию набора сообщений +message PublishBatchResponse { + // Идентификаторы сообщений + repeated uint64 ids = 1; + // Содержит дополнительные данные необходимые для отладки и трассировки + map metadata = 2 [deprecated = true]; + // Флаги наличия дубликатов + repeated bool is_duplicates = 3; +} + +// Ответ на публикацию сообщения +message PublishResponse { + // Идентификатор сообщения добавленного в очередь + // (возможно не нужно) + uint64 id = 1; + // Содержит дополнительные данные необходимые для отладки и трассировки + map metadata = 2 [deprecated = true]; + // Если true, то был дубликат сообщения + bool is_duplicate = 3; +} + +// Зарос на публикацию сообщения через двусторонний стрим +message PublishStreamRequest { + // Идентификатор запроса на публикацию сообщения + uint64 request_id = 1; + + // Запрос на публикацию сообщения + PublishRequest request = 2; +} + +// Ответ на публикацию сообщения через двусторонний стрим +message PublishStreamResponse { + // Идентификатор запроса на публикацию сообщения + uint64 request_id = 1; + + oneof result { + // Сообщение об успешной публикации + PublishResponse success = 2; + // Сообщение об ошибке публикации + Error error = 3; + } +} + +// Запрос на публикацию набора сообщений через двусторонний стрим +message PublishBatchStreamRequest { + // Идентификатор запроса на публикацию сообщения + uint64 request_id = 1; + + // Запрос на публикацию набора сообщений + PublishBatchRequest request = 2; +} + +// Ответ на публикацию набора сообщений через двусторонний стрим +message PublishBatchStreamResponse { + // Идентификатор запроса на публикацию сообщения + uint64 request_id = 1; + + oneof result { + // Сообщение об успешной публикации + PublishBatchResponse success = 2; + // Сообщение об ошибке публикации + Error error = 3; + } +} + +// Запрос на рассылку сообщения на указанные шарды +message BroadcastRequest { + // Название очереди, в которую необходимо опубликовать сообщение + string queue = 1; + + // Ключ маршрутизации сообщения (тип сообщения) + // необходим для фильтрации сообщений из очереди на консьюмерах + optional string routing_key = 2; + + // Ключ дедупликации + // необходим для проверки повторных сообщений, + // если не указан, то проверка не производится + optional string deduplication_key = 3; + + // Произвольные данные в бинарном формате, содержит тело сообщения + bytes payload = 4; + + // Произвольные данные в бинарном формате, + // содержит дополнительные для сообщения данные, + // необходимые для отладки и трассировки + map metadata = 5 [deprecated = true]; + + // Список с названиями репликасетов, на которые нужно опубликовать сообщение. + // По умолчанию рассылка происходит на все шарды. + repeated string replicasets = 6; + + // Максимальное время на рассылку сообщения + optional uint64 timeout = 7; + + // Произвольные данные в формате списка из пар ключ-значения + repeated Pair metadata_pairs = 8; + + // Режим дедупликации сообщения + Deduplication deduplication = 9; +} + +// Сообщение об успешной публикации +message Success { + // Идентификатор сообщения добавленного в очередь + uint64 id = 1; + // Флаги наличия дубликатов + bool is_duplicate = 2; +} + +// Сообщение об ошибке публикации +message Error { + // Код ошибки + uint32 code = 1; + // Сообщение об ошибке + string message = 2; +} + +// Ответ репликасета на публикацию сообщения +message ReplicasetResponse { + // Сообщение с результатами публикации сообщения + oneof result { + // Сообщение об успешной публикации + Success success = 1; + + // Сообщение об ошибке публикации + Error error = 2; + } +} + +// Ответ на рассылку сообщения +message BroadcastResponse { + // Код завершения рассылки: + // 0 - Успешная публикация + // 1 - Ошибка на роутере + // 2 - Ошибка на репликасете + uint32 code = 1; + + // Сообщение об ошибке + optional string error = 2; + + // Набор ответов с шардов + map replicasets = 3; + + // Содержит дополнительные данные необходимые для отладки и трассировки + map metadata = 4 [deprecated = true]; +} diff --git a/testcontainers/src/test/proto/messages/cursor.proto b/testcontainers/src/test/proto/tqe3/messages/cursor.proto similarity index 100% rename from testcontainers/src/test/proto/messages/cursor.proto rename to testcontainers/src/test/proto/tqe3/messages/cursor.proto diff --git a/testcontainers/src/test/proto/messages/message.proto b/testcontainers/src/test/proto/tqe3/messages/message.proto similarity index 100% rename from testcontainers/src/test/proto/messages/message.proto rename to testcontainers/src/test/proto/tqe3/messages/message.proto diff --git a/testcontainers/src/test/proto/services/consumer.proto b/testcontainers/src/test/proto/tqe3/services/consumer.proto similarity index 100% rename from testcontainers/src/test/proto/services/consumer.proto rename to testcontainers/src/test/proto/tqe3/services/consumer.proto diff --git a/testcontainers/src/test/proto/services/producer.proto b/testcontainers/src/test/proto/tqe3/services/producer.proto similarity index 100% rename from testcontainers/src/test/proto/services/producer.proto rename to testcontainers/src/test/proto/tqe3/services/producer.proto diff --git a/testcontainers/src/test/resources/tqe2/simple-config/simple-grpc.yml b/testcontainers/src/test/resources/tqe2/simple-config/simple-grpc.yml new file mode 100644 index 00000000..d8044bac --- /dev/null +++ b/testcontainers/src/test/resources/tqe2/simple-config/simple-grpc.yml @@ -0,0 +1,21 @@ +core_port: 1111 +grpc_listen: + - uri: 'tcp://0.0.0.0:18182' + +publisher: + enabled: true + tarantool: + user: test-super + pass: test + connections: + routers: + - "router:3301" + +consumer: + enabled: true + tarantool: + user: test-super + pass: test + connections: + storage: + - "master:3301" diff --git a/testcontainers/src/test/resources/tqe2/simple-config/simple-queue.yml b/testcontainers/src/test/resources/tqe2/simple-config/simple-queue.yml new file mode 100644 index 00000000..63397f99 --- /dev/null +++ b/testcontainers/src/test/resources/tqe2/simple-config/simple-queue.yml @@ -0,0 +1,66 @@ +# Credentials +credentials: + users: + test-super: + password: 'test' + roles: [ super ] + admin: + password: 'secret-cluster-cookie' + roles: [ super ] + replicator: + password: 'secret' + roles: [ replication ] + storage: + roles: [ sharding ] + password: storage + +# advertise configs for all nodes +iproto: + advertise: + peer: + login: replicator + sharding: + login: storage + password: storage + +roles: [ roles.metrics-export ] +# queues configs +roles_cfg: + app.roles.queue: + queues: + - name: test + deduplication_mode: keep_latest + disabled_filters_by: [ sharding_key ] + roles.metrics-export: + http: + - listen: 8081 + endpoints: + - format: prometheus + path: '/metrics' + +groups: + routers: + replicasets: + r-1: + sharding: + roles: [ router ] + roles: [ app.roles.api ] + instances: + router: + iproto: + listen: + - uri: router:3301 + storages: + replicasets: + shard-1: + replication: + failover: manual + sharding: + roles: [ storage ] + roles: [ app.roles.queue ] + leader: master + instances: + master: + iproto: + listen: + - uri: master:3301 diff --git a/testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml b/testcontainers/src/test/resources/tqe3/simple-config/simple-grpc.yml similarity index 84% rename from testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml rename to testcontainers/src/test/resources/tqe3/simple-config/simple-grpc.yml index 2c4c609f..9353580b 100644 --- a/testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml +++ b/testcontainers/src/test/resources/tqe3/simple-config/simple-grpc.yml @@ -1,6 +1,6 @@ core_port: 1111 grpc_listen: -- uri: 'tcp://0.0.0.0:18182' + - uri: 'tcp://0.0.0.0:18182' producer: enabled: true @@ -13,7 +13,7 @@ producer: retry_delay: 5s connections: routers: - - "router:3301" + - "router:3301" consumer: enabled: true @@ -26,4 +26,4 @@ consumer: retry_delay: 5s connections: storage: - - "master:3301" + - "master:3301" diff --git a/testcontainers/src/test/resources/tqe/simple-config/simple-queue.yml b/testcontainers/src/test/resources/tqe3/simple-config/simple-queue.yml similarity index 100% rename from testcontainers/src/test/resources/tqe/simple-config/simple-queue.yml rename to testcontainers/src/test/resources/tqe3/simple-config/simple-queue.yml