java发送kafka事务消息

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

java发送kafka事务消息

逆风飞翔的小叔   2022-09-28 我要评论

前言

事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的mysql,我们在程序开始时,只需要在程序中添加上事务注解即可

kafka客户端事务,直接使用客户端提供的相关的API即可,和jdbc事务的使用很类似,主要包含下面5个API

// 1 初始化事务
void initTransactions();


// 2 开启事务
void beginTransaction() throws ProducerFencedException;


// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws ProducerFencedException;


// 4 提交事务
void commitTransaction() throws ProducerFencedException;


// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

下面结合实际的代码以及效果演示进行说明

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class ProducerTransaction {
 
    public static void main(String[] args) throws Exception {
 
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
 
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 
        // 设置事务 id(必须),事务 id 任意起名
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
 
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
 
        // 初始化事务
        kafkaProducer.initTransactions();
        // 开启事务
        kafkaProducer.beginTransaction();
        System.out.println("开始发送消息");
        try {
            // 4. 调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                // 发送消息
                kafkaProducer.send(new ProducerRecord<>("zcy222", "hello kafka " + i));
            }
            //int i = 1 / 0;
            // 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            System.out.println(e);
            // 终止事务
            kafkaProducer.abortTransaction();
        } finally {
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
 
}

运行上面的代码,正常是可以发送到指定的topic下

接下来,我们将上面的代码中的 1/0 放开,再次运行程序,可以看到,程序中抛异常了,但是消息并没有发送到kafka的broker,说明事务的配置生效了

 到此这篇关于java发送kafka事务消息的实现方法的文章就介绍到这了,更多相关java发送kafka事务消息内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们