Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions testcontainers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<properties>
<license.header.file>${project.parent.basedir}/LICENSE_HEADER.txt</license.header.file>
<maven.compiler.release>17</maven.compiler.release>
<protobuf-plugin.version>0.6.1</protobuf-plugin.version>
<protobuf-plugin.version>5.1.4</protobuf-plugin.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -104,13 +104,6 @@
</dependencies>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.jsonschema2pojo</groupId>
Expand Down Expand Up @@ -146,25 +139,45 @@
</dependencies>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<groupId>io.github.ascopes</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-plugin.version}</version>
<configuration>
<protoTestSourceRoot>${project.basedir}/src/test/proto</protoTestSourceRoot>
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
<protoc kind="binary-maven">
<version>${protobuf.version}</version>
</protoc>
<plugins>
<plugin kind="binary-maven">
<groupId>io.grpc</groupId>
<artifactId>protoc-gen-grpc-java</artifactId>
<version>${grpc.version}</version>
</plugin>
</plugins>
</configuration>
<executions>
<execution>
<id>generate-test-tqe3</id>
<goals>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
<goal>generate-test</goal>
</goals>
<configuration>
<sourceDirectories>
<sourceDirectory>${project.basedir}/src/test/proto/tqe3</sourceDirectory>
</sourceDirectories>
<outputDirectory>${project.build.directory}/generated-test-sources/protobuf/tqe3</outputDirectory>
</configuration>
</execution>
<execution>
<id>generate-test-tqe2</id>
<goals>
<goal>generate-test</goal>
</goals>
<configuration>
<sourceDirectories>
<sourceDirectory>${project.basedir}/src/test/proto/tqe2</sourceDirectory>
</sourceDirectories>
<outputDirectory>${project.build.directory}/generated-test-sources/protobuf/tqe2</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,20 +20,32 @@
import org.testcontainers.containers.tqe.configuration.TQEConfigurator;
import org.testcontainers.lifecycle.Startable;

public class TQEClusterImpl implements TQECluster {
/**
* Abstract base class for TQE cluster implementations that provides common functionality for
* different TQE versions (2.x, 3.x, etc.).
*/
public abstract class AbstractTQECluster implements TQECluster {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTQECluster.class);

private static final Logger LOGGER = LoggerFactory.getLogger(TQEClusterImpl.class);
private static final ExecutorService STARTUP_EXECUTOR =
Executors.newCachedThreadPool(
r -> {
var t = new Thread(r, "tqe-cluster-startup");
t.setDaemon(true);
return t;
});

private final TQEConfigurator configurator;
protected final TQEConfigurator configurator;

private boolean isClosed;
private volatile boolean isClosed;

public TQEClusterImpl(TQEConfigurator configurator) {
public AbstractTQECluster(TQEConfigurator configurator) {
this.configurator = configurator;
}

@Override
public synchronized void start() {
public final synchronized void start() {
if (this.isClosed) {
throw new IllegalStateException("Container is already closed. Please create new container");
}
Expand All @@ -47,18 +61,43 @@ public synchronized void start() {

@Override
public synchronized void stop() {
if (this.configurator != null) {
try {
this.configurator.close();
this.isClosed = true;
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
this.configurator.close();
this.isClosed = true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static void startParallel(
/**
* Starts Tarantool cluster nodes and configures the cluster. Default implementation starts all
* queue containers in parallel and then configures the cluster. Subclasses can override to change
* startup order.
*/
protected void startTarantoolCluster() {
startParallel(this.configurator.queue(), this.configurator);
if (!this.configurator.isConfigured()) {
this.configurator.configure();
}
}

/**
* Starts gRPC endpoint containers. Default implementation starts all gRPC containers in parallel.
* Subclasses can override to change startup order.
*/
protected void startGrpcEndpoints() {
startParallel(this.configurator.grpc(), this.configurator);
}

/**
* Common utility method to start containers in parallel.
*
* @param containers map of container names to container instances
* @param configurator the TQE configurator
*/
protected static void startParallel(
Map<String, ? extends Startable> containers, TQEConfigurator configurator) {

final List<CompletableFuture<?>> futures = new ArrayList<>(containers.size());
final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();

Expand All @@ -70,12 +109,18 @@ private static void startParallel(
try {
c.start();
} catch (Exception e) {
LOGGER.error("Error starting TQE container [container_name='{}']", name, e);
LOGGER.error(
"Error starting TQE container [cluster='{}', container_name='{}']: {}",
configurator.clusterName(),
name,
e.getMessage(),
e);
errors.add(e);
}
})));
},
STARTUP_EXECUTOR)));

CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {})).join();
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();

if (!errors.isEmpty()) {
throw new ContainerLaunchException(
Expand All @@ -85,24 +130,6 @@ private static void startParallel(
}
}

private void startTarantoolCluster() {
startParallel(this.configurator.queue(), this.configurator);
if (this.configurator.isConfigured()) {
LOGGER.warn(
"TQE cluster [name = {}, queue = {}, grpc = {}] already configured",
this.configurator.clusterName(),
this.configurator.queue().size(),
this.configurator.grpc().size());
return;
}

this.configurator.configure();
}

private void startGrpcEndpoints() {
startParallel(this.configurator.grpc(), this.configurator);
}

@Override
public String clusterName() {
return this.configurator.clusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public interface GrpcContainer<SELF extends GrpcContainer<SELF>>
enum GrpcRole {
CONSUMER("consumer"),

PRODUCER("producer");
PRODUCER("producer"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Думаю, лучше сделать по другому:

enum GrpcRole {
    CONSUMER("consumer"),
    PRODUCER("producer", "publisher"),

   private final List<String> roleAliases;

    GrpcRole(String... aliases) {
        this.aliases = Arrays.asList(aliases);
    }
 

   ...
}


PUBLISHER("publisher");

private final String role;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,11 @@ private static Set<GrpcRole> resolveRoles(GrpcConfiguration config, Path configP
final Set<GrpcRole> roles = new LinkedHashSet<>();
if (isPublisher.isPresent() && isPublisher.get()) {
roles.add(GrpcRole.PRODUCER);
LOGGER.trace("Publisher role is enabled for: {}", configPath);
roles.add(GrpcRole.PUBLISHER);
LOGGER.trace(
"Publisher/Producer role is enabled for: {} (both PRODUCER and PUBLISHER for TQE2/TQE3"
+ " compatibility)",
configPath);
}

final Optional<Boolean> isConsumer = config.getConsumer().flatMap(ConsumerConfig::getEnabled);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
* All Rights Reserved.
*/

package org.testcontainers.containers.tqe;

import org.testcontainers.containers.tqe.configuration.TQEConfigurator;

/**
* Implementation of TQE cluster interface specifically designed for TQE 2.x compatibility. Uses the
* default startup order: queue → configure → grpc.
*/
public class TQE2ClusterImpl extends AbstractTQECluster {

public TQE2ClusterImpl(TQEConfigurator configurator) {
super(configurator);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
* All Rights Reserved.
*/

package org.testcontainers.containers.tqe;

import org.testcontainers.containers.tqe.configuration.TQEConfigurator;

/**
* Implementation of TQE cluster interface specifically designed for TQE 3.x compatibility. Unlike
* TQE 2.x, TQE 3.x configures the cluster between starting queue and gRPC containers: queue →
* configure → grpc.
*/
public class TQE3ClusterImpl extends AbstractTQECluster {

public TQE3ClusterImpl(TQEConfigurator configurator) {
super(configurator);
}

@Override
protected void startTarantoolCluster() {
startParallel(this.configurator.queue(), this.configurator);
}

@Override
protected void startGrpcEndpoints() {
if (!this.configurator.isConfigured()) {
this.configurator.configure();
}
startParallel(this.configurator.grpc(), this.configurator);
}
}
Loading