|
16 | 16 | */ |
17 | 17 | package org.apache.kafka.streams.integration; |
18 | 18 |
|
| 19 | +import org.apache.kafka.common.serialization.LongDeserializer; |
19 | 20 | import org.apache.kafka.common.serialization.Serdes; |
20 | 21 | import org.apache.kafka.common.serialization.StringDeserializer; |
21 | 22 | import org.apache.kafka.common.serialization.StringSerializer; |
|
35 | 36 | import org.apache.kafka.streams.kstream.Produced; |
36 | 37 | import org.apache.kafka.streams.kstream.SessionWindows; |
37 | 38 | import org.apache.kafka.streams.kstream.SlidingWindows; |
| 39 | +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; |
38 | 40 | import org.apache.kafka.streams.kstream.TimeWindows; |
39 | 41 | import org.apache.kafka.streams.kstream.ValueTransformerWithKey; |
40 | 42 | import org.apache.kafka.streams.kstream.Windowed; |
| 43 | +import org.apache.kafka.streams.kstream.WindowedSerdes; |
41 | 44 | import org.apache.kafka.streams.processor.ProcessorContext; |
42 | 45 | import org.apache.kafka.streams.processor.api.ContextualProcessor; |
43 | 46 | import org.apache.kafka.streams.processor.api.Record; |
| 47 | +import org.apache.kafka.streams.state.AggregationWithHeaders; |
44 | 48 | import org.apache.kafka.streams.state.KeyValueIterator; |
45 | 49 | import org.apache.kafka.streams.state.KeyValueStore; |
46 | 50 | import org.apache.kafka.streams.state.SessionStore; |
| 51 | +import org.apache.kafka.streams.state.SessionStoreWithHeaders; |
47 | 52 | import org.apache.kafka.streams.state.Stores; |
48 | 53 | import org.apache.kafka.streams.state.TimestampedKeyValueStore; |
49 | 54 | import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; |
@@ -556,4 +561,245 @@ public void process(final Record<String, String> record) { |
556 | 561 | assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); |
557 | 562 | } |
558 | 563 | } |
| 564 | + |
| 565 | + @Test |
| 566 | + public void processorShouldAccessKStreamAggregatedKTableStoreAsHeadersStoreViaSupplier() { |
| 567 | + final StreamsBuilder builder = new StreamsBuilder(); |
| 568 | + |
| 569 | + final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = |
| 570 | + Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store")); |
| 571 | + |
| 572 | + builder |
| 573 | + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) |
| 574 | + .groupByKey() |
| 575 | + .aggregate( |
| 576 | + () -> "", |
| 577 | + (key, value, aggregate) -> value, |
| 578 | + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) |
| 579 | + ) |
| 580 | + .toStream() |
| 581 | + .process(() -> new ContextualProcessor<String, String, String, String>() { |
| 582 | + @Override |
| 583 | + public void process(final Record<String, String> record) { |
| 584 | + final TimestampedKeyValueStoreWithHeaders<String, String> store = context().getStateStore("table-store"); |
| 585 | + |
| 586 | + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> it = store.all()) { |
| 587 | + while (it.hasNext()) { |
| 588 | + final KeyValue<String, ValueTimestampHeaders<String>> row = it.next(); |
| 589 | + context().forward(new Record<>(row.key, row.value.value(), row.value.timestamp())); |
| 590 | + } |
| 591 | + } |
| 592 | + } |
| 593 | + }, "table-store") |
| 594 | + .to("output-topic", Produced.with(Serdes.String(), Serdes.String())); |
| 595 | + |
| 596 | + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { |
| 597 | + final TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); |
| 598 | + final TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer()); |
| 599 | + |
| 600 | + inputTopic.pipeInput("key1", "value1"); |
| 601 | + |
| 602 | + assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); |
| 603 | + } |
| 604 | + } |
| 605 | + |
| 606 | + @Test |
| 607 | + public void processorShouldAccessKStreamReducedKTableStoreAsHeadersStoreViaSupplier() { |
| 608 | + final StreamsBuilder builder = new StreamsBuilder(); |
| 609 | + |
| 610 | + final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = |
| 611 | + Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store")); |
| 612 | + |
| 613 | + builder |
| 614 | + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) |
| 615 | + .groupByKey() |
| 616 | + .reduce( |
| 617 | + (value, aggregate) -> value, |
| 618 | + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) |
| 619 | + ) |
| 620 | + .toStream() |
| 621 | + .process(() -> new ContextualProcessor<String, String, String, String>() { |
| 622 | + @Override |
| 623 | + public void process(final Record<String, String> record) { |
| 624 | + final TimestampedKeyValueStoreWithHeaders<String, String> store = context().getStateStore("table-store"); |
| 625 | + |
| 626 | + try (final KeyValueIterator<String, ValueTimestampHeaders<String>> it = store.all()) { |
| 627 | + while (it.hasNext()) { |
| 628 | + final KeyValue<String, ValueTimestampHeaders<String>> row = it.next(); |
| 629 | + context().forward(new Record<>(row.key, row.value.value(), row.value.timestamp())); |
| 630 | + } |
| 631 | + } |
| 632 | + } |
| 633 | + }, "table-store") |
| 634 | + .to("output-topic", Produced.with(Serdes.String(), Serdes.String())); |
| 635 | + |
| 636 | + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { |
| 637 | + final TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); |
| 638 | + final TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer()); |
| 639 | + |
| 640 | + inputTopic.pipeInput("key1", "value1"); |
| 641 | + |
| 642 | + assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); |
| 643 | + } |
| 644 | + } |
| 645 | + |
| 646 | + @Test |
| 647 | + public void processorShouldAccessKStreamCountKTableStoreAsHeadersStoreViaSupplier() { |
| 648 | + final StreamsBuilder builder = new StreamsBuilder(); |
| 649 | + |
| 650 | + builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) |
| 651 | + .groupByKey() |
| 652 | + .count(Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store"))) |
| 653 | + .toStream() |
| 654 | + .process(() -> new ContextualProcessor<String, Long, String, Long>() { |
| 655 | + @Override |
| 656 | + public void process(final Record<String, Long> record) { |
| 657 | + final TimestampedKeyValueStoreWithHeaders<String, Long> store = context().getStateStore("table-store"); |
| 658 | + |
| 659 | + try (final KeyValueIterator<String, ValueTimestampHeaders<Long>> it = store.all()) { |
| 660 | + while (it.hasNext()) { |
| 661 | + final KeyValue<String, ValueTimestampHeaders<Long>> row = it.next(); |
| 662 | + context().forward(new Record<>(row.key, row.value.value(), row.value.timestamp())); |
| 663 | + } |
| 664 | + } |
| 665 | + } |
| 666 | + }, "table-store") |
| 667 | + .to("output-topic", Produced.with(Serdes.String(), Serdes.Long())); |
| 668 | + |
| 669 | + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { |
| 670 | + final TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); |
| 671 | + final TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer()); |
| 672 | + |
| 673 | + inputTopic.pipeInput("key1", "value1"); |
| 674 | + |
| 675 | + assertEquals(KeyValue.pair("key1", 1L), outputTopic.readKeyValue()); |
| 676 | + } |
| 677 | + } |
| 678 | + |
| 679 | + @Test |
| 680 | + public void processorShouldBuildTopologyWithWindowStoreWithHeadersViaSupplier() { |
| 681 | + final StreamsBuilder builder = new StreamsBuilder(); |
| 682 | + |
| 683 | + final Materialized<String, String, WindowStore<Bytes, byte[]>> materialized = |
| 684 | + Materialized.as(Stores.persistentTimestampedWindowStoreWithHeaders("table-store", Duration.ofHours(24L), Duration.ofHours(1L), false)); |
| 685 | + |
| 686 | + builder |
| 687 | + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) |
| 688 | + .groupByKey() |
| 689 | + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L))) |
| 690 | + .aggregate( |
| 691 | + () -> "", |
| 692 | + (key, value, aggregate) -> value, |
| 693 | + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) |
| 694 | + ) |
| 695 | + .toStream() |
| 696 | + .process(() -> new ContextualProcessor<Windowed<String>, String, Windowed<String>, String>() { |
| 697 | + @Override |
| 698 | + public void process(final Record<Windowed<String>, String> record) { |
| 699 | + final WindowStore<String, ValueTimestampHeaders<String>> store = context().getStateStore("table-store"); |
| 700 | + |
| 701 | + try (final KeyValueIterator<Windowed<String>, ValueTimestampHeaders<String>> it = store.all()) { |
| 702 | + while (it.hasNext()) { |
| 703 | + final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> row = it.next(); |
| 704 | + context().forward(new Record<>(row.key, row.value.value(), row.value.timestamp())); |
| 705 | + } |
| 706 | + } |
| 707 | + } |
| 708 | + }, "table-store") |
| 709 | + .to("output-topic", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofHours(1L).toMillis()), Serdes.String())); |
| 710 | + |
| 711 | + // Verify topology can be built and run with window headers store supplier |
| 712 | + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { |
| 713 | + final TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); |
| 714 | + final TestOutputTopic<Windowed<String>, String> outputTopic = testDriver.createOutputTopic("output-topic", new TimeWindowedDeserializer<>(new StringDeserializer(), Duration.ofHours(1L).toMillis()), new StringDeserializer()); |
| 715 | + |
| 716 | + inputTopic.pipeInput("key1", "value1"); |
| 717 | + |
| 718 | + assertEquals("value1", outputTopic.readKeyValue().value); |
| 719 | + } |
| 720 | + } |
| 721 | + |
| 722 | + @Test |
| 723 | + public void processorShouldAccessKStreamSessionAggregatedKTableStoreAsHeadersStoreViaSupplier() { |
| 724 | + final StreamsBuilder builder = new StreamsBuilder(); |
| 725 | + |
| 726 | + final Materialized<String, String, SessionStore<Bytes, byte[]>> materialized = |
| 727 | + Materialized.as(Stores.persistentSessionStoreWithHeaders("table-store", Duration.ofHours(1L))); |
| 728 | + |
| 729 | + builder |
| 730 | + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) |
| 731 | + .groupByKey() |
| 732 | + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1L))) |
| 733 | + .aggregate( |
| 734 | + () -> "", |
| 735 | + (key, value, aggregate) -> value, |
| 736 | + (key, left, right) -> left, |
| 737 | + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) |
| 738 | + ) |
| 739 | + .toStream((windowedKey, value) -> windowedKey.key()) |
| 740 | + .process(() -> new ContextualProcessor<String, String, String, String>() { |
| 741 | + @Override |
| 742 | + public void process(final Record<String, String> record) { |
| 743 | + final SessionStoreWithHeaders<String, String> store = context().getStateStore("table-store"); |
| 744 | + |
| 745 | + try (final KeyValueIterator<Windowed<String>, AggregationWithHeaders<String>> it = store.findSessions("key1", 0L, Long.MAX_VALUE)) { |
| 746 | + while (it.hasNext()) { |
| 747 | + final KeyValue<Windowed<String>, AggregationWithHeaders<String>> row = it.next(); |
| 748 | + context().forward(new Record<>(row.key.key(), row.value.aggregation(), record.timestamp())); |
| 749 | + } |
| 750 | + } |
| 751 | + } |
| 752 | + }, "table-store") |
| 753 | + .to("output-topic", Produced.with(Serdes.String(), Serdes.String())); |
| 754 | + |
| 755 | + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { |
| 756 | + final TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); |
| 757 | + final TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer()); |
| 758 | + |
| 759 | + inputTopic.pipeInput("key1", "value1"); |
| 760 | + |
| 761 | + assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); |
| 762 | + } |
| 763 | + } |
| 764 | + |
| 765 | + @Test |
| 766 | + public void processorShouldAccessKStreamSessionReducedKTableStoreAsHeadersStoreViaSupplier() { |
| 767 | + final StreamsBuilder builder = new StreamsBuilder(); |
| 768 | + |
| 769 | + final Materialized<String, String, SessionStore<Bytes, byte[]>> materialized = |
| 770 | + Materialized.as(Stores.persistentSessionStoreWithHeaders("table-store", Duration.ofHours(1L))); |
| 771 | + |
| 772 | + builder |
| 773 | + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) |
| 774 | + .groupByKey() |
| 775 | + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1L))) |
| 776 | + .reduce( |
| 777 | + (value, aggregate) -> value, |
| 778 | + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) |
| 779 | + ) |
| 780 | + .toStream((windowedKey, value) -> windowedKey.key()) |
| 781 | + .process(() -> new ContextualProcessor<String, String, String, String>() { |
| 782 | + @Override |
| 783 | + public void process(final Record<String, String> record) { |
| 784 | + final SessionStoreWithHeaders<String, String> store = context().getStateStore("table-store"); |
| 785 | + |
| 786 | + try (final KeyValueIterator<Windowed<String>, AggregationWithHeaders<String>> it = store.findSessions("key1", 0L, Long.MAX_VALUE)) { |
| 787 | + while (it.hasNext()) { |
| 788 | + final KeyValue<Windowed<String>, AggregationWithHeaders<String>> row = it.next(); |
| 789 | + context().forward(new Record<>(row.key.key(), row.value.aggregation(), record.timestamp())); |
| 790 | + } |
| 791 | + } |
| 792 | + } |
| 793 | + }, "table-store") |
| 794 | + .to("output-topic", Produced.with(Serdes.String(), Serdes.String())); |
| 795 | + |
| 796 | + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { |
| 797 | + final TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); |
| 798 | + final TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer()); |
| 799 | + |
| 800 | + inputTopic.pipeInput("key1", "value1"); |
| 801 | + |
| 802 | + assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); |
| 803 | + } |
| 804 | + } |
559 | 805 | } |
0 commit comments