Consuming messages from the beginning: seekToBeginning()
- Configuration
```java
Properties props = new Properties();
```
- Create a KafkaConsumer
```java
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
```
- Use `TopicPartition` to specify the partition to consume
```java
TopicPartition seekToEndPartition = new TopicPartition("mySecondTopic", 3);
```
- Assign the partition to the consumer with `assign`
```java
consumer.assign(Arrays.asList(seekToEndPartition));
```
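- Call `seekToBeginning`
```java
consumer.seekToBeginning(Arrays.asList(seekToEndPartition));
```
- Start consuming
```java
// Needs java.time.Duration and java.nio.charset.StandardCharsets.
// poll(long) is deprecated since Kafka 2.0, so the Duration overload is used here.
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(5000));
for (ConsumerRecord<String, byte[]> record : records) {
    // The value is a byte[]; decode it, otherwise only the array reference is printed.
    System.out.println("s consumption message:partition=" + record.partition()
            + ",offset=" + record.offset() + ",key=" + record.key()
            + ",value=" + new String(record.value(), StandardCharsets.UTF_8));
}
```
- Demo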
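```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MySeekToBeginningConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "132.232.14.247:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The value deserializer has to match the consumer's value type (byte[]).
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("group.id", "group1");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        try {
            TopicPartition seekToEndPartition = new TopicPartition("mySecondTopic", 3);
            consumer.assign(Arrays.asList(seekToEndPartition));
            // Move the position to the first available offset of the partition.
            consumer.seekToBeginning(Arrays.asList(seekToEndPartition));
            // poll(long) is deprecated since Kafka 2.0; use the Duration overload.
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, byte[]> record : records) {
                System.out.println("s consumption message:partition=" + record.partition()
                        + ",offset=" + record.offset() + ",key=" + record.key()
                        + ",value=" + new String(record.value(), StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
```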
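The demo calls `poll` only once. A single poll returns at most `max.poll.records` records (500 by default) and can come back empty if the fetch has not finished within the timeout, so one call is not guaranteed to print the whole partition. The sketch below illustrates one common remedy, polling until the position reaches the end offset, on a toy in-memory partition. `DrainPartitionSketch` and all of its methods are made up for illustration and are not Kafka classes; it also assumes offsets start at 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy in-memory stand-in for one partition; hypothetical, not part of the Kafka API.
class DrainPartitionSketch {
    private final List<String> log;   // index in the list = offset
    private final int maxPollRecords; // plays the role of max.poll.records
    private int position = 0;         // next offset to read

    DrainPartitionSketch(List<String> log, int maxPollRecords) {
        if (maxPollRecords < 1) {
            throw new IllegalArgumentException("maxPollRecords must be >= 1");
        }
        this.log = log;
        this.maxPollRecords = maxPollRecords;
    }

    // Like seekToBeginning: move the position to the first offset.
    void seekToBeginning() {
        position = 0;
    }

    // Like one poll(): returns at most maxPollRecords records and advances the position.
    List<String> poll() {
        int to = Math.min(position + maxPollRecords, log.size());
        List<String> batch = new ArrayList<>(log.subList(position, to));
        position = to;
        return batch;
    }

    // Keep polling until the position reaches the end offset captured up front.
    List<String> drain() {
        int endOffset = log.size();
        List<String> all = new ArrayList<>();
        while (position < endOffset) {
            all.addAll(poll());
        }
        return all;
    }

    public static void main(String[] args) {
        DrainPartitionSketch p =
                new DrainPartitionSketch(Arrays.asList("a", "b", "c", "d", "e"), 2);
        p.seekToBeginning();
        System.out.println(p.poll());  // [a, b]
        p.seekToBeginning();
        System.out.println(p.drain()); // [a, b, c, d, e]
    }
}
```

With the real client, the same loop could compare `consumer.position(seekToEndPartition)` against the value that `consumer.endOffsets(...)` returns for that partition.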
Consuming messages from the end: seekToEnd()
- Call `consumer.seekToEnd` to start consuming from the end
```java
consumer.seekToEnd(Arrays.asList(seekToEndPartition));
```
- Demo
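```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MySeekToEndConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The value deserializer has to match the consumer's value type (byte[]).
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        try {
            TopicPartition seekToEndPartition = new TopicPartition("mySecondTopic", 1);
            consumer.assign(Arrays.asList(seekToEndPartition));
            // Move the position past the last existing record; only newer records are returned.
            consumer.seekToEnd(Arrays.asList(seekToEndPartition));
            // poll(long) is deprecated since Kafka 2.0; use the Duration overload.
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, byte[]> record : records) {
                System.out.println("MySeekToEndConsumer consumer message:partition=" + record.partition()
                        + ",offset=" + record.offset() + ",key=" + record.key()
                        + ",value=" + new String(record.value(), StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
```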
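Run against an idle topic, the seekToEnd demo usually prints nothing, which can look like a bug. The reason is that `seekToEnd` places the position after the last existing record, so a poll only returns messages produced afterwards. The toy model below contrasts this with `seekToBeginning`. `SeekPositionSketch` and its methods are hypothetical names for illustration, not Kafka API, and the model assumes the log starts at offset 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of one partition's log; hypothetical, not part of the Kafka API.
class SeekPositionSketch {
    private final List<String> log = new ArrayList<>(); // list index = offset
    private int position = 0;                           // next offset to read

    // Stands in for a producer writing one record.
    void append(String value) {
        log.add(value);
    }

    // Position = first offset in the log.
    void seekToBeginning() {
        position = 0;
    }

    // Position = offset the next written record will get.
    void seekToEnd() {
        position = log.size();
    }

    // Returns everything from the current position onward and advances the position.
    List<String> poll() {
        List<String> batch = new ArrayList<>(log.subList(position, log.size()));
        position = log.size();
        return batch;
    }

    public static void main(String[] args) {
        SeekPositionSketch p = new SeekPositionSketch();
        for (String v : Arrays.asList("m0", "m1", "m2")) {
            p.append(v);
        }

        p.seekToBeginning();
        System.out.println(p.poll()); // [m0, m1, m2]

        p.seekToEnd();
        System.out.println(p.poll()); // []

        p.append("m3");               // a message produced after the seek
        System.out.println(p.poll()); // [m3]
    }
}
```

On a real partition the first available offset can be greater than 0 once old segments have been removed by retention, so "beginning" means the earliest offset that still exists.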
Consuming messages from a specific offset: seek()
- Call `consumer.seek`
```java
// Specify seekPartition and the offset
consumer.seek(seekPartition, 10);
```
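`seek` only records the requested position; it does not check that offset 10 still exists in the partition. If the offset is out of range, the next poll falls back to the `auto.offset.reset` policy, which may not be what you want. One defensive option, sketched below, is to clamp the requested offset into the valid range before seeking. `SeekOffsetCheck` and `clampToRange` are hypothetical helpers, not part of the Kafka client, and the bounds are assumed to have been fetched beforehand.

```java
// Hypothetical helper for illustration; not part of the Kafka API.
class SeekOffsetCheck {

    // Clamps the requested offset into [beginningOffset, endOffset].
    static long clampToRange(long requested, long beginningOffset, long endOffset) {
        if (beginningOffset > endOffset) {
            throw new IllegalArgumentException("beginningOffset must not exceed endOffset");
        }
        return Math.max(beginningOffset, Math.min(requested, endOffset));
    }

    public static void main(String[] args) {
        System.out.println(clampToRange(10, 0, 25));  // 10: already inside the range
        System.out.println(clampToRange(10, 14, 25)); // 14: offset 10 no longer exists
        System.out.println(clampToRange(10, 0, 4));   // 4: the partition has no offset 10 yet
    }
}
```

With the real client, the two bounds could come from `consumer.beginningOffsets(...)` and `consumer.endOffsets(...)`, and the result would be passed as the second argument of `consumer.seek(seekPartition, ...)`.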