Skip to content

Commit 274fc53

Browse files
committed
Add integration testing for session connection reuse CTR.
The LogCaptor is added per test to prevent polluting the output of the integration tests with lots of unnecessary server logs. There are some inherited tests (namely shouldOpenAndCloseObsceneAmountOfSessions()) which can generate tens of thousands of lines of output.
1 parent 008584d commit 274fc53

5 files changed

Lines changed: 302 additions & 2 deletions

File tree

CHANGELOG.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
2727
* Fixed bug in `SubgraphStrategy` where specifying `edges` and `vertices` filters that had `map`-type steps could generate an error.
2828
* Fixed bug in `ReservedKeysVerificationStrategy` where `AddPropertyStep` was not triggering proper validations.
2929
* Added `closeSessionPostGraphOp` to the Gremlin Server settings to indicate that the `Session` should be closed on either a successful commit or rollback.
30+
* Added `SessionedChildClient` that borrows connections from a different `Client` for use with `Sessions`.
31+
* Added `reuseConnectionsForSessions` to Java GLV settings to decide whether to use `SessionedChildClient` for remote transactions.
3032
3133
[[release-3-7-5]]
3234
=== TinkerPop 3.7.5 (Release Date: November 12, 2025)

docs/src/reference/gremlin-variants.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,7 @@ The following table describes the various configuration options for the Gremlin
864864
|path |The URL path to the Gremlin Server. |_/gremlin_
865865
|port |The port of the Gremlin Server to connect to. The same port will be applied for all hosts. |8192
866866
|protocol |Sets the `AuthProperties.Property.PROTOCOL` properties for authentication to Gremlin Server. |_none_
867+
|reuseConnectionsForSessions |Uses a `Client` that will attempt to reuse `Connections` when managing remote transactions with a `DriverRemoteConnection`. |false
867868
|serializer.className |The fully qualified class name of the `MessageSerializer` that will be used to communicate with the server. Note that the serializer configured on the client should be supported by the server configuration. |_none_
868869
|serializer.config |A `Map` of configuration settings for the serializer. |_none_
869870
|username |The username to submit on requests that require authentication. |_none_

gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -820,6 +820,7 @@ protected synchronized Connection chooseConnection(RequestMessage msg) throws Ti
820820
if (borrowedConnection == null) {
821821
//Borrow from parentClient's pool instead of creating new connection
822822
borrowedConnection = parentClient.chooseConnection(msg);
823+
logger.debug("SessionedChildClient choosing {}", borrowedConnection);
823824
}
824825
//Increment everytime, the connection is chosen, all these will be decremented when transaction is commited/rolledback
825826
borrowedConnection.borrowed.incrementAndGet();
@@ -833,6 +834,7 @@ public synchronized CompletableFuture<Void> closeAsync() {
833834
//Decrement borrowed one last time which was incremented by parentClient when the connection is borrowed initially
834835
//returnToPool() does this
835836
borrowedConnection.returnToPool();
837+
logger.debug("Session closed for {} with count {}", borrowedConnection, borrowedConnection.borrowed.get());
836838

837839
borrowedConnection = null;
838840
}

gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,10 @@ public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final Bytecode b
235235
*/
236236
Optional<String> getSessionId() {
237237
if (client instanceof Client.SessionedClient) {
238-
Client.SessionedClient c = (Client.SessionedClient) client;
238+
final Client.SessionedClient c = (Client.SessionedClient) client;
239+
return Optional.of(c.getSessionId());
240+
} else if (client instanceof Client.SessionedChildClient) {
241+
final Client.SessionedChildClient c = (Client.SessionedChildClient) client;
239242
return Optional.of(c.getSessionId());
240243
}
241244

gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java

Lines changed: 293 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,35 @@
1818
*/
1919
package org.apache.tinkerpop.gremlin.server;
2020

21+
import nl.altindag.log.LogCaptor;
2122
import org.apache.tinkerpop.gremlin.driver.Cluster;
23+
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
24+
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
25+
import org.junit.Test;
26+
27+
import java.util.HashSet;
28+
import java.util.List;
29+
import java.util.Set;
30+
import java.util.regex.Matcher;
31+
import java.util.regex.Pattern;
32+
33+
import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
34+
import static org.junit.Assert.assertEquals;
35+
import static org.junit.Assert.assertTrue;
36+
import static org.junit.Assert.fail;
37+
import static org.junit.Assume.assumeFalse;
2238

2339
/**
2440
* Integration tests for gremlin-driver and bytecode sessions where the underlying connection can be re-used for
2541
* multiple sessions. The server is configured with "closeSessionPostGraphOp" set to True.
2642
*/
2743
public class GremlinSessionReuseTxIntegrateTest extends AbstractSessionTxIntegrateTest {
2844

45+
public static Pattern CHANNEL_ID_PATTERN = Pattern.compile("SessionedChildClient choosing.*\\{channel=(.*)\\}");
46+
2947
@Override
3048
protected Cluster createCluster() {
31-
return TestClientFactory.build().create();
49+
return TestClientFactory.build().reuseConnectionsForSessions(true).create();
3250
}
3351

3452
/**
@@ -43,4 +61,278 @@ public Settings overrideSettings(final Settings settings) {
4361

4462
return settings;
4563
}
64+
65+
@Test
66+
public void shouldCleanupResourcesAfterSuccessfulCommit() throws Exception {
67+
assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
68+
69+
final LogCaptor logCaptor = LogCaptor.forRoot();
70+
final Cluster cluster = createCluster();
71+
try {
72+
logCaptor.setLogLevelToDebug();
73+
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
74+
75+
assertEquals(0, (long) g.V().count().next());
76+
77+
final GraphTraversalSource gtx = g.tx().begin();
78+
gtx.addV("person").iterate();
79+
gtx.tx().commit();
80+
81+
// Check the both client and server side "sessions" are closed.
82+
assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
83+
assertEquals(1, (long) g.V().count().next());
84+
} finally {
85+
cluster.close();
86+
resetLogCaptor(logCaptor);
87+
}
88+
}
89+
90+
@Test
91+
public void shouldCleanupResourcesAfterSuccessfulRollback() throws Exception {
92+
assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
93+
94+
final LogCaptor logCaptor = LogCaptor.forRoot();
95+
final Cluster cluster = createCluster();
96+
try {
97+
logCaptor.setLogLevelToDebug();
98+
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
99+
100+
assertEquals(0, (long) g.V().count().next());
101+
102+
final GraphTraversalSource gtx = g.tx().begin();
103+
gtx.addV("person").iterate();
104+
gtx.tx().rollback();
105+
106+
// Check the both client and server side "sessions" are closed.
107+
assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
108+
assertEquals(0, (long) g.V().count().next());
109+
} finally {
110+
cluster.close();
111+
resetLogCaptor(logCaptor);
112+
}
113+
}
114+
115+
@Test
116+
public void shouldNotAllowCommittedGtxToBeReused() throws Exception {
117+
final Cluster cluster = createCluster();
118+
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
119+
120+
final GraphTraversalSource gtx = g.tx().begin();
121+
gtx.addV("person").iterate();
122+
gtx.tx().commit();
123+
124+
try {
125+
gtx.addV("software").iterate();
126+
gtx.tx().commit();
127+
fail("gtx that has already been committed shouldn't be reusable.");
128+
} catch (Exception e) {
129+
// Consider any exception to be correct behavior.
130+
}
131+
132+
cluster.close();
133+
}
134+
135+
@Test
136+
public void shouldAllowMultipleTransactionsOnDifferentConnection() throws Exception {
137+
final LogCaptor logCaptor = LogCaptor.forRoot();
138+
final Cluster cluster = createCluster();
139+
try {
140+
logCaptor.setLogLevelToDebug();
141+
142+
final Pattern channelCountPattern = Pattern.compile("Session closed for.*count\\s(.*)");
143+
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
144+
145+
final GraphTraversalSource gtx1 = g.tx().begin();
146+
final GraphTraversalSource gtx2 = g.tx().begin();
147+
148+
gtx1.addV("person").iterate();
149+
assertEquals(1, (long) gtx1.V().count().next());
150+
151+
gtx2.addV("software").iterate();
152+
assertEquals(1, (long) gtx1.V().count().next());
153+
154+
gtx1.tx().commit();
155+
gtx2.tx().commit();
156+
157+
assertEquals(2, (long) g.V().count().next());
158+
159+
// Ensure that two different underlying connections were used.
160+
final Set<String> channelIds = new HashSet<>();
161+
final List<String> lines = logCaptor.getLogs();
162+
for (String line : lines) {
163+
final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
164+
final Matcher countMatcher = channelCountPattern.matcher(line);
165+
if (idMatcher.find()) {
166+
channelIds.add(idMatcher.group(1));
167+
} else if (countMatcher.find()) {
168+
// Check that the client properly updates the borrowed count so that it's reset to zero after session
169+
// closed, but make the check a bit fuzzy since returning is async and so it may not hit 0 in time
170+
assertTrue(Integer.parseInt(countMatcher.group(1)) < 2);
171+
}
172+
}
173+
174+
assertEquals(2, channelIds.size());
175+
} finally {
176+
cluster.close();
177+
resetLogCaptor(logCaptor);
178+
}
179+
}
180+
181+
@Test
182+
public void shouldAllowMultipleTransactionsOnSameConnection() throws Exception {
183+
assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
184+
final LogCaptor logCaptor = LogCaptor.forRoot();
185+
// Cluster setup that has a single connection so simultaneous transactions must share it.
186+
final Cluster cluster = TestClientFactory.
187+
build().
188+
reuseConnectionsForSessions(true).
189+
minConnectionPoolSize(1).
190+
maxConnectionPoolSize(1).
191+
create();
192+
try {
193+
logCaptor.setLogLevelToDebug();
194+
195+
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
196+
197+
final GraphTraversalSource gtx1 = g.tx().begin();
198+
final GraphTraversalSource gtx2 = g.tx().begin();
199+
200+
gtx1.addV("person").iterate();
201+
gtx2.addV("software").iterate();
202+
gtx1.addV("place").iterate();
203+
204+
assertEquals(0, (long) g.V().count().next());
205+
gtx1.tx().commit();
206+
assertEquals(2, (long) g.V().count().next());
207+
assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
208+
209+
gtx2.tx().commit();
210+
assertEquals(3, (long) g.V().count().next());
211+
assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
212+
} finally {
213+
cluster.close();
214+
resetLogCaptor(logCaptor);
215+
}
216+
}
217+
218+
@Test
219+
public void shouldReuseSameConnectionForSubsequentTransactionAfterCommit() throws Exception {
220+
assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
221+
final LogCaptor logCaptor = LogCaptor.forRoot();
222+
// Cluster setup with single connection to ensure that transaction state isn't shared even though connection
223+
// is reused.
224+
final Cluster cluster = TestClientFactory.build().
225+
minConnectionPoolSize(1).
226+
maxConnectionPoolSize(1).
227+
reuseConnectionsForSessions(true).
228+
create();
229+
try {
230+
logCaptor.setLogLevelToDebug();
231+
232+
String channelId = "";
233+
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
234+
235+
final GraphTraversalSource gtx1 = g.tx().begin();
236+
gtx1.addV("person1").iterate();
237+
gtx1.tx().commit();
238+
239+
List<String> lines = logCaptor.getLogs();
240+
for (final String line : lines) {
241+
final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
242+
if (idMatcher.find()) {
243+
// Save the channelId used for this transaction for comparison with the subsequent one.
244+
channelId = idMatcher.group(1);
245+
}
246+
}
247+
assertClientAndServerSessionResourcesClosed(lines);
248+
249+
final GraphTraversalSource gtx2 = g.tx().begin();
250+
gtx2.addV("person2").iterate();
251+
gtx2.tx().commit();
252+
253+
lines = logCaptor.getLogs();
254+
for (final String line : lines) {
255+
final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
256+
if (idMatcher.find()) {
257+
// Ensure that same connection was used for both transactions.
258+
assertEquals(channelId, idMatcher.group(1));
259+
}
260+
}
261+
assertClientAndServerSessionResourcesClosed(lines);
262+
263+
assertEquals(2, (long) g.V().count().next());
264+
} finally {
265+
cluster.close();
266+
resetLogCaptor(logCaptor);
267+
}
268+
}
269+
270+
@Test
271+
public void shouldReuseSameConnectionForSubsequentTransactionAfterRollback() throws Exception {
272+
assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
273+
final LogCaptor logCaptor = LogCaptor.forRoot();
274+
final Cluster cluster = TestClientFactory.build().
275+
minConnectionPoolSize(1).
276+
maxConnectionPoolSize(1).
277+
reuseConnectionsForSessions(true).
278+
create();
279+
try {
280+
logCaptor.setLogLevelToDebug();
281+
282+
String channelId = "";
283+
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
284+
285+
final GraphTraversalSource gtx1 = g.tx().begin();
286+
gtx1.addV("person1").iterate();
287+
gtx1.tx().rollback();
288+
289+
List<String> lines = logCaptor.getLogs();
290+
for (final String line : lines) {
291+
final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
292+
if (idMatcher.find()) {
293+
channelId = idMatcher.group(1);
294+
}
295+
}
296+
assertClientAndServerSessionResourcesClosed(lines);
297+
298+
final GraphTraversalSource gtx2 = g.tx().begin();
299+
gtx2.addV("person2").iterate();
300+
gtx2.tx().commit();
301+
302+
lines = logCaptor.getLogs();
303+
for (final String line : lines) {
304+
final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
305+
if (idMatcher.find()) {
306+
assertEquals(channelId, idMatcher.group(1));
307+
}
308+
}
309+
assertClientAndServerSessionResourcesClosed(lines);
310+
311+
assertEquals(1, (long) g.V().count().next());
312+
} finally {
313+
cluster.close();
314+
resetLogCaptor(logCaptor);
315+
}
316+
}
317+
318+
// Needed to prevent other long running tests that don't require the LogCaptor from accidentally logging.
319+
private void resetLogCaptor(final LogCaptor logCaptor) {
320+
logCaptor.resetLogLevel();
321+
logCaptor.clearLogs();
322+
logCaptor.close();
323+
}
324+
325+
private void assertClientAndServerSessionResourcesClosed(final List<String> logLines) {
326+
boolean clientSessionClosed = false;
327+
boolean serverSessionClosed = false;
328+
for (final String line : logLines) {
329+
if (line.matches("Session.*closed$")) {
330+
serverSessionClosed = true;
331+
} else if (line.matches("Session closed for Connection.*")) {
332+
clientSessionClosed = true;
333+
}
334+
}
335+
assertTrue(clientSessionClosed);
336+
assertTrue(serverSessionClosed);
337+
}
46338
}

0 commit comments

Comments
 (0)