03Producer

Producer

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

image-20201027234214723

依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>same with kafka</version>
</dependency>

SimpleProducer

public class SimpleProducer {
    public static void main(String[] args) {
        //创建kafka配置信息
        Properties properties = new Properties();
        //指定连接kafka集群
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092,node3:9092");
        //ack应答级别 -1/all 0 1
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        //批次大小 16k   字符串key都可以通过ProducerConfig获取到 也可以自定义,重写过滤器分区器时可获得
        properties.put("batch.size", 16384);
        //等待时间 1ms
        properties.put("linger.ms", 1);
        //16k或1ms就会提交到RecordAccumulator

        //RecordAccumulator缓冲区大小 32M
        properties.put("buffer.memory", 33554432);
        //key序列化
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //value序列化
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //创建producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 10; i++) {
            //该发送为异步发送 
            //send返回为future  
            //.get()即可变成同步
            producer.send(new ProducerRecord<String, String>("hello","producer1"+i));
        }
       producer.close();
    }
}

CallBackProducer

/**
有回调的send
*/
public class CallBackProducer {
    public static void main(String[] args) {

        //创建kafka配置信息
        Properties properties = new Properties();
        //指定连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092,node3:9092");
        //ack应答级别 -1/all 0 1
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        //批次大小 16k
        properties.put("batch.size", 16384);
        //等待时间 1ms
        properties.put("linger.ms", 1);
        //16k或1ms就会提交到RecordAccumulator

        //RecordAccumulator缓冲区大小 32M
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //创建producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 10; i++) {
            final int finalI = i;
            //send 返回值为future  .get()即同步发送
            producer.send(new ProducerRecord<String, String>("hello","key", "producer2" + i), (recordMetadata, e) -> {
                if(e==null){
                    System.out.println(finalI);
                    System.out.println(recordMetadata.partition());
                    System.out.println(recordMetadata.topic());
                    System.out.println(recordMetadata);
                    System.out.println("-----------");
                }else{
                    e.printStackTrace();
                }
            });
        }
        producer.close();

    }
}

PartitionProducer

/**
 * 自定义发送到哪个分区 
 * 通过 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"priv.king.partitioner.MyPartitioner");
 * 默认是key Hash(不完全)
 **/
public class PartitionProducer {
    public static void main(String[] args) {
        //创建kafka配置信息
        Properties properties = new Properties();
        //指定连接kafka集群
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092,node3:9092");
        //ack应答级别 -1/all 0 1
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        //批次大小 16k
        properties.put("batch.size", 16384);
        //等待时间 1ms
        properties.put("linger.ms", 1);
        //16k或1ms就会提交到RecordAccumulator

        //RecordAccumulator缓冲区大小 32M
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"priv.king.partitioner.MyPartitioner");

        //创建producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 10; i++) {
            final int finalI = i;
            producer.send(new ProducerRecord<String, String>("hello", "producer2" + i), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e==null){
                        System.out.println(finalI);
                        System.out.println(recordMetadata.partition());
                        System.out.println(recordMetadata.topic());
                        System.out.println(recordMetadata.partition());
                        System.out.println("-----------");
                    }else{
                        e.printStackTrace();
                    }


                }
            });
        }
        producer.close();
    }
}



public class MyPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //cluster.partitionCountForTopic(topic);
        //cluster.partitionsForTopic(topic);
        return 0;
    }

    public void close() {

    }

    public void configure(Map<String, ?> configs) {
        System.out.println(configs);

        //configs   在producer中传入的Properties
    }
}

InterceptorProducer

/**
 * 自定义过滤器
 * 通过 properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("priv.king.interceptor.TimeInterceptor"));
 * 
 **/
public class InterceptorProducer {
    public static void main(String[] args) {

        //创建kafka配置信息
        Properties properties = new Properties();
        //指定连接kafka集群
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        //ack应答级别 -1/all 0 1
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        //批次大小 16k
        properties.put("batch.size", 16384);
        //等待时间 1ms
        properties.put("linger.ms", 1);
        //16k或1ms就会提交到RecordAccumulator

        //RecordAccumulator缓冲区大小 32M
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("priv.king.interceptor.TimeInterceptor"));
        //创建producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("hello", "producer1" + i));
        }
        producer.close();

    }
}


public class TimeInterceptor implements ProducerInterceptor<String,String> {

    Long countSuccess=0L;
    Long countError=0L;
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String value = record.value();
        value+= ","+System.currentTimeMillis();

        ProducerRecord<String, String> record1 = new ProducerRecord<>(record.topic(), record.key(), value);
        return record1;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        //响应ack时调用
        if(exception==null){
            countSuccess++;
        }else{
            countError++;
        }
    }

    @Override
    public void close() {
        //This is called when interceptor is closed
        //在producer.close()是调用
        System.out.printf("success:%d,error:%d\n",countSuccess,countError);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

03Producer
https://jiajun.xyz/2020/10/29/java/kafka/03Producer/
作者
Lambda
发布于
2020年10月29日
许可协议