Skip to content

Commit f645314

Browse files
authored
MINOR: Use mutable maps in Fetch.forPartition to avoid potential UnsupportedOperationException (#21816)
Fetch.forPartition used Map.of() which creates immutable maps. If add() is called on a forPartition-created Fetch as the target, putAll on the immutable nextOffsetAndMetadata would throw UnsupportedOperationException. Replace with HashMap to ensure mutability. See #21726 (comment) Added FetchTest to cover forPartition mutability, add() merging, positionAdvanced propagation, and unmodifiable return maps. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent a83dda8 commit f645314

File tree

2 files changed

+161
-11
lines changed
  • clients/src
    • main/java/org/apache/kafka/clients/consumer/internals
    • test/java/org/apache/kafka/clients/consumer/internals

2 files changed

+161
-11
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class Fetch<K, V> {
3434
private int numRecords;
3535

3636
public static <K, V> Fetch<K, V> empty() {
37-
return new Fetch<>(new HashMap<>(), false, 0, new HashMap<>());
37+
return new Fetch<>(false, 0);
3838
}
3939

4040
public static <K, V> Fetch<K, V> forPartition(
@@ -43,25 +43,34 @@ public static <K, V> Fetch<K, V> forPartition(
4343
boolean positionAdvanced,
4444
OffsetAndMetadata nextOffsetAndMetadata
4545
) {
46-
Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap = records.isEmpty()
47-
? Map.of()
48-
: Map.of(partition, records);
49-
Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadataMap = Map.of(partition, nextOffsetAndMetadata);
50-
return new Fetch<>(recordsMap, positionAdvanced, records.size(), nextOffsetAndMetadataMap);
46+
return new Fetch<>(positionAdvanced, partition, records, nextOffsetAndMetadata);
5147
}
5248

5349
private Fetch(
54-
Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
5550
boolean positionAdvanced,
56-
int numRecords,
57-
Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata
51+
int numRecords
5852
) {
59-
this.records = records;
53+
this.records = new HashMap<>();
6054
this.positionAdvanced = positionAdvanced;
6155
this.numRecords = numRecords;
62-
this.nextOffsetAndMetadata = nextOffsetAndMetadata;
56+
this.nextOffsetAndMetadata = new HashMap<>();
6357
}
6458

59+
private Fetch(
60+
boolean positionAdvanced,
61+
TopicPartition partition,
62+
List<ConsumerRecord<K, V>> records,
63+
OffsetAndMetadata offsetAndMetadata
64+
) {
65+
this.records = new HashMap<>();
66+
if (!records.isEmpty()) {
67+
this.records.put(partition, records);
68+
}
69+
this.positionAdvanced = positionAdvanced;
70+
this.numRecords = records.size();
71+
this.nextOffsetAndMetadata = new HashMap<>();
72+
this.nextOffsetAndMetadata.put(partition, offsetAndMetadata);
73+
}
6574
/**
6675
* Add another {@link Fetch} to this one; all of its records will be added to this fetch's
6776
* {@link #records() records}, and if the other fetch
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer.internals;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
21+
import org.apache.kafka.common.TopicPartition;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.util.List;
26+
import java.util.Optional;
27+
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.junit.jupiter.api.Assertions.assertFalse;
30+
import static org.junit.jupiter.api.Assertions.assertThrows;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
33+
class FetchTest {
34+
35+
private static final TopicPartition TP0 = new TopicPartition("topic", 0);
36+
private static final TopicPartition TP1 = new TopicPartition("topic", 1);
37+
38+
@Test
39+
void testAddToForPartitionFetch() {
40+
var records0 = List.of(new ConsumerRecord<>("topic", 0, 0, "key0", "value0"));
41+
var target = Fetch.forPartition(TP0, records0, true, new OffsetAndMetadata(1, Optional.empty(), ""));
42+
43+
var records1 = List.of(new ConsumerRecord<>("topic", 1, 0, "key1", "value1"));
44+
var source = Fetch.forPartition(TP1, records1, true, new OffsetAndMetadata(1, Optional.empty(), ""));
45+
46+
target.add(source);
47+
48+
assertEquals(2, target.numRecords());
49+
assertTrue(target.records().containsKey(TP0));
50+
assertTrue(target.records().containsKey(TP1));
51+
assertTrue(target.nextOffsets().containsKey(TP0));
52+
assertTrue(target.nextOffsets().containsKey(TP1));
53+
}
54+
55+
@Test
56+
void testAddForPartitionFetchToEmptyFetch() {
57+
Fetch<String, String> target = Fetch.empty();
58+
59+
var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key", "value"));
60+
var source = Fetch.forPartition(TP0, records, true, new OffsetAndMetadata(1, Optional.empty(), ""));
61+
62+
target.add(source);
63+
64+
assertEquals(1, target.numRecords());
65+
assertTrue(target.records().containsKey(TP0));
66+
assertTrue(target.nextOffsets().containsKey(TP0));
67+
}
68+
69+
@Test
70+
void testForPartitionWithEmptyRecords() {
71+
var fetch = Fetch.forPartition(TP0, List.of(), true, new OffsetAndMetadata(1, Optional.empty(), ""));
72+
73+
assertTrue(fetch.records().isEmpty());
74+
assertEquals(0, fetch.numRecords());
75+
assertTrue(fetch.positionAdvanced());
76+
assertFalse(fetch.isEmpty());
77+
assertTrue(fetch.nextOffsets().containsKey(TP0));
78+
}
79+
80+
@Test
81+
void testAddWithSamePartitionMergesRecords() {
82+
var records0 = List.of(new ConsumerRecord<>("topic", 0, 0, "key0", "value0"));
83+
var records1 = List.of(new ConsumerRecord<>("topic", 0, 1, "key1", "value1"));
84+
var target = Fetch.forPartition(TP0, records0, true, new OffsetAndMetadata(1, Optional.empty(), ""));
85+
var source = Fetch.forPartition(TP0, records1, true, new OffsetAndMetadata(2, Optional.empty(), ""));
86+
87+
target.add(source);
88+
89+
assertEquals(2, target.numRecords());
90+
assertEquals(2, target.records().get(TP0).size());
91+
}
92+
93+
@Test
94+
void testAddPropagatesPositionAdvanced() {
95+
var target = Fetch.forPartition(TP0, List.of(), false, new OffsetAndMetadata(0, Optional.empty(), ""));
96+
var source = Fetch.forPartition(TP1, List.of(), true, new OffsetAndMetadata(1, Optional.empty(), ""));
97+
98+
assertFalse(target.positionAdvanced());
99+
100+
target.add(source);
101+
102+
assertTrue(target.positionAdvanced());
103+
}
104+
105+
@Test
106+
void testForPartitionWithoutPositionAdvanced() {
107+
var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key", "value"));
108+
var fetch = Fetch.forPartition(TP0, records, false, new OffsetAndMetadata(1, Optional.empty(), ""));
109+
110+
assertFalse(fetch.positionAdvanced());
111+
assertFalse(fetch.isEmpty());
112+
assertEquals(1, fetch.numRecords());
113+
}
114+
115+
@Test
116+
void testRecordsReturnsUnmodifiableMap() {
117+
var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key", "value"));
118+
var fetch = Fetch.forPartition(TP0, records, true, new OffsetAndMetadata(1, Optional.empty(), ""));
119+
120+
assertThrows(UnsupportedOperationException.class, () -> fetch.records().put(TP1, List.of()));
121+
}
122+
123+
@Test
124+
void testNextOffsetsReturnsUnmodifiableMap() {
125+
var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key", "value"));
126+
var fetch = Fetch.forPartition(TP0, records, true, new OffsetAndMetadata(1, Optional.empty(), ""));
127+
128+
assertThrows(UnsupportedOperationException.class,
129+
() -> fetch.nextOffsets().put(TP1, new OffsetAndMetadata(99, Optional.empty(), "")));
130+
}
131+
132+
@Test
133+
void testEmpty() {
134+
Fetch<String, String> fetch = Fetch.empty();
135+
assertTrue(fetch.isEmpty());
136+
assertEquals(0, fetch.numRecords());
137+
assertFalse(fetch.positionAdvanced());
138+
assertTrue(fetch.records().isEmpty());
139+
assertTrue(fetch.nextOffsets().isEmpty());
140+
}
141+
}

0 commit comments

Comments
 (0)