Skip to content

Commit 8cad040

Browse files
authored
KAFKA-20261: Upgrade path for plain window store to headers store (#21710)
Extends headers-aware window store migration to support upgrading from plain (non-timestamped) stores. - Plain-to-Headers Migration Support - Implemented query() method in the adapter class Testing: - 3 upgrade intergration tests - 2 downgrade integration tests Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
1 parent 80e6817 commit 8cad040

File tree

6 files changed

+940
-5
lines changed

6 files changed

+940
-5
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
4646
import org.apache.kafka.streams.state.ValueAndTimestamp;
4747
import org.apache.kafka.streams.state.ValueTimestampHeaders;
48+
import org.apache.kafka.streams.state.WindowStore;
4849
import org.apache.kafka.test.TestUtils;
4950

5051
import org.junit.jupiter.api.AfterAll;
@@ -628,6 +629,123 @@ public void process(final Record<String, String> record) {
628629
}
629630
}
630631

632+
@Test
633+
public void shouldMigrateInMemoryPlainWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
634+
shouldMigratePlainWindowStoreToTimestampedWindowStoreWithHeaders(false);
635+
}
636+
637+
@Test
638+
public void shouldMigratePersistentPlainWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
639+
shouldMigratePlainWindowStoreToTimestampedWindowStoreWithHeaders(true);
640+
}
641+
642+
private void shouldMigratePlainWindowStoreToTimestampedWindowStoreWithHeaders(final boolean persistentStore) throws Exception {
643+
// Run with old plain WindowStore
644+
final StreamsBuilder oldBuilder = new StreamsBuilder();
645+
oldBuilder.addStateStore(
646+
Stores.windowStoreBuilder(
647+
persistentStore
648+
? Stores.persistentWindowStore(WINDOW_STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
649+
: Stores.inMemoryWindowStore(WINDOW_STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
650+
Serdes.String(),
651+
Serdes.String()))
652+
.stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
653+
.process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
654+
655+
final Properties props = props();
656+
kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
657+
kafkaStreams.start();
658+
659+
final long baseTime = CLUSTER.time.milliseconds();
660+
processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime + 100);
661+
processPlainWindowedKeyValueAndVerify("key2", "value2", baseTime + 200);
662+
processPlainWindowedKeyValueAndVerify("key3", "value3", baseTime + 300);
663+
664+
kafkaStreams.close();
665+
kafkaStreams = null;
666+
667+
// Restart with TimestampedWindowStoreWithHeaders
668+
final StreamsBuilder newBuilder = new StreamsBuilder();
669+
newBuilder.addStateStore(
670+
Stores.timestampedWindowStoreWithHeadersBuilder(
671+
persistentStore
672+
? Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
673+
: Stores.inMemoryWindowStore(WINDOW_STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
674+
Serdes.String(),
675+
Serdes.String()))
676+
.stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
677+
.process(TimestampedWindowedWithHeadersProcessor::new, WINDOW_STORE_NAME);
678+
679+
kafkaStreams = new KafkaStreams(newBuilder.build(), props);
680+
kafkaStreams.start();
681+
682+
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1", baseTime + 100, persistentStore ? -1L : baseTime + 100);
683+
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2", baseTime + 200, persistentStore ? -1L : baseTime + 200);
684+
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key3", "value3", baseTime + 300, persistentStore ? -1L : baseTime + 300);
685+
686+
final Headers headers = new RecordHeaders();
687+
headers.add("source", "migration-test".getBytes());
688+
headers.add("version", "1.0".getBytes());
689+
690+
processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated", baseTime + 350, headers, headers);
691+
processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime + 400, headers, headers);
692+
693+
kafkaStreams.close();
694+
}
695+
696+
@Test
697+
public void shouldProxyPlainWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
698+
final StreamsBuilder oldBuilder = new StreamsBuilder();
699+
oldBuilder.addStateStore(
700+
Stores.windowStoreBuilder(
701+
Stores.persistentWindowStore(WINDOW_STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
702+
Serdes.String(),
703+
Serdes.String()))
704+
.stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
705+
.process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
706+
707+
final Properties props = props();
708+
kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
709+
kafkaStreams.start();
710+
711+
final long baseTime = CLUSTER.time.milliseconds();
712+
processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime + 100);
713+
processPlainWindowedKeyValueAndVerify("key2", "value2", baseTime + 200);
714+
processPlainWindowedKeyValueAndVerify("key3", "value3", baseTime + 300);
715+
716+
kafkaStreams.close();
717+
kafkaStreams = null;
718+
719+
// Restart with headers-aware builder but non-headers supplier (proxy/adapter mode)
720+
final StreamsBuilder newBuilder = new StreamsBuilder();
721+
newBuilder.addStateStore(
722+
Stores.timestampedWindowStoreWithHeadersBuilder(
723+
Stores.persistentWindowStore(WINDOW_STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), // non-headers supplier!
724+
Serdes.String(),
725+
Serdes.String()))
726+
.stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
727+
.process(TimestampedWindowedWithHeadersProcessor::new, WINDOW_STORE_NAME);
728+
729+
kafkaStreams = new KafkaStreams(newBuilder.build(), props);
730+
kafkaStreams.start();
731+
732+
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1", baseTime + 100, -1L);
733+
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2", baseTime + 200, -1L);
734+
verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key3", "value3", baseTime + 300, -1L);
735+
736+
final RecordHeaders headers = new RecordHeaders();
737+
headers.add("source", "proxy-test".getBytes());
738+
headers.add("version", "2.0".getBytes());
739+
740+
// In proxy mode with plain store, headers and timestamps are not preserved
741+
final RecordHeaders expectedHeaders = new RecordHeaders();
742+
743+
processPlainWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated", baseTime + 350, headers, expectedHeaders);
744+
processPlainWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime + 400, headers, expectedHeaders);
745+
746+
kafkaStreams.close();
747+
}
748+
631749
@Test
632750
public void shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
633751
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
@@ -748,6 +866,136 @@ public void shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders
748866
kafkaStreams.close();
749867
}
750868

869+
private void processPlainWindowedKeyValueAndVerify(final String key,
870+
final String value,
871+
final long timestamp) throws Exception {
872+
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
873+
inputStream,
874+
List.of(KeyValue.pair(key, value)),
875+
TestUtils.producerConfig(CLUSTER.bootstrapServers(),
876+
StringSerializer.class,
877+
StringSerializer.class),
878+
timestamp,
879+
false);
880+
881+
TestUtils.waitForCondition(() -> {
882+
try {
883+
final ReadOnlyWindowStore<String, String> store =
884+
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
885+
886+
if (store == null) {
887+
return false;
888+
}
889+
890+
final long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
891+
final String result = store.fetch(key, windowStart);
892+
893+
return result != null && result.equals(value);
894+
} catch (final Exception e) {
895+
return false;
896+
}
897+
}, 60_000L, "Could not verify plain window value in time.");
898+
}
899+
900+
private void verifyPlainWindowValueWithEmptyHeadersAndTimestamp(final String key,
901+
final String value,
902+
final long windowTimestamp,
903+
final long expectedTimestamp) throws Exception {
904+
TestUtils.waitForCondition(() -> {
905+
try {
906+
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
907+
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
908+
909+
if (store == null) {
910+
return false;
911+
}
912+
913+
final long windowStart = windowTimestamp - (windowTimestamp % WINDOW_SIZE_MS);
914+
915+
final List<KeyValue<Windowed<String>, ValueTimestampHeaders<String>>> results = new LinkedList<>();
916+
try (final KeyValueIterator<Windowed<String>, ValueTimestampHeaders<String>> iterator = store.all()) {
917+
while (iterator.hasNext()) {
918+
final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> kv = iterator.next();
919+
if (kv.key.key().equals(key) && kv.key.window().start() == windowStart) {
920+
results.add(kv);
921+
}
922+
}
923+
}
924+
925+
if (results.isEmpty()) {
926+
return false;
927+
}
928+
929+
final ValueTimestampHeaders<String> result = results.get(0).value;
930+
assertNotNull(result, "Result should not be null");
931+
assertEquals(value, result.value(), "Value should match");
932+
assertEquals(expectedTimestamp, result.timestamp(), "Timestamp should be " + expectedTimestamp + " for plain store migration");
933+
934+
// Verify headers exist but are empty (migrated from plain store without headers or timestamps)
935+
assertNotNull(result.headers(), "Headers should not be null for migrated data");
936+
assertEquals(0, result.headers().toArray().length, "Headers should be empty for migrated data");
937+
938+
return true;
939+
} catch (final Exception e) {
940+
e.printStackTrace();
941+
return false;
942+
}
943+
}, 60_000L, "Could not verify plain window value with empty headers and timestamp in time.");
944+
}
945+
946+
private void processPlainWindowedKeyValueWithHeadersAndVerify(final String key,
947+
final String value,
948+
final long timestamp,
949+
final Headers headers,
950+
final Headers expectedHeaders) throws Exception {
951+
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
952+
inputStream,
953+
List.of(KeyValue.pair(key, value)),
954+
TestUtils.producerConfig(CLUSTER.bootstrapServers(),
955+
StringSerializer.class,
956+
StringSerializer.class),
957+
headers,
958+
timestamp,
959+
false);
960+
961+
TestUtils.waitForCondition(() -> {
962+
try {
963+
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
964+
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
965+
966+
if (store == null) {
967+
return false;
968+
}
969+
970+
final long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
971+
972+
final List<KeyValue<Windowed<String>, ValueTimestampHeaders<String>>> results = new LinkedList<>();
973+
try (final KeyValueIterator<Windowed<String>, ValueTimestampHeaders<String>> iterator = store.all()) {
974+
while (iterator.hasNext()) {
975+
final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> kv = iterator.next();
976+
if (kv.key.key().equals(key) && kv.key.window().start() == windowStart) {
977+
results.add(kv);
978+
}
979+
}
980+
}
981+
982+
if (results.isEmpty()) {
983+
return false;
984+
}
985+
986+
final ValueTimestampHeaders<String> result = results.get(0).value;
987+
// For plain window stores, timestamp is always -1 since it's not preserved
988+
return result != null
989+
&& result.value().equals(value)
990+
&& result.timestamp() == -1L
991+
&& result.headers().equals(expectedHeaders);
992+
} catch (final Exception e) {
993+
e.printStackTrace();
994+
return false;
995+
}
996+
}, 60_000L, "Could not verify plain windowed value with headers in time.");
997+
}
998+
751999
private void processWindowedKeyValueAndVerifyTimestamped(final String key,
7521000
final String value,
7531001
final long timestamp) throws Exception {
@@ -878,6 +1126,24 @@ private void verifyWindowValueWithEmptyHeaders(final String key,
8781126
}, 60_000L, "Could not verify legacy value with empty headers in time.");
8791127
}
8801128

1129+
/**
1130+
* Processor for plain WindowStore (without timestamps or headers).
1131+
*/
1132+
private static class PlainWindowedProcessor implements Processor<String, String, Void, Void> {
1133+
private WindowStore<String, String> store;
1134+
1135+
@Override
1136+
public void init(final ProcessorContext<Void, Void> context) {
1137+
store = context.getStateStore(WINDOW_STORE_NAME);
1138+
}
1139+
1140+
@Override
1141+
public void process(final Record<String, String> record) {
1142+
final long windowStart = record.timestamp() - (record.timestamp() % WINDOW_SIZE_MS);
1143+
store.put(record.key(), record.value(), windowStart);
1144+
}
1145+
}
1146+
8811147
/**
8821148
* Processor for TimestampedWindowStore (without headers).
8831149
*/
@@ -1065,6 +1331,54 @@ public void shouldSuccessfullyDowngradeFromTimestampedKeyValueStoreWithHeadersTo
10651331
}
10661332

10671333

1334+
@Test
1335+
public void shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToPlainWindowStore() throws Exception {
1336+
final Properties props = props();
1337+
setupAndPopulateWindowStoreWithHeaders(props, List.of(KeyValue.pair("key1", 100L)));
1338+
kafkaStreams = null;
1339+
1340+
final StreamsBuilder downgradedBuilder = new StreamsBuilder();
1341+
downgradedBuilder.addStateStore(
1342+
Stores.windowStoreBuilder(
1343+
Stores.persistentWindowStore(WINDOW_STORE_NAME,
1344+
Duration.ofMillis(RETENTION_MS),
1345+
Duration.ofMillis(WINDOW_SIZE_MS),
1346+
false),
1347+
Serdes.String(),
1348+
Serdes.String()))
1349+
.stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
1350+
.process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
1351+
1352+
kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
1353+
1354+
boolean exceptionThrown = false;
1355+
try {
1356+
kafkaStreams.start();
1357+
} catch (final Exception e) {
1358+
Throwable cause = e;
1359+
while (cause != null) {
1360+
if (cause instanceof ProcessorStateException &&
1361+
cause.getMessage() != null &&
1362+
cause.getMessage().contains("headers-aware") &&
1363+
cause.getMessage().contains("Downgrade")) {
1364+
exceptionThrown = true;
1365+
break;
1366+
}
1367+
cause = cause.getCause();
1368+
}
1369+
1370+
if (!exceptionThrown) {
1371+
throw new AssertionError("Expected ProcessorStateException about downgrade not being supported, but got: " + e.getMessage(), e);
1372+
}
1373+
} finally {
1374+
kafkaStreams.close(Duration.ofSeconds(30L));
1375+
}
1376+
1377+
if (!exceptionThrown) {
1378+
throw new AssertionError("Expected ProcessorStateException to be thrown when attempting to downgrade from headers-aware to plain window store");
1379+
}
1380+
}
1381+
10681382
@Test
10691383
public void shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore() throws Exception {
10701384
final Properties props = props();
@@ -1114,6 +1428,36 @@ public void shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampe
11141428
}
11151429
}
11161430

1431+
@Test
1432+
public void shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersToPlainWindowStoreAfterCleanup() throws Exception {
1433+
final Properties props = props();
1434+
setupAndPopulateWindowStoreWithHeaders(props, asList(KeyValue.pair("key1", 100L), KeyValue.pair("key2", 200L)));
1435+
1436+
kafkaStreams.cleanUp();
1437+
kafkaStreams = null;
1438+
1439+
final StreamsBuilder downgradedBuilder = new StreamsBuilder();
1440+
downgradedBuilder.addStateStore(
1441+
Stores.windowStoreBuilder(
1442+
Stores.persistentWindowStore(WINDOW_STORE_NAME,
1443+
Duration.ofMillis(RETENTION_MS),
1444+
Duration.ofMillis(WINDOW_SIZE_MS),
1445+
false),
1446+
Serdes.String(),
1447+
Serdes.String()))
1448+
.stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
1449+
.process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
1450+
1451+
kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
1452+
kafkaStreams.start();
1453+
1454+
final long newTime = CLUSTER.time.milliseconds();
1455+
processPlainWindowedKeyValueAndVerify("key3", "value3", newTime + 300);
1456+
processPlainWindowedKeyValueAndVerify("key4", "value4", newTime + 400);
1457+
1458+
kafkaStreams.close();
1459+
}
1460+
11171461
@Test
11181462
public void shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersAfterCleanup() throws Exception {
11191463
final Properties props = props();

0 commit comments

Comments
 (0)