Skip to content

Commit 70e4540

Browse files
authored
MINOR: Ignore unassigned records in MockConsumer (#21631)
In `poll()`, `MockConsumer` iterates the added `records`, and only checks that they belong to an assigned partition quite late in the process. This can cause an error if records were added for a partition that was later removed from the assignment: ```java consumer.assign(Arrays.asList(new TopicPartition("t1", 0), new TopicPartition("t2", 0))); consumer.addRecord(new ConsumerRecord("t1", 0, 0, "a", 123)); consumer.addRecord(new ConsumerRecord("t2", 0, 0, "b", 123)); consumer.assign(Collections.singleton(new TopicPartition("t1", 0))); // throws IllegalStateException consumer.poll(Duration.seconds(1)); ``` Moving this check earlier in the process avoids this error in `poll()`, instead discarding records that do not belong to the assignment. This enables tests to proactively add all their records and then modify the assignment multiple times, if necessary, to test specific combinations of partitions. This is particularly useful in some of the `streams` tests, where we need to test with multiple state store changelogs, but the assignment gets changed internally by Kafka Streams (notably, `GlobalStateManagerImplTest`). Reviewers: Bill Bejeck <bbejeck@apache.org>
1 parent 4e18307 commit 70e4540

File tree

2 files changed

+3
-3
lines changed

2 files changed

+3
-3
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
284284
while (partitionsIter.hasNext() && numPollRecords < this.maxPollRecords) {
285285
Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry = partitionsIter.next();
286286

287-
if (!subscriptions.isPaused(entry.getKey())) {
287+
if (!subscriptions.isPaused(entry.getKey()) && subscriptions.isAssigned(entry.getKey())) {
288288
final Iterator<ConsumerRecord<K, V>> recIterator = entry.getValue().iterator();
289289
while (recIterator.hasNext()) {
290290
if (numPollRecords >= this.maxPollRecords) {
@@ -298,7 +298,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
298298
throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), position));
299299
}
300300

301-
if (assignment().contains(entry.getKey()) && rec.offset() >= position) {
301+
if (rec.offset() >= position) {
302302
results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
303303
Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.empty(), rec.leaderEpoch());
304304
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1456,7 +1456,7 @@ public void shouldNotReturnDataAfterTaskMigrated(final boolean processingThreads
14561456

14571457
assertThat(thrown.getCause(), isA(IllegalStateException.class));
14581458
// The Mock consumer shall throw as the assignment has been wiped out, but records are assigned.
1459-
assertEquals("No current assignment for partition topic1-1", thrown.getCause().getMessage());
1459+
assertEquals("Cannot add records for a partition that is not assigned to the consumer", thrown.getCause().getMessage());
14601460
assertFalse(consumer.shouldRebalance());
14611461

14621462
verify(taskManager).handleLostAll();

0 commit comments

Comments
 (0)