失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > Flink Timeout of 60000ms expired before the position for partition

Flink Timeout of 60000ms expired before the position for partition

时间:2020-02-25 20:59:48

相关推荐

Flink Timeout of 60000ms expired before the position for partition

FLink 消费Kafka报Timeout of 60000ms expired before the position for partition

代码

我只是想从每个分区的开始位置消费,但是却等来了无情的 :

flink_uv_0 Timeout of 60000ms expired before the position for partition

FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer("flink_uv", kafkaDeserializationSchema, new Properties());Map<KafkaTopicPartition, Long> partitionLongMap = new HashMap<>();partitionLongMap.put(new KafkaTopicPartition("flink_uv", 0), 0L);partitionLongMap.put(new KafkaTopicPartition("flink_uv", 1), 0L);partitionLongMap.put(new KafkaTopicPartition("flink_uv", 2), 0L);flinkKafkaConsumer.setStartFromSpecificOffsets(partitionLongMap);DataStreamSource<String> source1 = executionEnvironment.addSource(flinkKafkaConsumer)

问题排查:

百度查找说是由于消费者数量和分区数不相等导致的;但是我设置

executionEnvironment.setParallelism(3);

也没有解决问题;

最后我通过

KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);List<PartitionInfo> partitions= kafkaConsumer.partitionsFor("flink_uv");// kafka分区描述器public class PartitionInfo {private final String topic;private final int partition;private final Node leader;private final Node[] replicas;private final Node[] inSyncReplicas;private final Node[] offlineReplicas;}

获取kafka集群的分区情况partitions, 在这里面确实是3个分区,但是 flink_uv 0号分区的leader为null;并且irs(inSyncReplicas)也为null;

我又去zk上查看zk的kafka注册节点,/brokers/ids/下只有[1, 3](应该是[1, 2, 3]),看来是2号机器的Kafka宕机了,我随即重启了2号机器的Kafka机器,Flink报错消失;

问题复盘:

我的topic “flink_uv” 是3个分区,但是只有1个副本,如果这时候Leader宕机的话,那就无副本可以充当Leader了,无法提供服务;这也是为什么多副本是高可用的原因吧;

恒生云融内推企鹅邮件:1160782779@

如果觉得《Flink Timeout of 60000ms expired before the position for partition》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。