From 806929029e94f78a6b139b7fecf8f933def8c09a Mon Sep 17 00:00:00 2001 From: kzurawski Date: Mon, 11 May 2026 16:14:50 -0400 Subject: [PATCH 1/3] feat-sqs-heartbeat Initial code to try and get sqs to heartbeat so items don't go back to the queue when we are still processin ghtem --- .../cloud/sqs/annotation/SqsListener.java | 10 + ...qsListenerAnnotationBeanPostProcessor.java | 5 + .../cloud/sqs/config/SqsEndpoint.java | 40 +++ .../SqsMessageListenerContainerFactory.java | 4 + .../sqs/listener/FifoSqsComponentFactory.java | 15 ++ .../sqs/listener/SqsContainerOptions.java | 56 +++++ .../listener/SqsContainerOptionsBuilder.java | 14 ++ .../listener/StandardSqsComponentFactory.java | 18 +- ...MessageVisibilityHeartbeatSinkAdapter.java | 234 ++++++++++++++++++ ...tenerAnnotationBeanPostProcessorTests.java | 36 +++ ...sMessageListenerContainerFactoryTests.java | 16 ++ .../sqs/listener/ContainerOptionsTests.java | 3 +- .../FifoSqsComponentFactoryTests.java | 14 ++ .../StandardSqsComponentFactoryTests.java | 21 ++ 14 files changed, 484 insertions(+), 2 deletions(-) create mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityHeartbeatSinkAdapter.java diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java index eb700ad1a8..aed6e8ece3 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java @@ -140,6 +140,16 @@ */ String messageVisibilitySeconds() default ""; + /** + * Interval between visibility heartbeat requests sent while listener execution is in progress. + */ + String messageVisibilityHeartbeatIntervalSeconds() default ""; + + /** + * Visibility timeout to apply on each visibility heartbeat request. + */ + String messageVisibilityHeartbeatSeconds() default ""; + /** * The acknowledgement mode to be used for the provided queues. If not specified, the acknowledgement mode defined * for the container factory will be used. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java index 0fdc53876b..8024768c38 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java @@ -68,6 +68,11 @@ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) { resolveAsInteger(sqsListenerAnnotation.maxConcurrentMessages(), "maxConcurrentMessages")) .messageVisibility( resolveAsInteger(sqsListenerAnnotation.messageVisibilitySeconds(), "messageVisibility")) + .messageVisibilityHeartbeatInterval( + resolveAsInteger(sqsListenerAnnotation.messageVisibilityHeartbeatIntervalSeconds(), + "messageVisibilityHeartbeatInterval")) + .messageVisibilityHeartbeat(resolveAsInteger(sqsListenerAnnotation.messageVisibilityHeartbeatSeconds(), + "messageVisibilityHeartbeat")) .acknowledgementMode(resolveAcknowledgement(sqsListenerAnnotation.acknowledgementMode())).build(); } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java index 6097252009..490981fa16 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java @@ -38,6 +38,10 @@ public class SqsEndpoint extends AbstractEndpoint { private final Integer messageVisibility; + private final Integer messageVisibilityHeartbeatInterval; + + private final Integer messageVisibilityHeartbeat; + private final Integer maxMessagesPerPoll; @Nullable @@ -48,6 +52,8 @@ protected SqsEndpoint(SqsEndpointBuilder builder) { this.maxConcurrentMessages = builder.maxConcurrentMessages; this.pollTimeoutSeconds = builder.pollTimeoutSeconds; this.messageVisibility = builder.messageVisibility; + this.messageVisibilityHeartbeatInterval = builder.messageVisibilityHeartbeatInterval; + this.messageVisibilityHeartbeat = builder.messageVisibilityHeartbeat; this.maxMessagesPerPoll = builder.maxMessagesPerPoll; this.acknowledgementMode = builder.acknowledgementMode; } @@ -97,6 +103,26 @@ public Duration getMessageVisibility() { return this.messageVisibility != null ? Duration.ofSeconds(this.messageVisibility) : null; } + /** + * Return the visibility heartbeat interval for this endpoint. + * @return the visibility heartbeat interval. + */ + @Nullable + public Duration getMessageVisibilityHeartbeatInterval() { + return this.messageVisibilityHeartbeatInterval != null + ? Duration.ofSeconds(this.messageVisibilityHeartbeatInterval) + : null; + } + + /** + * Return the visibility timeout to apply on each heartbeat request for this endpoint. + * @return the visibility heartbeat timeout. + */ + @Nullable + public Duration getMessageVisibilityHeartbeat() { + return this.messageVisibilityHeartbeat != null ? Duration.ofSeconds(this.messageVisibilityHeartbeat) : null; + } + /** * Returns the acknowledgement mode configured for this endpoint. * @return the acknowledgement mode. @@ -118,6 +144,10 @@ public static class SqsEndpointBuilder { private Integer messageVisibility; + private Integer messageVisibilityHeartbeatInterval; + + private Integer messageVisibilityHeartbeat; + private String id; private Integer maxMessagesPerPoll; @@ -155,6 +185,16 @@ public SqsEndpointBuilder messageVisibility(Integer messageVisibility) { return this; } + public SqsEndpointBuilder messageVisibilityHeartbeatInterval(Integer messageVisibilityHeartbeatInterval) { + this.messageVisibilityHeartbeatInterval = messageVisibilityHeartbeatInterval; + return this; + } + + public SqsEndpointBuilder messageVisibilityHeartbeat(Integer messageVisibilityHeartbeat) { + this.messageVisibilityHeartbeat = messageVisibilityHeartbeat; + return this; + } + public SqsEndpointBuilder id(String id) { this.id = id; return this; diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java index a72e9e1510..51769774c2 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java @@ -173,6 +173,10 @@ private void configureFromSqsEndpoint(SqsEndpoint sqsEndpoint, SqsContainerOptio .acceptIfNotNull(sqsEndpoint.getMaxMessagesPerPoll(), options::maxMessagesPerPoll) .acceptIfNotNull(sqsEndpoint.getPollTimeout(), options::pollTimeout) .acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility) + .acceptIfNotNull(sqsEndpoint.getMessageVisibilityHeartbeatInterval(), + options::messageVisibilityHeartbeatInterval) + .acceptIfNotNull(sqsEndpoint.getMessageVisibilityHeartbeat(), + options::messageVisibilityHeartbeatTimeout) .acceptIfNotNull(sqsEndpoint.getAcknowledgementMode(), options::acknowledgementMode); } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java index 5ca3a6951e..61d1e5fa00 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java @@ -27,6 +27,7 @@ import io.awspring.cloud.sqs.listener.sink.OrderedMessageSink; import io.awspring.cloud.sqs.listener.sink.adapter.MessageGroupingSinkAdapter; import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityExtendingSinkAdapter; +import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityHeartbeatSinkAdapter; import io.awspring.cloud.sqs.listener.source.FifoSqsMessageSource; import io.awspring.cloud.sqs.listener.source.MessageSource; import java.time.Duration; @@ -72,6 +73,8 @@ public MessageSink createMessageSink(SqsContainerOptions options) { MessageSink deliverySink = createDeliverySink(options.getListenerMode()); MessageSink wrappedDeliverySink = maybeWrapWithVisibilityAdapter(deliverySink, options.getMessageVisibility()); + wrappedDeliverySink = maybeWrapWithVisibilityHeartbeatAdapter(wrappedDeliverySink, + options.getMessageVisibilityHeartbeatInterval(), options.getMessageVisibilityHeartbeatTimeout()); return maybeWrapWithMessageGroupingAdapter(options, wrappedDeliverySink); } @@ -104,6 +107,18 @@ private MessageVisibilityExtendingSinkAdapter addMessageVisibilityExtendingSi return visibilityAdapter; } + private MessageSink maybeWrapWithVisibilityHeartbeatAdapter(MessageSink deliverySink, + @Nullable Duration heartbeatInterval, @Nullable Duration heartbeatTimeout) { + if (heartbeatInterval == null || heartbeatTimeout == null) { + return deliverySink; + } + MessageVisibilityHeartbeatSinkAdapter heartbeatAdapter = new MessageVisibilityHeartbeatSinkAdapter<>( + deliverySink); + heartbeatAdapter.setHeartbeatInterval(heartbeatInterval); + heartbeatAdapter.setHeartbeatVisibilityTimeout(heartbeatTimeout); + return heartbeatAdapter; + } + private Function, String> getMessageGroupingFunction() { return message -> MessageHeaderUtils.getHeaderAsString(message, SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER); } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java index 3a923e2028..e816cc9721 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java @@ -40,6 +40,12 @@ public class SqsContainerOptions extends AbstractContainerOptions queueAttributeNames; @@ -62,6 +68,8 @@ private SqsContainerOptions(BuilderImpl builder) { this.messageAttributeNames = builder.messageAttributeNames; this.messageSystemAttributeNames = builder.messageSystemAttributeNames; this.messageVisibility = builder.messageVisibility; + this.messageVisibilityHeartbeatInterval = builder.messageVisibilityHeartbeatInterval; + this.messageVisibilityHeartbeatTimeout = builder.messageVisibilityHeartbeatTimeout; this.queueNotFoundStrategy = builder.queueNotFoundStrategy; this.fifoBatchGroupingStrategy = builder.fifoBatchGroupingStrategy; this.convertMessageIdToUuid = builder.convertMessageIdToUuid; @@ -108,6 +116,24 @@ public Duration getMessageVisibility() { return this.messageVisibility; } + /** + * Get the interval between visibility heartbeat requests sent while a message is being processed. + * @return the heartbeat interval. + */ + @Nullable + public Duration getMessageVisibilityHeartbeatInterval() { + return this.messageVisibilityHeartbeatInterval; + } + + /** + * Get the visibility timeout to apply on each visibility heartbeat request. + * @return the heartbeat timeout. + */ + @Nullable + public Duration getMessageVisibilityHeartbeatTimeout() { + return this.messageVisibilityHeartbeatTimeout; + } + /** * Get messages grouping strategy in FIFO queues when retrieved by the container in listener mode * {@link ListenerMode#BATCH}. @@ -165,6 +191,12 @@ protected static class BuilderImpl @Nullable private Duration messageVisibility; + @Nullable + private Duration messageVisibilityHeartbeatInterval; + + @Nullable + private Duration messageVisibilityHeartbeatTimeout; + private boolean convertMessageIdToUuid = true; protected BuilderImpl() { @@ -177,6 +209,8 @@ protected BuilderImpl(SqsContainerOptions options) { this.messageAttributeNames = options.messageAttributeNames; this.messageSystemAttributeNames = options.messageSystemAttributeNames; this.messageVisibility = options.messageVisibility; + this.messageVisibilityHeartbeatInterval = options.messageVisibilityHeartbeatInterval; + this.messageVisibilityHeartbeatTimeout = options.messageVisibilityHeartbeatTimeout; this.fifoBatchGroupingStrategy = options.fifoBatchGroupingStrategy; this.queueNotFoundStrategy = options.queueNotFoundStrategy; this.convertMessageIdToUuid = options.convertMessageIdToUuid; @@ -212,6 +246,28 @@ public SqsContainerOptionsBuilder messageVisibility(Duration messageVisibility) return this; } + @Override + public SqsContainerOptionsBuilder messageVisibilityHeartbeatInterval( + Duration messageVisibilityHeartbeatInterval) { + Assert.notNull(messageVisibilityHeartbeatInterval, "messageVisibilityHeartbeatInterval cannot be null"); + Assert.isTrue( + !messageVisibilityHeartbeatInterval.isNegative() && !messageVisibilityHeartbeatInterval.isZero(), + "messageVisibilityHeartbeatInterval must be greater than zero"); + this.messageVisibilityHeartbeatInterval = messageVisibilityHeartbeatInterval; + return this; + } + + @Override + public SqsContainerOptionsBuilder messageVisibilityHeartbeatTimeout( + Duration messageVisibilityHeartbeatTimeout) { + Assert.notNull(messageVisibilityHeartbeatTimeout, "messageVisibilityHeartbeatTimeout cannot be null"); + Assert.isTrue( + !messageVisibilityHeartbeatTimeout.isNegative() && !messageVisibilityHeartbeatTimeout.isZero(), + "messageVisibilityHeartbeatTimeout must be greater than zero"); + this.messageVisibilityHeartbeatTimeout = messageVisibilityHeartbeatTimeout; + return this; + } + @Override public SqsContainerOptionsBuilder fifoBatchGroupingStrategy( FifoBatchGroupingStrategy fifoBatchGroupingStrategy) { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptionsBuilder.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptionsBuilder.java index 704da2d95e..4e7caffd0a 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptionsBuilder.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptionsBuilder.java @@ -60,6 +60,20 @@ SqsContainerOptionsBuilder messageSystemAttributeNames( */ SqsContainerOptionsBuilder messageVisibility(Duration messageVisibility); + /** + * Set the interval between visibility heartbeat requests sent while a message is being processed. + * @param messageVisibilityHeartbeatInterval the heartbeat interval. + * @return this instance. + */ + SqsContainerOptionsBuilder messageVisibilityHeartbeatInterval(Duration messageVisibilityHeartbeatInterval); + + /** + * Set the visibility timeout to apply on each visibility heartbeat request. + * @param messageVisibilityHeartbeatTimeout the timeout to set on each heartbeat. + * @return this instance. + */ + SqsContainerOptionsBuilder messageVisibilityHeartbeatTimeout(Duration messageVisibilityHeartbeatTimeout); + /** * Set how the messages from FIFO queues should be grouped when container listener mode is * {@link ListenerMode#BATCH}. By default, messages are grouped in batches by message group, which are processed in diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactory.java index cba188c708..3a85a1a24b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactory.java @@ -24,10 +24,12 @@ import io.awspring.cloud.sqs.listener.sink.BatchMessageSink; import io.awspring.cloud.sqs.listener.sink.FanOutMessageSink; import io.awspring.cloud.sqs.listener.sink.MessageSink; +import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityHeartbeatSinkAdapter; import io.awspring.cloud.sqs.listener.source.MessageSource; import io.awspring.cloud.sqs.listener.source.StandardSqsMessageSource; import java.time.Duration; import java.util.Collection; +import org.jspecify.annotations.Nullable; import org.springframework.util.Assert; /** @@ -57,12 +59,26 @@ public MessageSource createMessageSource(SqsContainerOptions options) { // @formatter:off @Override public MessageSink createMessageSink(SqsContainerOptions options) { - return ListenerMode.SINGLE_MESSAGE.equals(options.getListenerMode()) + MessageSink messageSink = ListenerMode.SINGLE_MESSAGE.equals(options.getListenerMode()) ? new FanOutMessageSink<>() : new BatchMessageSink<>(); + return maybeWrapWithVisibilityHeartbeatAdapter(messageSink, + options.getMessageVisibilityHeartbeatInterval(), options.getMessageVisibilityHeartbeatTimeout()); } // @formatter:on + private MessageSink maybeWrapWithVisibilityHeartbeatAdapter(MessageSink messageSink, + @Nullable Duration heartbeatInterval, @Nullable Duration heartbeatTimeout) { + if (heartbeatInterval == null || heartbeatTimeout == null) { + return messageSink; + } + MessageVisibilityHeartbeatSinkAdapter heartbeatSinkAdapter = new MessageVisibilityHeartbeatSinkAdapter<>( + messageSink); + heartbeatSinkAdapter.setHeartbeatInterval(heartbeatInterval); + heartbeatSinkAdapter.setHeartbeatVisibilityTimeout(heartbeatTimeout); + return heartbeatSinkAdapter; + } + @Override public AcknowledgementProcessor createAcknowledgementProcessor(SqsContainerOptions options) { validateAcknowledgementOrdering(options); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityHeartbeatSinkAdapter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityHeartbeatSinkAdapter.java new file mode 100644 index 0000000000..53008f6fd9 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityHeartbeatSinkAdapter.java @@ -0,0 +1,234 @@ +/* + * Copyright 2013-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.listener.sink.adapter; + +import io.awspring.cloud.sqs.MessageHeaderUtils; +import io.awspring.cloud.sqs.listener.BatchVisibility; +import io.awspring.cloud.sqs.listener.MessageProcessingContext; +import io.awspring.cloud.sqs.listener.QueueMessageVisibility; +import io.awspring.cloud.sqs.listener.SqsHeaders; +import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; +import io.awspring.cloud.sqs.listener.sink.MessageSink; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.Message; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import org.springframework.util.Assert; + +/** + * A {@link AbstractDelegatingMessageListeningSinkAdapter} that adds an interceptor responsible for periodically + * extending visibility while a message (or message batch) is being processed. + * + * @author Tomaz Fernandes + * @since 3.5 + */ +public class MessageVisibilityHeartbeatSinkAdapter extends AbstractDelegatingMessageListeningSinkAdapter { + + private static final Logger logger = LoggerFactory.getLogger(MessageVisibilityHeartbeatSinkAdapter.class); + + private static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofSeconds(10); + + private static final Duration DEFAULT_HEARTBEAT_VISIBILITY_TIMEOUT = Duration.ofSeconds(30); + + private static final String THREAD_NAME_PREFIX = "sqs-message-visibility-heartbeat-"; + + private Duration heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL; + + private int heartbeatVisibilityTimeoutSeconds = (int) DEFAULT_HEARTBEAT_VISIBILITY_TIMEOUT.getSeconds(); + + private volatile boolean running; + + private final Object lifecycleMonitor = new Object(); + + @Nullable + private ScheduledExecutorService scheduler; + + public MessageVisibilityHeartbeatSinkAdapter(MessageSink delegate) { + super(delegate); + } + + public void setHeartbeatInterval(Duration heartbeatInterval) { + Assert.notNull(heartbeatInterval, "heartbeatInterval cannot be null"); + Assert.isTrue(!heartbeatInterval.isNegative() && !heartbeatInterval.isZero(), + "heartbeatInterval must be greater than zero"); + this.heartbeatInterval = heartbeatInterval; + } + + public void setHeartbeatVisibilityTimeout(Duration heartbeatVisibilityTimeout) { + Assert.notNull(heartbeatVisibilityTimeout, "heartbeatVisibilityTimeout cannot be null"); + Assert.isTrue(!heartbeatVisibilityTimeout.isNegative() && !heartbeatVisibilityTimeout.isZero(), + "heartbeatVisibilityTimeout must be greater than zero"); + this.heartbeatVisibilityTimeoutSeconds = (int) heartbeatVisibilityTimeout.getSeconds(); + } + + @Override + public CompletableFuture emit(Collection> messages, MessageProcessingContext context) { + logger.trace("Adding visibility heartbeat interceptor for messages {}", MessageHeaderUtils.getId(messages)); + return getDelegate().emit(messages, + context.addInterceptor(new MessageVisibilityHeartbeatInterceptor(messages))); + } + + @Override + public void start() { + if (isRunning()) { + return; + } + synchronized (this.lifecycleMonitor) { + if (isRunning()) { + return; + } + this.scheduler = createScheduler(); + super.start(); + this.running = true; + } + } + + @Override + public void stop() { + if (!isRunning()) { + return; + } + synchronized (this.lifecycleMonitor) { + if (!isRunning()) { + return; + } + this.running = false; + ScheduledExecutorService localScheduler = this.scheduler; + this.scheduler = null; + if (localScheduler != null) { + localScheduler.shutdownNow(); + } + super.stop(); + } + } + + @Override + public boolean isRunning() { + return this.running && super.isRunning(); + } + + protected ScheduledExecutorService createScheduler() { + return Executors.newSingleThreadScheduledExecutor(new CustomizableThreadFactory(THREAD_NAME_PREFIX)); + } + + private class MessageVisibilityHeartbeatInterceptor implements AsyncMessageInterceptor { + + private final Collection> originalMessages; + + private final Map> runningMessageHeartbeats = new ConcurrentHashMap<>(); + + @Nullable + private ScheduledFuture runningBatchHeartbeat; + + MessageVisibilityHeartbeatInterceptor(Collection> originalMessages) { + this.originalMessages = Collections.unmodifiableCollection(new ArrayList<>(originalMessages)); + } + + @Override + public CompletableFuture> intercept(Message message) { + String messageId = MessageHeaderUtils.getId(message); + this.runningMessageHeartbeats.computeIfAbsent(messageId, + id -> scheduleHeartbeat(() -> getMessageVisibility(message).changeToAsync( + MessageVisibilityHeartbeatSinkAdapter.this.heartbeatVisibilityTimeoutSeconds), id)); + return CompletableFuture.completedFuture(message); + } + + @Override + public CompletableFuture>> intercept(Collection> messages) { + if (this.runningBatchHeartbeat == null) { + this.runningBatchHeartbeat = scheduleHeartbeat( + () -> getBatchVisibility(messages).changeToAsync( + MessageVisibilityHeartbeatSinkAdapter.this.heartbeatVisibilityTimeoutSeconds), + MessageHeaderUtils.getId(messages)); + } + return CompletableFuture.completedFuture(messages); + } + + @Override + public CompletableFuture afterProcessing(Message message, @Nullable Throwable t) { + String messageId = MessageHeaderUtils.getId(message); + cancelHeartbeat(this.runningMessageHeartbeats.remove(messageId), messageId); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture afterProcessing(Collection> messages, @Nullable Throwable t) { + cancelHeartbeat(this.runningBatchHeartbeat, MessageHeaderUtils.getId(messages)); + this.runningBatchHeartbeat = null; + this.originalMessages.forEach(msg -> { + String id = MessageHeaderUtils.getId(msg); + cancelHeartbeat(this.runningMessageHeartbeats.remove(id), id); + }); + return CompletableFuture.completedFuture(null); + } + + private ScheduledFuture scheduleHeartbeat(Supplier> heartbeatAction, String id) { + ScheduledExecutorService localScheduler = MessageVisibilityHeartbeatSinkAdapter.this.scheduler; + Assert.state(localScheduler != null, "heartbeat scheduler not initialized"); + logger.trace("Scheduling visibility heartbeat for {} every {}ms", id, + MessageVisibilityHeartbeatSinkAdapter.this.heartbeatInterval.toMillis()); + return localScheduler.scheduleAtFixedRate(() -> { + try { + heartbeatAction.get().whenComplete((v, t) -> { + if (t != null) { + logger.warn("Error sending visibility heartbeat for {}", id, t); + } + else { + logger.trace("Visibility heartbeat sent for {}", id); + } + }); + } + catch (Exception ex) { + logger.warn("Error preparing visibility heartbeat for {}", id, ex); + } + }, MessageVisibilityHeartbeatSinkAdapter.this.heartbeatInterval.toMillis(), + MessageVisibilityHeartbeatSinkAdapter.this.heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS); + } + + private void cancelHeartbeat(@Nullable ScheduledFuture future, String id) { + if (future != null) { + future.cancel(true); + logger.trace("Cancelled visibility heartbeat for {}", id); + } + } + + private QueueMessageVisibility getMessageVisibility(Message message) { + return MessageHeaderUtils.getHeader(message, SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER, + QueueMessageVisibility.class); + } + + @SuppressWarnings("unchecked") + private BatchVisibility getBatchVisibility(Collection> messages) { + QueueMessageVisibility visibility = getMessageVisibility(messages.iterator().next()); + return visibility.toBatchVisibility((Collection>) (Collection) messages); + } + + } + +} diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessorTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessorTests.java index d1dbb091fe..4c0a17dc00 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessorTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessorTests.java @@ -31,6 +31,7 @@ import io.awspring.cloud.sqs.listener.MessageListenerContainerRegistry; import io.awspring.cloud.sqs.support.converter.legacy.LegacyJackson2MessageConverterMigration; import io.awspring.cloud.sqs.support.resolver.BatchPayloadMethodArgumentResolver; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -258,6 +259,33 @@ protected EndpointRegistrar createEndpointRegistrar() { }); } + @Test + void shouldMapVisibilityHeartbeatFromAnnotationToEndpoint() { + ListableBeanFactory beanFactory = mock(ListableBeanFactory.class); + MessageListenerContainerRegistry registry = mock(MessageListenerContainerRegistry.class); + MessageListenerContainerFactory factory = mock(MessageListenerContainerFactory.class); + + when(beanFactory.getBean(SqsBeanNames.ENDPOINT_REGISTRY_BEAN_NAME, MessageListenerContainerRegistry.class)) + .thenReturn(registry); + when(beanFactory.containsBean(EndpointRegistrar.DEFAULT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)).thenReturn(true); + when(beanFactory.getBean(EndpointRegistrar.DEFAULT_LISTENER_CONTAINER_FACTORY_BEAN_NAME, + MessageListenerContainerFactory.class)).thenReturn(factory); + + SqsListenerAnnotationBeanPostProcessor processor = new SqsListenerAnnotationBeanPostProcessor(); + + HeartbeatListener bean = new HeartbeatListener(); + processor.setBeanFactory(beanFactory); + processor.postProcessAfterInitialization(bean, "heartbeatListener"); + processor.afterSingletonsInstantiated(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Endpoint.class); + then(factory).should().createContainer(captor.capture()); + assertThat(captor.getValue()).isInstanceOfSatisfying(SqsEndpoint.class, endpoint -> { + assertThat(endpoint.getMessageVisibilityHeartbeatInterval()).isEqualTo(Duration.ofSeconds(3)); + assertThat(endpoint.getMessageVisibilityHeartbeat()).isEqualTo(Duration.ofSeconds(20)); + }); + } + static class Listener { @SqsListener("myQueue") @@ -293,6 +321,14 @@ void listen(String message) { } + static class HeartbeatListener { + + @SqsListener(queueNames = "heartbeatQueue", messageVisibilityHeartbeatIntervalSeconds = "3", messageVisibilityHeartbeatSeconds = "20") + void listen(String message) { + } + + } + static class SqsQueueNameReader { static final String QUEUE_NAME_1 = "queueName1"; diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactoryTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactoryTests.java index ebc42f2529..c02bf52f4e 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactoryTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactoryTests.java @@ -56,6 +56,8 @@ void shouldCreateContainerFromEndpointWithOptionsDefaults() { SqsEndpoint endpoint = mock(SqsEndpoint.class); given(endpoint.getMaxConcurrentMessages()).willReturn(null); given(endpoint.getMessageVisibility()).willReturn(null); + given(endpoint.getMessageVisibilityHeartbeatInterval()).willReturn(null); + given(endpoint.getMessageVisibilityHeartbeat()).willReturn(null); given(endpoint.getMaxMessagesPerPoll()).willReturn(null); given(endpoint.getPollTimeout()).willReturn(null); given(endpoint.getLogicalNames()).willReturn(queueNames); @@ -87,8 +89,12 @@ void shouldCreateContainerFromEndpointOverridingOptions() { int messagesPerPoll = 7; Duration pollTimeout = Duration.ofSeconds(6); Duration visibility = Duration.ofSeconds(8); + Duration heartbeatInterval = Duration.ofSeconds(2); + Duration heartbeatTimeout = Duration.ofSeconds(12); given(endpoint.getMaxConcurrentMessages()).willReturn(inflight); given(endpoint.getMessageVisibility()).willReturn(visibility); + given(endpoint.getMessageVisibilityHeartbeatInterval()).willReturn(heartbeatInterval); + given(endpoint.getMessageVisibilityHeartbeat()).willReturn(heartbeatTimeout); given(endpoint.getMaxMessagesPerPoll()).willReturn(messagesPerPoll); given(endpoint.getPollTimeout()).willReturn(pollTimeout); given(endpoint.getLogicalNames()).willReturn(queueNames); @@ -101,6 +107,8 @@ void shouldCreateContainerFromEndpointOverridingOptions() { assertThat(container.getContainerOptions()).isInstanceOfSatisfying(SqsContainerOptions.class, options -> { assertThat(options.getMaxConcurrentMessages()).isEqualTo(inflight); assertThat(options.getMessageVisibility()).isEqualTo(visibility); + assertThat(options.getMessageVisibilityHeartbeatInterval()).isEqualTo(heartbeatInterval); + assertThat(options.getMessageVisibilityHeartbeatTimeout()).isEqualTo(heartbeatTimeout); assertThat(options.getPollTimeout()).isEqualTo(pollTimeout); assertThat(options.getMaxMessagesPerPoll()).isEqualTo(messagesPerPoll); }); @@ -171,6 +179,8 @@ void shouldCreateContainerFromEndpointWithMultipleMethodsWithDefaultOptions() { given(sqsEndpoint.getMaxConcurrentMessages()).willReturn(null); given(sqsEndpoint.getMessageVisibility()).willReturn(null); + given(sqsEndpoint.getMessageVisibilityHeartbeatInterval()).willReturn(null); + given(sqsEndpoint.getMessageVisibilityHeartbeat()).willReturn(null); given(sqsEndpoint.getMaxMessagesPerPoll()).willReturn(null); given(sqsEndpoint.getPollTimeout()).willReturn(null); given(multiMethodSqsEndpoint.getLogicalNames()).willReturn(queueNames); @@ -203,8 +213,12 @@ void shouldCreateContainerFromMultiMethodEndpointOverridingOptions() { int messagesPerPoll = 7; Duration pollTimeout = Duration.ofSeconds(6); Duration visibility = Duration.ofSeconds(8); + Duration heartbeatInterval = Duration.ofSeconds(2); + Duration heartbeatTimeout = Duration.ofSeconds(12); given(sqsEndpoint.getMaxConcurrentMessages()).willReturn(inflight); given(sqsEndpoint.getMessageVisibility()).willReturn(visibility); + given(sqsEndpoint.getMessageVisibilityHeartbeatInterval()).willReturn(heartbeatInterval); + given(sqsEndpoint.getMessageVisibilityHeartbeat()).willReturn(heartbeatTimeout); given(sqsEndpoint.getMaxMessagesPerPoll()).willReturn(messagesPerPoll); given(sqsEndpoint.getPollTimeout()).willReturn(pollTimeout); given(multiMethodSqsEndpoint.getLogicalNames()).willReturn(queueNames); @@ -218,6 +232,8 @@ void shouldCreateContainerFromMultiMethodEndpointOverridingOptions() { assertThat(container.getContainerOptions()).isInstanceOfSatisfying(SqsContainerOptions.class, options -> { assertThat(options.getMaxConcurrentMessages()).isEqualTo(inflight); assertThat(options.getMessageVisibility()).isEqualTo(visibility); + assertThat(options.getMessageVisibilityHeartbeatInterval()).isEqualTo(heartbeatInterval); + assertThat(options.getMessageVisibilityHeartbeatTimeout()).isEqualTo(heartbeatTimeout); assertThat(options.getPollTimeout()).isEqualTo(pollTimeout); assertThat(options.getMaxMessagesPerPoll()).isEqualTo(messagesPerPoll); }); diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ContainerOptionsTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ContainerOptionsTests.java index 8d8f9421f7..d45c27e6b9 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ContainerOptionsTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ContainerOptionsTests.java @@ -178,7 +178,8 @@ private SqsContainerOptionsBuilder createConfiguredBuilder() { SqsListenerObservation.Convention observationConvention = mock(SqsListenerObservation.Convention.class); return SqsContainerOptions.builder().acknowledgementShutdownTimeout(Duration.ofSeconds(7)) - .messageVisibility(Duration.ofSeconds(11)) + .messageVisibility(Duration.ofSeconds(11)).messageVisibilityHeartbeatInterval(Duration.ofSeconds(2)) + .messageVisibilityHeartbeatTimeout(Duration.ofSeconds(30)) .pollBackOffPolicy(BackOffPolicyBuilder.newBuilder().delay(1000).build()) .queueAttributeNames(Collections.singletonList(QueueAttributeName.QUEUE_ARN)) .messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.MESSAGE_GROUP_ID)) diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactoryTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactoryTests.java index 70309ad30e..2786f0328a 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactoryTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactoryTests.java @@ -27,6 +27,7 @@ import io.awspring.cloud.sqs.listener.sink.OrderedMessageSink; import io.awspring.cloud.sqs.listener.sink.adapter.MessageGroupingSinkAdapter; import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityExtendingSinkAdapter; +import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityHeartbeatSinkAdapter; import java.time.Duration; import org.assertj.core.api.AbstractObjectAssert; import org.junit.jupiter.api.Test; @@ -70,6 +71,19 @@ void shouldCreateGroupingSinkWithVisibility() { visibilitySinkAssertion.extracting("delegate").isInstanceOf(OrderedMessageSink.class); } + @Test + void shouldCreateGroupingSinkWithVisibilityHeartbeat() { + FifoSqsComponentFactory componentFactory = new FifoSqsComponentFactory<>(); + MessageSink messageSink = componentFactory.createMessageSink(SqsContainerOptions.builder() + .messageVisibility(Duration.ofSeconds(1)).messageVisibilityHeartbeatInterval(Duration.ofSeconds(1)) + .messageVisibilityHeartbeatTimeout(Duration.ofSeconds(15)).build()); + assertThat(messageSink).isInstanceOf(MessageGroupingSinkAdapter.class) + .asInstanceOf(type(MessageGroupingSinkAdapter.class)).extracting("delegate") + .isInstanceOf(MessageVisibilityHeartbeatSinkAdapter.class).extracting("delegate") + .isInstanceOf(MessageVisibilityExtendingSinkAdapter.class).extracting("delegate") + .isInstanceOf(OrderedMessageSink.class); + } + @Test void shouldCreateAcknowledgementProcessorWithDefaults() { FifoSqsComponentFactory componentFactory = new FifoSqsComponentFactory<>(); diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactoryTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactoryTests.java index 0c73c7c006..ac603ce965 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactoryTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactoryTests.java @@ -25,6 +25,7 @@ import io.awspring.cloud.sqs.listener.sink.BatchMessageSink; import io.awspring.cloud.sqs.listener.sink.FanOutMessageSink; import io.awspring.cloud.sqs.listener.sink.MessageSink; +import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityHeartbeatSinkAdapter; import java.time.Duration; import org.junit.jupiter.api.Test; @@ -50,6 +51,26 @@ void shouldCreateBatchSink() { assertThat(messageSink).isInstanceOf(BatchMessageSink.class); } + @Test + void shouldCreateSingleMessageSinkWithVisibilityHeartbeat() { + ContainerComponentFactory componentFactory = new StandardSqsComponentFactory<>(); + MessageSink messageSink = componentFactory.createMessageSink( + SqsContainerOptions.builder().messageVisibilityHeartbeatInterval(Duration.ofSeconds(1)) + .messageVisibilityHeartbeatTimeout(Duration.ofSeconds(15)).build()); + assertThat(messageSink).isInstanceOf(MessageVisibilityHeartbeatSinkAdapter.class).extracting("delegate") + .isInstanceOf(FanOutMessageSink.class); + } + + @Test + void shouldCreateBatchSinkWithVisibilityHeartbeat() { + ContainerComponentFactory componentFactory = new StandardSqsComponentFactory<>(); + MessageSink messageSink = componentFactory.createMessageSink(SqsContainerOptions.builder() + .listenerMode(ListenerMode.BATCH).messageVisibilityHeartbeatInterval(Duration.ofSeconds(1)) + .messageVisibilityHeartbeatTimeout(Duration.ofSeconds(15)).build()); + assertThat(messageSink).isInstanceOf(MessageVisibilityHeartbeatSinkAdapter.class).extracting("delegate") + .isInstanceOf(BatchMessageSink.class); + } + @Test void shouldCreateAcknowledgementProcessorWithDefaults() { ContainerComponentFactory componentFactory = new StandardSqsComponentFactory<>(); From 8c31183bd3e11f7cf98ef5a6f9f601977944331e Mon Sep 17 00:00:00 2001 From: kzurawski Date: Tue, 12 May 2026 09:19:19 -0400 Subject: [PATCH 2/3] feat-sqs-heartbeat Fixes #1554: Update the javadoc on the new heartbeat sink class. --- .../adapter/MessageVisibilityHeartbeatSinkAdapter.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityHeartbeatSinkAdapter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityHeartbeatSinkAdapter.java index 53008f6fd9..7d9842df53 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityHeartbeatSinkAdapter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/adapter/MessageVisibilityHeartbeatSinkAdapter.java @@ -44,9 +44,14 @@ /** * A {@link AbstractDelegatingMessageListeningSinkAdapter} that adds an interceptor responsible for periodically * extending visibility while a message (or message batch) is being processed. + * This is known as heart beating and recommended in SQS best practices for cases when expected duration is unknown. + * See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#visibility-timeout-best-practices + * > If you're unsure about the exact processing time, begin with a shorter timeout (for example, 2 minutes) + * and extend it as necessary. Implement a heartbeat mechanism to periodically extend the visibility timeout, + * ensuring the message remains invisible until processing is complete. * - * @author Tomaz Fernandes - * @since 3.5 + * @author kzurawski + * @since 4.1.0 */ public class MessageVisibilityHeartbeatSinkAdapter extends AbstractDelegatingMessageListeningSinkAdapter { From 021cda979689abc1ffcfd1dbc2244266752a64b9 Mon Sep 17 00:00:00 2001 From: kzurawski Date: Tue, 12 May 2026 11:04:06 -0400 Subject: [PATCH 3/3] feat-sqs-heartbeat Fixes #1554: Update docs for new optional values in annotation --- docs/src/main/asciidoc/sqs.adoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index 9b69146b08..5784c76389 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -815,6 +815,10 @@ See <> for more information. - `acknowledgementMode` - Set the acknowledgement mode for the container. If any value is set, it will take precedence over the acknowledgement mode defined for the container factory options. See <> for more information. +- `messageVisibilityHeartbeatIntervalSeconds` - Set the interval for sending visibility heartbeats for messages that are being processed. +Note that this is optional and used in conjunction with `messageVisibilityHeartbeatSeconds`. +- `messageVisibilityHeartbeatSeconds` - Set the visibility timeout extension to be applied in the heartbeats for messages that are being processed. +Note that this is optional and used in conjunction with `messageVisibilityHeartbeatIntervalSeconds`, and make sure the interval is shorter than this value. ===== Listener Method Arguments