ApacheKafka-Java

This article introduces how to develop Kafka-related functionality in Java.

Add the org.apache.kafka kafka-clients dependency in Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>

Producer Example:

private static String bootstrapServers = "server_xxx:9487";

public static void main(String[] args) {

/**
* create Producer properties
*
* Properties are available in official document:
* https://kafka.apache.org/documentation/#producerconfigs
*
*/
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

// create the producer
KafkaProducer<String, String> produer = new KafkaProducer<String, String>(properties);


// create a producer record
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("first_topic", "message from java");

// send data - asynchronous
/**
* asynchronous means the data would not send immediately
* however, the program would terminate immediately after run the send() method
* hence the data would not send to kafka topic
* and the consumer would not receive the data
*
* so we need flush()
*/
produer.send(record);

/**
* use flush() to wait sending complete
*/
produer.flush();
produer.close();

}
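
An alternative to flush() for a single record (a minimal sketch, not from the original write-up) is to block on the Future that send() returns; get() waits until the broker has acknowledged the record:

// requires: import java.util.concurrent.ExecutionException;
//           import org.apache.kafka.clients.producer.RecordMetadata;
try {
    // send() returns a Future<RecordMetadata>; get() blocks until the send completes,
    // so the program cannot exit before the record has reached the topic
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Sent to partition " + metadata.partition() + " at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    producer.close();
}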

Producer Example with a callback:

// send data with a Callback
// requires: org.apache.kafka.clients.producer.Callback, RecordMetadata, and an org.slf4j Logger named "logger"
for (int i = 0; i < 10; i++) {
    // create a producer record
    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>("first_topic", "message from java" + Integer.toString(i));

    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // executes every time a record is successfully sent or an exception is thrown
            if (e == null) {
                // the record was sent successfully
                logger.info("Received new metadata. \n" +
                        "Topic: " + recordMetadata.topic() + "\n" +
                        "Partition: " + recordMetadata.partition() + "\n" +
                        "Offset: " + recordMetadata.offset() + "\n" +
                        "Timestamp: " + recordMetadata.timestamp());
            } else {
                logger.error("Error while producing", e);
            }
        }
    });
}
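
Callback has a single onCompletion method, so with Java 8 the same callback can also be written as a lambda (a minimal sketch of the block above, nothing new beyond the syntax):

producer.send(record, (recordMetadata, e) -> {
    if (e == null) {
        logger.info("Topic: " + recordMetadata.topic() +
                ", Partition: " + recordMetadata.partition() +
                ", Offset: " + recordMetadata.offset());
    } else {
        logger.error("Error while producing", e);
    }
});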

Send a ProducerRecord with a key:

String key = "id_" + Integer.toString(i);

ProducerRecord<String, String> record =
        new ProducerRecord<String, String>(topic, key, "message from java" + Integer.toString(i));
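
With the default partitioner, records that share the same key always go to the same partition, which is what preserves per-key ordering. A minimal sketch (the topic name and the ten-key loop are assumptions) that reuses the lambda callback above to log which partition each key maps to:

for (int i = 0; i < 10; i++) {
    String key = "id_" + Integer.toString(i);
    ProducerRecord<String, String> keyedRecord =
            new ProducerRecord<String, String>("first_topic", key, "message from java" + Integer.toString(i));

    producer.send(keyedRecord, (recordMetadata, e) -> {
        if (e == null) {
            // re-running the program should log the same partition for the same key
            logger.info("Key: " + key + " -> Partition: " + recordMetadata.partition());
        } else {
            logger.error("Error while producing", e);
        }
    });
}
producer.flush();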

Consumer Example:

private static String bootstrapServers = "server:9092";
private static String groupId = "my-forth-app";
private static String topic = "first_topic";

public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);

/**
* create Consumer properties
*
* Properties are available in official document:
* https://kafka.apache.org/documentation/#consumerconfigs
*
*/
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

// subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(topic));

// poll for new data
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMinutes(100));

for(ConsumerRecord record : records){
logger.info("Key: " + record.key() + "\t" + "Value: " + record.value() +
"Topic: " + record.partition() + "\t" + "Partition: " + record.partition()
);

}
}

}
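
The loop above polls forever and never closes the consumer. A common way to stop it cleanly (a minimal sketch, not part of the original write-up; the shutdown-hook wiring is an assumption) is to call consumer.wakeup() from a shutdown hook, which makes the blocked poll() throw a WakeupException, and then close the consumer in a finally block:

// requires: import org.apache.kafka.common.errors.WakeupException;
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();       // makes poll() throw WakeupException
    try {
        mainThread.join();   // wait for the poll loop below to finish cleanly
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            logger.info("Key: " + record.key() + "\t" + "Value: " + record.value());
        }
    }
} catch (WakeupException e) {
    // expected during shutdown; nothing to do
} finally {
    consumer.close();        // leaves the group cleanly and commits offsets if auto-commit is enabled
}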