1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
|
/**
 * Demo producer that sends ten string records to the {@code hello} topic with
 * {@code priv.king.interceptor.TimeInterceptor} registered as a producer interceptor.
 */
public class InterceptorProducer {

    /**
     * Builds the producer configuration, sends ten records and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Interceptors are listed by fully-qualified class name.
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                Arrays.asList("priv.king.interceptor.TimeInterceptor"));

        // try-with-resources guarantees close() runs even if a send() call throws,
        // so the producer is never leaked on the error path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("hello", "producer1" + i));
            }
        }
    }
}
/**
 * Producer interceptor that appends {@code ",<currentTimeMillis>"} to each record value
 * and counts successful versus failed acknowledgements, printing both totals on close.
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    // Atomic counters: interceptor callbacks are not guaranteed to run on a single thread,
    // and a boxed Long incremented with ++ is neither atomic nor allocation-free.
    private final java.util.concurrent.atomic.AtomicLong countSuccess =
            new java.util.concurrent.atomic.AtomicLong();
    private final java.util.concurrent.atomic.AtomicLong countError =
            new java.util.concurrent.atomic.AtomicLong();

    /**
     * Returns a copy of {@code record} whose value has the current time in milliseconds
     * appended after a comma.
     *
     * @param record the record handed to the producer
     * @return the stamped record, or {@code record} itself when its value is {@code null}
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String original = record.value();
        if (original == null) {
            // Leave null-valued records alone; concatenating would produce the
            // literal text "null,<ts>" instead of keeping the value absent.
            return record;
        }
        String stamped = original + "," + System.currentTimeMillis();
        // Carry over partition, timestamp and headers so that only the value changes.
        return new ProducerRecord<>(
                record.topic(),
                record.partition(),
                record.timestamp(),
                record.key(),
                stamped,
                record.headers());
    }

    /**
     * Counts the outcome of a send: success when {@code exception} is {@code null},
     * error otherwise.
     *
     * @param metadata  metadata of the acknowledged record (not used here)
     * @param exception the failure cause, or {@code null} when the send succeeded
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            countSuccess.incrementAndGet();
        } else {
            countError.incrementAndGet();
        }
    }

    /** Prints the accumulated success and error counts to standard output. */
    @Override
    public void close() {
        System.out.printf("success:%d,error:%d\n", countSuccess.get(), countError.get());
    }

    /**
     * No configuration is needed by this interceptor.
     *
     * @param configs the producer configuration (ignored)
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty.
    }
}
|