diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java index 172ffd707808..f9bbf7c778c5 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java @@ -812,6 +812,11 @@ public static class Streams { */ private @Nullable String stateDir; + /** + * Whether the consumer should leave the group when stopping Kafka Streams. + */ + private boolean leaveGroupOnClose; + /** * Additional Kafka properties used to configure the streams. */ @@ -885,6 +890,14 @@ public void setStateDir(@Nullable String stateDir) { this.stateDir = stateDir; } + public boolean isLeaveGroupOnClose() { + return this.leaveGroupOnClose; + } + + public void setLeaveGroupOnClose(boolean leaveGroupOnClose) { + this.leaveGroupOnClose = leaveGroupOnClose; + } + public Map getProperties() { return this.properties; } diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java index 1c7184960063..410d72067254 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java @@ -101,6 +101,7 @@ public void configure(StreamsBuilderFactoryBean factoryBean) { KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup(); CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown()); factoryBean.setCleanupConfig(cleanupConfig); + factoryBean.setLeaveGroupOnClose(this.properties.getStreams().isLeaveGroupOnClose()); } @Override