Debugging a Kafka Issue
Published: 2019-06-23


Environment

CentOS 7.3

Kafka runs with its default configuration. ZooKeeper is started separately rather than using the instance bundled with Kafka.
Kafka and ZooKeeper run on the same host, each as a single node.

Symptoms

Testing the topic with Kafka's own command-line tools works fine, but the Java code cannot receive messages from it.

Relevant Code

 pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.6.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.2.RELEASE</version>
    </dependency>
</dependencies>

application.properties

spring.kafka.consumer.group-id=junbaor-test-group
spring.kafka.bootstrap-servers=10.4.82.141:9092

App.java

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @KafkaListener(topics = "junbaor-test")
    public void test(String s) {
        System.out.println(s);
    }
}

Logs

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.6.RELEASE)

2017-09-05 14:56:50.971  INFO 52968 --- [           main] com.example.demo.DemoApplication         : Starting DemoApplication on Junbaor-PC with PID 52968 (D:\Project\github\demo\target\classes started by junbaor in D:\Project\github\demo)
2017-09-05 14:56:50.973  INFO 52968 --- [           main] com.example.demo.DemoApplication         : No active profile set, falling back to default profiles: default
2017-09-05 14:56:51.023  INFO 52968 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6bd61f98: startup date [Tue Sep 05 14:56:51 CST 2017]; root of context hierarchy
2017-09-05 14:56:51.463  INFO 52968 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$a05e7a75] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2017-09-05 14:56:51.714  INFO 52968 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2017-09-05 14:56:51.746  INFO 52968 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2017-09-05 14:56:51.763  INFO 52968 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [10.4.82.141:9092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = junbaor-test-group
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2017-09-05 14:56:51.822  INFO 52968 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.0
2017-09-05 14:56:51.822  INFO 52968 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : cb8625948210849f
2017-09-05 14:56:59.155  INFO 52968 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 8.466 seconds (JVM running for 10.586)
2017-09-05 14:56:59.259  INFO 52968 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator wkfg-1:9092 (id: 2147483647 rack: null) for group junbaor-test-group.
2017-09-05 14:57:06.036  INFO 52968 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator wkfg-1:9092 (id: 2147483647 rack: null) dead for group junbaor-test-group

The last line reads:

Marking the coordinator wkfg-1:9092 (id: 2147483647 rack: null) dead for group junbaor-test-group

The coordinator has been marked dead, and this is very likely why no messages are received.

Analysis

From the log we can tell that this line is printed by the AbstractCoordinator class, so let's locate the code that prints it.

clipboard.png

The condition that fired is this.coordinator != null; set a breakpoint to see what coordinator actually holds.

clipboard.png

wkfg-1 is the hostname of the server running the Kafka instance, and 9092 is Kafka's port, so this appears to be Kafka's connection address.
At first glance nothing seems wrong (in fact, this is exactly where the problem lies).


So when is this.coordinator assigned? Tracing upward to the previous caller, click the previous frame in the call stack.

clipboard.png

It jumps here:

clipboard.png

Since coordinator is not null, the block must have been entered because client.connectionFailed(coordinator) returned true.
Semantically, that means the client failed to connect to the coordinator. Whatever the cause, let's step into the method.

clipboard.png

The parameter is named node, so this object should describe a Kafka node.
Expand it to inspect its fields.

clipboard.png

Pinpointing the Problem

Note the host attribute in the screenshot above; host normally means the machine's hostname.

On a LAN, a bare hostname cannot be resolved by default.
You typically reach a host via its IP, a domain name, or by mapping the hostname to an IP in the hosts file.
With the cause identified, let's try the simplest possible fix.
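Whether a bare hostname resolves from the client machine can be checked quickly in Java. This is a minimal sketch: wkfg-1 is the broker hostname from this article, and it will only resolve if DNS or the hosts file maps it.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ResolveCheck {
    // Returns true if the given name can be resolved to an address
    // from this machine (via DNS or the local hosts file).
    static boolean resolves(String name) {
        try {
            InetAddress.getByName(name);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "wkfg-1" prints false on any machine that has no mapping for it,
        // which is exactly the situation the Kafka consumer ran into.
        System.out.println("wkfg-1 resolves: " + resolves("wkfg-1"));
        System.out.println("localhost resolves: " + resolves("localhost"));
    }
}
```

Running this on the machine where the consumer fails makes the misconfiguration obvious before any debugger is attached.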

Attempted Fix

Of the options above, the simplest to implement is editing the local hosts file.

On Windows, the hosts file is located at C:\Windows\System32\drivers\etc\hosts.

Open it with administrator privileges and append the IP-to-hostname mapping:

10.4.82.141       wkfg-1

Start the project again; the log now reads:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.6.RELEASE)

2017-09-05 16:06:45.862  INFO 53000 --- [           main] com.example.demo.App                     : Starting App on Junbaor-PC with PID 53000 (D:\Project\github\demo\target\classes started by junbaor in D:\Project\github\demo)
2017-09-05 16:06:45.867  INFO 53000 --- [           main] com.example.demo.App                     : No active profile set, falling back to default profiles: default
2017-09-05 16:06:45.963  INFO 53000 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@52b1beb6: startup date [Tue Sep 05 16:06:45 CST 2017]; root of context hierarchy
2017-09-05 16:06:46.838  INFO 53000 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$2436eacd] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2017-09-05 16:06:47.184  INFO 53000 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2017-09-05 16:06:47.248  INFO 53000 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2017-09-05 16:06:47.308  INFO 53000 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
	[identical to the ConsumerConfig values in the first startup log, omitted]
2017-09-05 16:06:47.412  INFO 53000 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.0
2017-09-05 16:06:47.413  INFO 53000 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : cb8625948210849f
2017-09-05 16:06:47.432  INFO 53000 --- [           main] com.example.demo.App                     : Started App in 1.927 seconds (JVM running for 2.774)
2017-09-05 16:06:47.519  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator wkfg-1:9092 (id: 2147483647 rack: null) for group junbaor-test-group.
2017-09-05 16:06:47.525  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group junbaor-test-group
2017-09-05 16:06:47.525  INFO 53000 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-09-05 16:06:47.526  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group junbaor-test-group
2017-09-05 16:06:47.765  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group junbaor-test-group with generation 1
2017-09-05 16:06:47.766  INFO 53000 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [junbaor-test-0, junbaor-test-1, junbaor-test-2] for group junbaor-test-group
2017-09-05 16:06:47.767  INFO 53000 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[junbaor-test-0, junbaor-test-1, junbaor-test-2]

The log shows that the consumer has joined the group and partitions have been assigned.

Let's use Kafka's bundled console producer to send a message to the topic and check whether it is received:

[junbaor@wkfg-1 bin]$ ./kafka-console-producer.sh --topic junbaor-test --broker-list 127.0.0.1:9092
>test
>

clipboard.png

At this point, the problem is solved.

Summary

The likely root cause: no listener address was configured for Kafka, so the broker fell back to advertising its hostname.

clipboard.png

Searching the broker configuration indeed turns up such an option; according to its comment, this is the address advertised to consumers and producers.

We therefore set advertised.listeners=PLAINTEXT://10.4.82.141:9092 and reverted the local hosts file; testing confirmed this fixes the problem as well.
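For reference, a minimal sketch of the relevant broker settings in server.properties. Only the advertised.listeners value comes from this article; the listeners line and its 0.0.0.0 bind address are assumptions shown for context.

```properties
# server.properties (broker side)
# Address the broker binds to (assumed here; by default it binds all interfaces)
listeners=PLAINTEXT://0.0.0.0:9092
# Address handed to clients in metadata responses -- must be reachable by them
advertised.listeners=PLAINTEXT://10.4.82.141:9092
```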

Takeaway

We already set spring.kafka.bootstrap-servers to an IP address in application.properties, so why does the client still connect using the hostname?

My guess is that the client first connects to the Kafka instance via the bootstrap address and then fetches the broker's registered configuration (kept in ZooKeeper); only after obtaining that advertised address does it start listening on the topic.

clipboard.png
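The speculated two-step flow can be sketched as a toy model. This is purely illustrative: addressForBroker and the metadata map are hypothetical names for this sketch, not the real client API.

```java
import java.util.Map;

public class MetadataFlowSketch {
    // Toy model of the speculated flow: the bootstrap address is only used
    // for the very first connection; every later connection uses whatever
    // address the broker advertises in its metadata response.
    static String addressForBroker(String bootstrapAddress,
                                   Map<Integer, String> advertisedByBrokerId,
                                   int brokerId) {
        // The advertised address, if present, replaces the bootstrap one.
        return advertisedByBrokerId.getOrDefault(brokerId, bootstrapAddress);
    }

    public static void main(String[] args) {
        // Broker 0 advertises its hostname (the default when
        // advertised.listeners is unset), not the IP we bootstrapped with.
        Map<Integer, String> metadata = Map.of(0, "wkfg-1:9092");
        System.out.println(addressForBroker("10.4.82.141:9092", metadata, 0));
    }
}
```

This is why fixing bootstrap-servers alone is not enough: the follow-up connections go to the advertised address, so either that address or the client's name resolution has to be corrected.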

Reprinted from: http://aiwuo.baihongyu.com/
