kafka(2).zip
立即下载
资源介绍:
kafka(2).zip
package com.gansu.kafka.producter;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducterCallbackPartitions {
public static void main(String[] args) throws InterruptedException {
// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();
// 2. 给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.102:9092");
//关联添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.gansu.kafka.producter.MyPartitioner");
// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 3. 创建kafka生产者对象
KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
// 4. 调用send方法,发送消息
for (int i = 0; i < 3; i++) {
// 指定数据发送到1号分区,key为空(IDEA中ctrl + p查看参数)
// 依次指定key值为a,b,f ,数据key的hash值与3个分区求余,分别发往1、2、0
kafkaProducer.send(new ProducerRecord("first","root001"+i)
// 添加回调
, new Callback(){
// 该方法在Producer收到ack时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null){
// 没有异常,输出信息到控制台
System.out.println("主题是"+metadata.topic()+",分区是"+metadata.partition());
}else{
// 出现异常打印
exception.printStackTrace();
}
}
});
// 延迟一会会看到数据发往不同分区
// Thread.sleep(10);
}
// 5. 关闭资源
kafkaProducer.close();
}
}