Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,10 @@ See <<FIFO Support>> for more information.
- `acknowledgementMode` - Set the acknowledgement mode for the container.
If any value is set, it will take precedence over the acknowledgement mode defined for the container factory options.
See <<Acknowledgement Mode>> for more information.
- `messageVisibilityHeartbeatIntervalSeconds` - Set the interval for sending visibility heartbeats for messages that are being processed.
Note that this is optional and used in conjunction with `messageVisibilityHeartbeatSeconds`.
- `messageVisibilityHeartbeatSeconds` - Set the visibility timeout extension to be applied in the heartbeats for messages that are being processed.
Note that this is optional and used in conjunction with `messageVisibilityHeartbeatIntervalSeconds`, and make sure the interval is shorter than this value.

===== Listener Method Arguments

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@
*/
String messageVisibilitySeconds() default "";

/**
* Interval between visibility heartbeat requests sent while listener execution is in progress.
*/
String messageVisibilityHeartbeatIntervalSeconds() default "";

/**
* Visibility timeout to apply on each visibility heartbeat request.
*/
String messageVisibilityHeartbeatSeconds() default "";

/**
* The acknowledgement mode to be used for the provided queues. If not specified, the acknowledgement mode defined
* for the container factory will be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) {
resolveAsInteger(sqsListenerAnnotation.maxConcurrentMessages(), "maxConcurrentMessages"))
.messageVisibility(
resolveAsInteger(sqsListenerAnnotation.messageVisibilitySeconds(), "messageVisibility"))
.messageVisibilityHeartbeatInterval(
resolveAsInteger(sqsListenerAnnotation.messageVisibilityHeartbeatIntervalSeconds(),
"messageVisibilityHeartbeatInterval"))
.messageVisibilityHeartbeat(resolveAsInteger(sqsListenerAnnotation.messageVisibilityHeartbeatSeconds(),
"messageVisibilityHeartbeat"))
.acknowledgementMode(resolveAcknowledgement(sqsListenerAnnotation.acknowledgementMode())).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public class SqsEndpoint extends AbstractEndpoint {

private final Integer messageVisibility;

private final Integer messageVisibilityHeartbeatInterval;

private final Integer messageVisibilityHeartbeat;

private final Integer maxMessagesPerPoll;

@Nullable
Expand All @@ -48,6 +52,8 @@ protected SqsEndpoint(SqsEndpointBuilder builder) {
this.maxConcurrentMessages = builder.maxConcurrentMessages;
this.pollTimeoutSeconds = builder.pollTimeoutSeconds;
this.messageVisibility = builder.messageVisibility;
this.messageVisibilityHeartbeatInterval = builder.messageVisibilityHeartbeatInterval;
this.messageVisibilityHeartbeat = builder.messageVisibilityHeartbeat;
this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
this.acknowledgementMode = builder.acknowledgementMode;
}
Expand Down Expand Up @@ -97,6 +103,26 @@ public Duration getMessageVisibility() {
return this.messageVisibility != null ? Duration.ofSeconds(this.messageVisibility) : null;
}

/**
* Return the visibility heartbeat interval for this endpoint.
* @return the visibility heartbeat interval.
*/
@Nullable
public Duration getMessageVisibilityHeartbeatInterval() {
return this.messageVisibilityHeartbeatInterval != null
? Duration.ofSeconds(this.messageVisibilityHeartbeatInterval)
: null;
}

/**
* Return the visibility timeout to apply on each heartbeat request for this endpoint.
* @return the visibility heartbeat timeout.
*/
@Nullable
public Duration getMessageVisibilityHeartbeat() {
return this.messageVisibilityHeartbeat != null ? Duration.ofSeconds(this.messageVisibilityHeartbeat) : null;
}

/**
* Returns the acknowledgement mode configured for this endpoint.
* @return the acknowledgement mode.
Expand All @@ -118,6 +144,10 @@ public static class SqsEndpointBuilder {

private Integer messageVisibility;

private Integer messageVisibilityHeartbeatInterval;

private Integer messageVisibilityHeartbeat;

private String id;

private Integer maxMessagesPerPoll;
Expand Down Expand Up @@ -155,6 +185,16 @@ public SqsEndpointBuilder messageVisibility(Integer messageVisibility) {
return this;
}

public SqsEndpointBuilder messageVisibilityHeartbeatInterval(Integer messageVisibilityHeartbeatInterval) {
this.messageVisibilityHeartbeatInterval = messageVisibilityHeartbeatInterval;
return this;
}

public SqsEndpointBuilder messageVisibilityHeartbeat(Integer messageVisibilityHeartbeat) {
this.messageVisibilityHeartbeat = messageVisibilityHeartbeat;
return this;
}

public SqsEndpointBuilder id(String id) {
this.id = id;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ private void configureFromSqsEndpoint(SqsEndpoint sqsEndpoint, SqsContainerOptio
.acceptIfNotNull(sqsEndpoint.getMaxMessagesPerPoll(), options::maxMessagesPerPoll)
.acceptIfNotNull(sqsEndpoint.getPollTimeout(), options::pollTimeout)
.acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility)
.acceptIfNotNull(sqsEndpoint.getMessageVisibilityHeartbeatInterval(),
options::messageVisibilityHeartbeatInterval)
.acceptIfNotNull(sqsEndpoint.getMessageVisibilityHeartbeat(),
options::messageVisibilityHeartbeatTimeout)
.acceptIfNotNull(sqsEndpoint.getAcknowledgementMode(), options::acknowledgementMode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.awspring.cloud.sqs.listener.sink.OrderedMessageSink;
import io.awspring.cloud.sqs.listener.sink.adapter.MessageGroupingSinkAdapter;
import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityExtendingSinkAdapter;
import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityHeartbeatSinkAdapter;
import io.awspring.cloud.sqs.listener.source.FifoSqsMessageSource;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import java.time.Duration;
Expand Down Expand Up @@ -72,6 +73,8 @@ public MessageSink<T> createMessageSink(SqsContainerOptions options) {
MessageSink<T> deliverySink = createDeliverySink(options.getListenerMode());
MessageSink<T> wrappedDeliverySink = maybeWrapWithVisibilityAdapter(deliverySink,
options.getMessageVisibility());
wrappedDeliverySink = maybeWrapWithVisibilityHeartbeatAdapter(wrappedDeliverySink,
options.getMessageVisibilityHeartbeatInterval(), options.getMessageVisibilityHeartbeatTimeout());
return maybeWrapWithMessageGroupingAdapter(options, wrappedDeliverySink);
}

Expand Down Expand Up @@ -104,6 +107,18 @@ private MessageVisibilityExtendingSinkAdapter<T> addMessageVisibilityExtendingSi
return visibilityAdapter;
}

private MessageSink<T> maybeWrapWithVisibilityHeartbeatAdapter(MessageSink<T> deliverySink,
@Nullable Duration heartbeatInterval, @Nullable Duration heartbeatTimeout) {
if (heartbeatInterval == null || heartbeatTimeout == null) {
return deliverySink;
}
MessageVisibilityHeartbeatSinkAdapter<T> heartbeatAdapter = new MessageVisibilityHeartbeatSinkAdapter<>(
deliverySink);
heartbeatAdapter.setHeartbeatInterval(heartbeatInterval);
heartbeatAdapter.setHeartbeatVisibilityTimeout(heartbeatTimeout);
return heartbeatAdapter;
}

private Function<Message<T>, String> getMessageGroupingFunction() {
return message -> MessageHeaderUtils.getHeaderAsString(message, SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public class SqsContainerOptions extends AbstractContainerOptions<SqsContainerOp
@Nullable
private final Duration messageVisibility;

@Nullable
private final Duration messageVisibilityHeartbeatInterval;

@Nullable
private final Duration messageVisibilityHeartbeatTimeout;

private final FifoBatchGroupingStrategy fifoBatchGroupingStrategy;

private final Collection<QueueAttributeName> queueAttributeNames;
Expand All @@ -62,6 +68,8 @@ private SqsContainerOptions(BuilderImpl builder) {
this.messageAttributeNames = builder.messageAttributeNames;
this.messageSystemAttributeNames = builder.messageSystemAttributeNames;
this.messageVisibility = builder.messageVisibility;
this.messageVisibilityHeartbeatInterval = builder.messageVisibilityHeartbeatInterval;
this.messageVisibilityHeartbeatTimeout = builder.messageVisibilityHeartbeatTimeout;
this.queueNotFoundStrategy = builder.queueNotFoundStrategy;
this.fifoBatchGroupingStrategy = builder.fifoBatchGroupingStrategy;
this.convertMessageIdToUuid = builder.convertMessageIdToUuid;
Expand Down Expand Up @@ -108,6 +116,24 @@ public Duration getMessageVisibility() {
return this.messageVisibility;
}

/**
* Get the interval between visibility heartbeat requests sent while a message is being processed.
* @return the heartbeat interval.
*/
@Nullable
public Duration getMessageVisibilityHeartbeatInterval() {
return this.messageVisibilityHeartbeatInterval;
}

/**
* Get the visibility timeout to apply on each visibility heartbeat request.
* @return the heartbeat timeout.
*/
@Nullable
public Duration getMessageVisibilityHeartbeatTimeout() {
return this.messageVisibilityHeartbeatTimeout;
}

/**
* Get messages grouping strategy in FIFO queues when retrieved by the container in listener mode
* {@link ListenerMode#BATCH}.
Expand Down Expand Up @@ -165,6 +191,12 @@ protected static class BuilderImpl
@Nullable
private Duration messageVisibility;

@Nullable
private Duration messageVisibilityHeartbeatInterval;

@Nullable
private Duration messageVisibilityHeartbeatTimeout;

private boolean convertMessageIdToUuid = true;

protected BuilderImpl() {
Expand All @@ -177,6 +209,8 @@ protected BuilderImpl(SqsContainerOptions options) {
this.messageAttributeNames = options.messageAttributeNames;
this.messageSystemAttributeNames = options.messageSystemAttributeNames;
this.messageVisibility = options.messageVisibility;
this.messageVisibilityHeartbeatInterval = options.messageVisibilityHeartbeatInterval;
this.messageVisibilityHeartbeatTimeout = options.messageVisibilityHeartbeatTimeout;
this.fifoBatchGroupingStrategy = options.fifoBatchGroupingStrategy;
this.queueNotFoundStrategy = options.queueNotFoundStrategy;
this.convertMessageIdToUuid = options.convertMessageIdToUuid;
Expand Down Expand Up @@ -212,6 +246,28 @@ public SqsContainerOptionsBuilder messageVisibility(Duration messageVisibility)
return this;
}

@Override
public SqsContainerOptionsBuilder messageVisibilityHeartbeatInterval(
Duration messageVisibilityHeartbeatInterval) {
Assert.notNull(messageVisibilityHeartbeatInterval, "messageVisibilityHeartbeatInterval cannot be null");
Assert.isTrue(
!messageVisibilityHeartbeatInterval.isNegative() && !messageVisibilityHeartbeatInterval.isZero(),
"messageVisibilityHeartbeatInterval must be greater than zero");
this.messageVisibilityHeartbeatInterval = messageVisibilityHeartbeatInterval;
return this;
}

@Override
public SqsContainerOptionsBuilder messageVisibilityHeartbeatTimeout(
Duration messageVisibilityHeartbeatTimeout) {
Assert.notNull(messageVisibilityHeartbeatTimeout, "messageVisibilityHeartbeatTimeout cannot be null");
Assert.isTrue(
!messageVisibilityHeartbeatTimeout.isNegative() && !messageVisibilityHeartbeatTimeout.isZero(),
"messageVisibilityHeartbeatTimeout must be greater than zero");
this.messageVisibilityHeartbeatTimeout = messageVisibilityHeartbeatTimeout;
return this;
}

@Override
public SqsContainerOptionsBuilder fifoBatchGroupingStrategy(
FifoBatchGroupingStrategy fifoBatchGroupingStrategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ SqsContainerOptionsBuilder messageSystemAttributeNames(
*/
SqsContainerOptionsBuilder messageVisibility(Duration messageVisibility);

/**
* Set the interval between visibility heartbeat requests sent while a message is being processed.
* @param messageVisibilityHeartbeatInterval the heartbeat interval.
* @return this instance.
*/
SqsContainerOptionsBuilder messageVisibilityHeartbeatInterval(Duration messageVisibilityHeartbeatInterval);

/**
* Set the visibility timeout to apply on each visibility heartbeat request.
* @param messageVisibilityHeartbeatTimeout the timeout to set on each heartbeat.
* @return this instance.
*/
SqsContainerOptionsBuilder messageVisibilityHeartbeatTimeout(Duration messageVisibilityHeartbeatTimeout);

/**
* Set how the messages from FIFO queues should be grouped when container listener mode is
* {@link ListenerMode#BATCH}. By default, messages are grouped in batches by message group, which are processed in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import io.awspring.cloud.sqs.listener.sink.BatchMessageSink;
import io.awspring.cloud.sqs.listener.sink.FanOutMessageSink;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityHeartbeatSinkAdapter;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.StandardSqsMessageSource;
import java.time.Duration;
import java.util.Collection;
import org.jspecify.annotations.Nullable;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -57,12 +59,26 @@ public MessageSource<T> createMessageSource(SqsContainerOptions options) {
// @formatter:off
@Override
public MessageSink<T> createMessageSink(SqsContainerOptions options) {
return ListenerMode.SINGLE_MESSAGE.equals(options.getListenerMode())
MessageSink<T> messageSink = ListenerMode.SINGLE_MESSAGE.equals(options.getListenerMode())
? new FanOutMessageSink<>()
: new BatchMessageSink<>();
return maybeWrapWithVisibilityHeartbeatAdapter(messageSink,
options.getMessageVisibilityHeartbeatInterval(), options.getMessageVisibilityHeartbeatTimeout());
}
// @formatter:on

private MessageSink<T> maybeWrapWithVisibilityHeartbeatAdapter(MessageSink<T> messageSink,
@Nullable Duration heartbeatInterval, @Nullable Duration heartbeatTimeout) {
if (heartbeatInterval == null || heartbeatTimeout == null) {
return messageSink;
}
MessageVisibilityHeartbeatSinkAdapter<T> heartbeatSinkAdapter = new MessageVisibilityHeartbeatSinkAdapter<>(
messageSink);
heartbeatSinkAdapter.setHeartbeatInterval(heartbeatInterval);
heartbeatSinkAdapter.setHeartbeatVisibilityTimeout(heartbeatTimeout);
return heartbeatSinkAdapter;
}

@Override
public AcknowledgementProcessor<T> createAcknowledgementProcessor(SqsContainerOptions options) {
validateAcknowledgementOrdering(options);
Expand Down
Loading