spring 整合kafka监听消费

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

spring 整合kafka监听消费

蜗牛学编程   2021-03-04 我要评论

前言

最近项目里有个需求,要消费kafka里的数据。之前也手动写过代码去消费kafka数据。但是转念一想。既然spring提供了消费kafka的方法。就没必要再去重复造轮子。于是尝试使用spring的API。

项目技术背景,使用springMVC,XML配置和注解相互使用。kafka的配置都是使用XML方式。

整合过程

  1. 引入spring-kafka的依赖包

      <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>

2. 在spring的xml文件里增加配置项,也可以单独创建一个spring-context-XX.xml文件。

 <!-- consumer configuration  该配置项可以根据自己业务的实际需求做增加或删除-->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                <entry key="group.id" value="group" />
                <entry key="enable.auto.commit" value="true" />
                <entry key="auto.commit.interval.ms" value="3000" />
                <entry key="session.timeout.ms" value="10000" />
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- create factory  该类是spring jar包里提供,就这么配置-->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- 自定义的消费类,需要实现spring的接口 -->
    <bean id="payPalConsumer"
          class="com.chao.service.consumer.PayPalConsumer" />

    <!-- 该类也是jar包里提供的,注入的监听类是自己定义的,topic名称是配置文件引入的-->
    <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
        <constructor-arg name="topics" value="${kafka.paypal.topic.name}"/>
        <property name="messageListener" ref="payPalConsumer" />
    </bean>

    <!-- 改类也是jar里提供的,把这个containerProperties和consumerfactory 注入 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>

 2. 自定义消费者类,消费者类依然可以使用注解。

/**
 * get msg from kafka
 */
@Component 
public class PayPalConsumer implements MessageListener<String, String> {

    private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class);
    @Autowired
    private XXService XXService;
    @Override
    public void onMessage(ConsumerRecord<String, String> authorizeRecord) {
        String value = authorizeRecord.value();
        if (StringUtils.isEmpty(value)){
            logger.warn("receive message from kafka is null");
            return;
        }
        logger.info("receive message from kafka is {}",value);
    }
}

使用这个步骤配置,一次性过。非常顺利。

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们