03Producer

Producer

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

image-20201027234214723

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version><!-- use the version matching your Kafka broker --></version>
</dependency>

SimpleProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Minimal Kafka producer example: sends ten string records to topic "hello"
 * using fire-and-forget (asynchronous) sends.
 */
public class SimpleProducer {
    public static void main(String[] args) {
        // Producer configuration. Use ProducerConfig constants instead of raw
        // string keys so typos are caught at compile time.
        Properties properties = new Properties();
        // Kafka cluster bootstrap servers.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        // Ack level: "all"/-1 (leader + all in-sync replicas), 0 (no ack), 1 (leader only).
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retries on transient send failures.
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Batch size: 16 KB. A batch is handed to the RecordAccumulator when either
        // batch.size is reached or linger.ms elapses, whichever comes first.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Max wait before sending a partially-filled batch: 1 ms.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size: 32 MB.
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Key/value serializers.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees close() (which flushes buffered records)
        // even if a send throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                // send() is asynchronous and returns a Future<RecordMetadata>;
                // calling .get() on it would make the send synchronous.
                producer.send(new ProducerRecord<>("hello", "producer1" + i));
            }
        }
    }
}

CallBackProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * Producer example using send() with a completion callback.
 * The callback is invoked once the broker acknowledges (or rejects) the record.
 */
public class CallBackProducer {
    public static void main(String[] args) {

        // Producer configuration (ProducerConfig constants avoid typo-prone raw keys).
        Properties properties = new Properties();
        // Kafka cluster bootstrap servers.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        // Ack level: "all"/-1 (leader + all in-sync replicas), 0 (no ack), 1 (leader only).
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retries on transient send failures.
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Batch size: 16 KB. A batch goes to the RecordAccumulator when either
        // batch.size is reached or linger.ms elapses, whichever comes first.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Max wait before sending a partially-filled batch: 1 ms.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size: 32 MB.
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees close()/flush even if a send throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                final int finalI = i;
                // send() returns a Future; .get() on it would make the send synchronous.
                // The lambda is the completion callback: exactly one of
                // (recordMetadata, e) is meaningful — e == null means success.
                producer.send(new ProducerRecord<>("hello", "key", "producer2" + i), (recordMetadata, e) -> {
                    if (e == null) {
                        System.out.println(finalI);
                        System.out.println(recordMetadata.partition());
                        System.out.println(recordMetadata.topic());
                        System.out.println(recordMetadata);
                        System.out.println("-----------");
                    } else {
                        e.printStackTrace();
                    }
                });
            }
        }

    }
}

PartitionProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
 * Producer example with a custom partitioner, registered via
 * properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "priv.king.partitioner.MyPartitioner").
 * Without it, the default partitioner is used (key hash when a key is present).
 */
public class PartitionProducer {
    public static void main(String[] args) {
        // Producer configuration (ProducerConfig constants avoid typo-prone raw keys).
        Properties properties = new Properties();
        // Kafka cluster bootstrap servers.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        // Ack level: "all"/-1 (leader + all in-sync replicas), 0 (no ack), 1 (leader only).
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retries on transient send failures.
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Batch size: 16 KB. A batch goes to the RecordAccumulator when either
        // batch.size is reached or linger.ms elapses, whichever comes first.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Max wait before sending a partially-filled batch: 1 ms.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size: 32 MB.
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Route every record through the custom partitioner below.
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "priv.king.partitioner.MyPartitioner");

        // try-with-resources guarantees close()/flush even if a send throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                final int finalI = i;
                // Completion callback: e == null means the broker acknowledged the record.
                producer.send(new ProducerRecord<>("hello", "producer2" + i), (recordMetadata, e) -> {
                    if (e == null) {
                        System.out.println(finalI);
                        System.out.println(recordMetadata.partition());
                        System.out.println(recordMetadata.topic());
                        // Original printed partition() twice; the offset is the
                        // interesting second piece of metadata.
                        System.out.println(recordMetadata.offset());
                        System.out.println("-----------");
                    } else {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
}



public class MyPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//cluster.partitionCountForTopic(topic);
//cluster.partitionsForTopic(topic);
return 0;
}

public void close() {

}

public void configure(Map<String, ?> configs) {
System.out.println(configs);

//configs 在producer中传入的Properties
}
}

InterceptorProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
 * Producer example with a custom interceptor chain, registered via
 * properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
 *                Arrays.asList("priv.king.interceptor.TimeInterceptor")).
 */
public class InterceptorProducer {
    public static void main(String[] args) {

        // Producer configuration (ProducerConfig constants avoid typo-prone raw keys).
        Properties properties = new Properties();
        // Kafka bootstrap server.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        // Ack level: "all"/-1 (leader + all in-sync replicas), 0 (no ack), 1 (leader only).
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retries on transient send failures.
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Batch size: 16 KB. A batch goes to the RecordAccumulator when either
        // batch.size is reached or linger.ms elapses, whichever comes first.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Max wait before sending a partially-filled batch: 1 ms.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size: 32 MB.
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Interceptors run in list order on every send and acknowledgement.
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("priv.king.interceptor.TimeInterceptor"));

        // try-with-resources guarantees close()/flush (which also closes interceptors).
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("hello", "producer1" + i));
            }
        }

    }
}


/**
 * Interceptor that appends the current timestamp to each record's value and
 * counts acknowledged successes/failures, reporting the totals on close().
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    // Primitive long avoids the boxing churn of Long++.
    // NOTE(review): onAcknowledgement runs on the producer's I/O thread while
    // onSend runs on the caller's thread; the counters are only touched in
    // onAcknowledgement/close, but confirm single-threaded access if this is
    // extended.
    private long countSuccess = 0L;
    private long countError = 0L;

    /**
     * Rewrites the record value to "value,&lt;currentTimeMillis&gt;".
     * Preserves the original partition, timestamp and headers — the original
     * code dropped them by using the (topic, key, value) constructor.
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String value = record.value() + "," + System.currentTimeMillis();
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), value, record.headers());
    }

    /** Called when the broker acknowledges (or fails) a record. */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            countSuccess++;
        } else {
            countError++;
        }
    }

    /** Called from producer.close(); reports the accumulated counts. */
    @Override
    public void close() {
        System.out.printf("success:%d,error:%d\n", countSuccess, countError);
    }

    /** No configuration needed. */
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

03Producer
https://jiajun.xyz/2020/10/29/java/kafka/03Producer/
作者
Lambda
发布于
2020年10月29日
许可协议