Skip to content

Commit dff924c

Browse files
committed
Isolate scheduled @asynchronous firings from maxAsync
Add a secondary ScheduledExecutorService to every ManagedScheduledExecutorServiceImpl, dedicated to dispatching @asynchronous(runAt=@schedule(...)) firings. The secondary pool shares the primary's ManagedThreadFactory (preserving naming, virtual flag, priority, and context) but is sized independently via a new ScheduledAsyncCore resource property (default 5). The interceptor now triggers on the secondary pool, so scheduled firings are no longer throttled by the primary pool's corePoolSize=maxAsync limit — satisfying Concurrency 3.1 §3.1 "Scheduled asynchronous methods are treated similar to other scheduled tasks in that they are not subject to max-async constraints" without losing the clause from PR #2577 review that firings must run on the executor named in @asynchronous(executor=). The plain-MES fallback wrapper borrows the default MSES's secondary so short-lived per-invocation wrappers do not leak a fresh pool each time; an ownership flag on the 4-arg constructor controls which pools destroyResource() shuts down. Also fix a pre-existing race in CUTriggerScheduledFuture: cancel() can race ahead of the recursive scheduleNextRun(), leaving futureRef pointing at the just-completed execution so the underlying future reports done rather than cancelled. isCancelled() now falls back to the TriggerTask's cancelled flag. Tests: - ScheduledAsyncCustomFactoryTest — scheduled firing runs on a ManageableThread from the custom MSES's factory, not the primary pool. - ManagedScheduledExecutorSecondaryPoolLifecycleTest — owned secondary is shut down on destroyResource(); borrowed secondary is not. TCK Web profile back to 196/0/0/14.
1 parent 4ee90ed commit dff924c

7 files changed

Lines changed: 322 additions & 7 deletions

File tree

container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,9 @@ private Object aroundInvokeScheduled(final InvocationContext ctx, final Asynchro
135135
final ContextServiceImpl.Snapshot snapshot = ctxService.snapshot(null);
136136

137137
// Run scheduled firings on the requested executor so the user's thread factory,
138-
// priorities, and virtual-thread settings apply.
139-
final ScheduledExecutorService triggerDelegate = mses.getDelegate();
138+
// priorities, and virtual-thread settings apply. Use the secondary pool so firings
139+
// are not throttled by maxAsync (per Concurrency 3.1 §3.1).
140+
final ScheduledExecutorService triggerDelegate = mses.getScheduledAsyncDelegate();
140141

141142
// A single CompletableFuture represents ALL executions in the schedule.
142143
// Per spec: "A single future represents the completion of all executions in the schedule."
@@ -177,7 +178,10 @@ private ManagedScheduledExecutorServiceImpl resolveMses(final String executorNam
177178
final ContextServiceImpl mesContextService = (ContextServiceImpl) plainMes.getContextService();
178179
final ManagedScheduledExecutorServiceImpl defaultMses =
179180
ManagedScheduledExecutorServiceImplFactory.lookup("java:comp/DefaultManagedScheduledExecutorService");
180-
return new ManagedScheduledExecutorServiceImpl(defaultMses.getDelegate(), mesContextService);
181+
// Borrow the default MSES's secondary pool so this short-lived wrapper does not
182+
// leak a fresh ScheduledThreadPoolExecutor per invocation. ownsScheduledAsyncDelegate=false.
183+
return new ManagedScheduledExecutorServiceImpl(defaultMses.getDelegate(), mesContextService,
184+
defaultMses.getScheduledAsyncDelegate(), false);
181185
} catch (final Exception fallbackEx) {
182186
throw new RejectedExecutionException("Cannot lookup executor for scheduled async method", e);
183187
}

container/openejb-core/src/main/java/org/apache/openejb/resource/thread/ManagedScheduledExecutorServiceImplFactory.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,19 +89,23 @@ public static ManagedScheduledExecutorServiceImpl lookup(String name) {
8989
}
9090

9191
private int core = 5;
92+
private int scheduledAsyncCore = 5;
9293
private String threadFactory = ManagedThreadFactoryImpl.class.getName();
9394
private boolean virtual;
9495

9596
private String context;
9697

9798
public ManagedScheduledExecutorServiceImpl create(final ContextServiceImpl contextService) {
98-
return new ManagedScheduledExecutorServiceImpl(createScheduledExecutorService(), contextService);
99+
final Pools pools = createScheduledExecutorServicePools();
100+
return new ManagedScheduledExecutorServiceImpl(pools.primary, contextService, pools.secondary, true);
99101
}
100102
public ManagedScheduledExecutorServiceImpl create() {
101-
return new ManagedScheduledExecutorServiceImpl(createScheduledExecutorService(), ContextServiceImplFactory.lookupOrDefault(context));
103+
final Pools pools = createScheduledExecutorServicePools();
104+
return new ManagedScheduledExecutorServiceImpl(pools.primary, ContextServiceImplFactory.lookupOrDefault(context),
105+
pools.secondary, true);
102106
}
103107

104-
private ScheduledExecutorService createScheduledExecutorService() {
108+
private Pools createScheduledExecutorServicePools() {
105109
ManagedThreadFactory managedThreadFactory;
106110
try {
107111
// For the default factory, bypass reflective instantiation so the configured
@@ -114,13 +118,28 @@ private ScheduledExecutorService createScheduledExecutorService() {
114118
managedThreadFactory = new ManagedThreadFactoryImpl(ManagedThreadFactoryImpl.DEFAULT_PREFIX, null, ContextServiceImplFactory.lookupOrDefault(context), virtual);
115119
}
116120

117-
return new ScheduledThreadPoolExecutor(core, managedThreadFactory, CURejectHandler.INSTANCE);
121+
// Primary pool — regular async submissions; corePoolSize follows maxAsync.
122+
final ScheduledExecutorService primary =
123+
new ScheduledThreadPoolExecutor(core, managedThreadFactory, CURejectHandler.INSTANCE);
124+
// Secondary pool — scheduled @Asynchronous firings. Per Concurrency 3.1 §3.1 these
125+
// must NOT be throttled by maxAsync, so size the pool independently while reusing
126+
// the same stateless thread factory (preserves naming / virtual / priority).
127+
final ScheduledExecutorService secondary =
128+
new ScheduledThreadPoolExecutor(scheduledAsyncCore, managedThreadFactory, CURejectHandler.INSTANCE);
129+
return new Pools(primary, secondary);
118130
}
119131

120132
public void setCore(final int core) {
121133
this.core = core;
122134
}
123135

136+
public void setScheduledAsyncCore(final int scheduledAsyncCore) {
137+
this.scheduledAsyncCore = scheduledAsyncCore;
138+
}
139+
140+
private record Pools(ScheduledExecutorService primary, ScheduledExecutorService secondary) {
141+
}
142+
124143
public void setThreadFactory(final String threadFactory) {
125144
this.threadFactory = threadFactory;
126145
}

container/openejb-core/src/main/java/org/apache/openejb/threads/future/CUTriggerScheduledFuture.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ public boolean cancel(boolean mayInterruptIfRunning) {
4444
return super.cancel(mayInterruptIfRunning);
4545
}
4646

47+
@Override
48+
public boolean isCancelled() {
49+
// Also honour the TriggerTask's cancelled flag. The underlying delegate future
50+
// reachable through the facade can point at the just-completed execution if
51+
// cancel() raced ahead of the recursive scheduleNextRun(); in that case cancel()
52+
// set cancelled=true on the TriggerTask but the delegate future reports done
53+
// rather than cancelled. Without this override, isCancelled() would return false
54+
// for a future the caller explicitly cancelled.
55+
return super.isCancelled() || ((TriggerTask<V>) listener).isCancelled();
56+
}
57+
4758
@Override
4859
public V get() throws InterruptedException, ExecutionException {
4960
V result = super.get();

container/openejb-core/src/main/java/org/apache/openejb/threads/impl/ManagedScheduledExecutorServiceImpl.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@
2727
import jakarta.enterprise.concurrent.ManagedScheduledExecutorService;
2828
import jakarta.enterprise.concurrent.ManagedTask;
2929
import jakarta.enterprise.concurrent.Trigger;
30+
import org.apache.openejb.util.LogCategory;
31+
import org.apache.openejb.util.Logger;
3032
import java.lang.reflect.InvocationHandler;
3133
import java.lang.reflect.InvocationTargetException;
3234
import java.lang.reflect.Method;
3335
import java.lang.reflect.Proxy;
3436
import java.util.Date;
37+
import java.util.List;
3538
import java.util.Map;
3639
import java.util.Objects;
3740
import java.util.concurrent.Callable;
@@ -42,13 +45,25 @@
4245
import java.util.concurrent.atomic.AtomicReference;
4346

4447
public class ManagedScheduledExecutorServiceImpl extends ManagedExecutorServiceImpl implements ManagedScheduledExecutorService {
48+
private static final Logger LOGGER = Logger.getInstance(LogCategory.OPENEJB, ManagedScheduledExecutorServiceImpl.class);
49+
4550
private final ScheduledExecutorService delegate;
4651
private final ContextServiceImpl contextService;
52+
private final ScheduledExecutorService scheduledAsyncDelegate;
53+
private final boolean ownsScheduledAsyncDelegate;
4754

4855
public ManagedScheduledExecutorServiceImpl(final ScheduledExecutorService delegate, final ContextServiceImpl contextService) {
56+
this(delegate, contextService, delegate, false);
57+
}
58+
59+
public ManagedScheduledExecutorServiceImpl(final ScheduledExecutorService delegate, final ContextServiceImpl contextService,
60+
final ScheduledExecutorService scheduledAsyncDelegate,
61+
final boolean ownsScheduledAsyncDelegate) {
4962
super(delegate, contextService);
5063
this.delegate = delegate;
5164
this.contextService = contextService;
65+
this.scheduledAsyncDelegate = scheduledAsyncDelegate != null ? scheduledAsyncDelegate : delegate;
66+
this.ownsScheduledAsyncDelegate = ownsScheduledAsyncDelegate;
5267
}
5368

5469

@@ -137,6 +152,35 @@ public ScheduledExecutorService getDelegate() {
137152
return delegate;
138153
}
139154

155+
/**
156+
* Secondary scheduling pool used to dispatch {@code @Asynchronous(runAt=@Schedule(...))}
157+
* firings. Per Jakarta Concurrency 3.1 §3.1, scheduled asynchronous methods are not
158+
* subject to {@code max-async}, so firings must not queue behind regular async work
159+
* occupying the primary delegate's core threads.
160+
*/
161+
public ScheduledExecutorService getScheduledAsyncDelegate() {
162+
return scheduledAsyncDelegate;
163+
}
164+
165+
@Override
166+
public void destroyResource() {
167+
if (ownsScheduledAsyncDelegate && scheduledAsyncDelegate != null && scheduledAsyncDelegate != delegate) {
168+
final List<Runnable> leftover = scheduledAsyncDelegate.shutdownNow();
169+
if (!leftover.isEmpty()) {
170+
LOGGER.warning(leftover.size() + " scheduled-async tasks to execute");
171+
for (final Runnable runnable : leftover) {
172+
try {
173+
LOGGER.info("Executing " + runnable);
174+
runnable.run();
175+
} catch (final Throwable th) {
176+
LOGGER.error(th.getMessage(), th);
177+
}
178+
}
179+
}
180+
}
181+
super.destroyResource();
182+
}
183+
140184
/**
141185
* Automatically resolves an AtomicReference
142186
* @param delegate

container/openejb-core/src/main/java/org/apache/openejb/threads/task/TriggerTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ public void cancelScheduling() {
142142
}
143143
}
144144

145+
public boolean isCancelled() {
146+
return cancelled;
147+
}
148+
145149
private static class LastExecutionImpl implements LastExecution {
146150
private final String identityName;
147151
private final Object result;
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.openejb.cdi.concurrency;
18+
19+
import jakarta.enterprise.concurrent.Asynchronous;
20+
import jakarta.enterprise.concurrent.ManageableThread;
21+
import jakarta.enterprise.concurrent.ManagedScheduledExecutorDefinition;
22+
import jakarta.enterprise.concurrent.ManagedScheduledExecutorService;
23+
import jakarta.enterprise.concurrent.Schedule;
24+
import jakarta.enterprise.context.ApplicationScoped;
25+
import jakarta.inject.Inject;
26+
import org.apache.openejb.jee.EnterpriseBean;
27+
import org.apache.openejb.jee.SingletonBean;
28+
import org.apache.openejb.junit.ApplicationComposer;
29+
import org.apache.openejb.testing.Module;
30+
import org.apache.openejb.threads.impl.ManagedThreadFactoryImpl;
31+
import org.junit.Test;
32+
import org.junit.runner.RunWith;
33+
34+
import javax.naming.InitialContext;
35+
import java.util.HashSet;
36+
import java.util.Set;
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.ConcurrentHashMap;
39+
import java.util.concurrent.CountDownLatch;
40+
import java.util.concurrent.Future;
41+
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicInteger;
43+
44+
import static org.junit.Assert.assertFalse;
45+
import static org.junit.Assert.assertNotNull;
46+
import static org.junit.Assert.assertTrue;
47+
48+
/**
49+
* Verifies that scheduled {@code @Asynchronous} firings honor the requested
50+
* {@link ManagedScheduledExecutorService}'s thread factory: the firing thread
51+
* is produced by the same {@link ManagedThreadFactoryImpl} as the primary pool
52+
* (same naming prefix, {@link ManageableThread} shape), not a stranger pool such
53+
* as the default MSES. Locks down clause A of Concurrency 3.1 §3.1 (firings run
54+
* with the requested executor's thread factory / virtual / priority).
55+
*/
56+
@RunWith(ApplicationComposer.class)
57+
public class ScheduledAsyncCustomFactoryTest {
58+
59+
private static final String MSES_JNDI = "java:module/custom/factoryMSES";
60+
61+
@Inject
62+
private FactoryBean bean;
63+
64+
@Module
65+
public EnterpriseBean ejb() {
66+
return new SingletonBean(DummyEjb.class).localBean();
67+
}
68+
69+
@Module
70+
public Class<?>[] beans() {
71+
return new Class<?>[]{FactoryBean.class};
72+
}
73+
74+
@Test
75+
public void scheduledFiringUsesCustomExecutorsThreadFactory() throws Exception {
76+
final ManagedScheduledExecutorService custom = InitialContext.doLookup(MSES_JNDI);
77+
78+
// Collect thread ids from the custom MSES's primary pool by submitting short-lived
79+
// probes that each block until a release latch fires. Capturing several probes
80+
// forces the primary pool to spawn each of its worker threads concurrently.
81+
final int primarySamples = 3;
82+
final Set<Long> primaryThreadIds = ConcurrentHashMap.newKeySet();
83+
final CountDownLatch primaryStarted = new CountDownLatch(primarySamples);
84+
final CountDownLatch release = new CountDownLatch(1);
85+
final Set<Future<?>> primaryProbes = new HashSet<>();
86+
for (int i = 0; i < primarySamples; i++) {
87+
primaryProbes.add(custom.submit(() -> {
88+
primaryThreadIds.add(Thread.currentThread().getId());
89+
primaryStarted.countDown();
90+
try {
91+
release.await(5, TimeUnit.SECONDS);
92+
} catch (final InterruptedException e) {
93+
Thread.currentThread().interrupt();
94+
}
95+
}));
96+
}
97+
assertTrue("Primary-pool probes should start", primaryStarted.await(5, TimeUnit.SECONDS));
98+
release.countDown();
99+
for (final Future<?> f : primaryProbes) {
100+
f.get(5, TimeUnit.SECONDS);
101+
}
102+
assertFalse("Primary pool should have reported at least one worker thread id", primaryThreadIds.isEmpty());
103+
104+
final AtomicInteger counter = new AtomicInteger();
105+
final CompletableFuture<ThreadSnapshot> future = bean.capture(counter);
106+
final ThreadSnapshot firing = future.get(15, TimeUnit.SECONDS);
107+
108+
assertNotNull("Scheduled firing must complete", firing);
109+
assertFalse("Firing must run on the secondary pool, not the primary — the custom executor's "
110+
+ "primary core-size is capped by maxAsync (firing thread id=" + firing.threadId
111+
+ ", primary ids=" + primaryThreadIds + ")",
112+
primaryThreadIds.contains(firing.threadId));
113+
assertTrue("Firing thread must come from the custom MSES's ManagedThreadFactory "
114+
+ "(expected name prefix '" + ManagedThreadFactoryImpl.DEFAULT_PREFIX
115+
+ "', got '" + firing.threadName + "')",
116+
firing.threadName.startsWith(ManagedThreadFactoryImpl.DEFAULT_PREFIX));
117+
assertTrue("Firing thread must be a ManageableThread produced by ManagedThreadFactoryImpl "
118+
+ "(got class " + firing.threadClass + ")",
119+
firing.manageable);
120+
}
121+
122+
@ManagedScheduledExecutorDefinition(name = MSES_JNDI, maxAsync = 3)
123+
@ApplicationScoped
124+
public static class FactoryBean {
125+
126+
@Asynchronous(executor = MSES_JNDI, runAt = @Schedule(cron = "* * * * * *"))
127+
public CompletableFuture<ThreadSnapshot> capture(final AtomicInteger counter) {
128+
counter.incrementAndGet();
129+
final Thread t = Thread.currentThread();
130+
final ThreadSnapshot snap = new ThreadSnapshot(
131+
t.getId(),
132+
t.getName(),
133+
t.getClass().getName(),
134+
t instanceof ManageableThread);
135+
final CompletableFuture<ThreadSnapshot> future = Asynchronous.Result.getFuture();
136+
future.complete(snap);
137+
return future;
138+
}
139+
}
140+
141+
public record ThreadSnapshot(long threadId, String threadName, String threadClass, boolean manageable) {
142+
}
143+
144+
@jakarta.ejb.Singleton
145+
public static class DummyEjb {
146+
}
147+
}

0 commit comments

Comments
 (0)