Flink consuming from Kafka fails with "Timeout of 60000ms expired before the position for partition"
Code
All I wanted was to consume each partition from its beginning, but what I got instead was the merciless:
flink_uv_0 Timeout of 60000ms expired before the position for partition
FlinkKafkaConsumer<String> flinkKafkaConsumer =
        new FlinkKafkaConsumer<>("flink_uv", kafkaDeserializationSchema, new Properties());

// Start every partition of the topic from offset 0
Map<KafkaTopicPartition, Long> partitionLongMap = new HashMap<>();
partitionLongMap.put(new KafkaTopicPartition("flink_uv", 0), 0L);
partitionLongMap.put(new KafkaTopicPartition("flink_uv", 1), 0L);
partitionLongMap.put(new KafkaTopicPartition("flink_uv", 2), 0L);
flinkKafkaConsumer.setStartFromSpecificOffsets(partitionLongMap);

DataStreamSource<String> source1 = executionEnvironment.addSource(flinkKafkaConsumer);
Troubleshooting:
A Baidu search suggested the error is caused by a mismatch between the number of consumers and the number of partitions, so I set

executionEnvironment.setParallelism(3);

but that did not solve the problem.
Finally, I used
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
List<PartitionInfo> partitions = kafkaConsumer.partitionsFor("flink_uv");

// Kafka's partition descriptor
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
}
to fetch the cluster's partition metadata (partitions). There were indeed 3 partitions, but partition 0 of flink_uv had a null leader, and its ISR (inSyncReplicas) was null as well.
I then went to ZooKeeper and looked at Kafka's registration nodes: under /brokers/ids/ there were only [1, 3] (it should have been [1, 2, 3]), so the Kafka process on machine 2 had evidently gone down. I restarted Kafka on machine 2, and the Flink error disappeared.
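The same check can be done from the command line with the zookeeper-shell tool that ships with Kafka (a sketch; the localhost:2181 address is an assumption, replace it with your own ZooKeeper ensemble):

```shell
# List the broker IDs currently registered in ZooKeeper.
# A healthy 3-broker cluster should show something like [1, 2, 3];
# a missing ID means that broker is down or has lost its ZK session.
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
```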
Post-mortem:
My topic "flink_uv" has 3 partitions but only 1 replica per partition. When a partition's leader broker goes down, there is no follower replica left to take over as leader, so that partition simply cannot serve requests. That, I suppose, is exactly why multiple replicas are the foundation of high availability.
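To avoid this class of outage, the topic should be created with a replication factor greater than 1 so a follower can be promoted when the leader dies. A sketch with kafka-topics.sh (the bootstrap address is an assumption; older Kafka versions use --zookeeper instead of --bootstrap-server):

```shell
# Create the topic with 3 partitions, each replicated to 3 brokers,
# so any single broker failure still leaves in-sync replicas to lead.
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic flink_uv \
  --partitions 3 \
  --replication-factor 3
```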