/** * create Producer properties * * Properties are available in official document: * https://kafka.apache.org/documentation/#producerconfigs * */ Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// create the producer KafkaProducer<String, String> produer = new KafkaProducer<String, String>(properties);
// create a producer record ProducerRecord<String, String> record = new ProducerRecord<String, String>("first_topic", "message from java");
// send data - asynchronous /** * asynchronous means the data would not send immediately * however, the program would terminate immediately after run the send() method * hence the data would not send to kafka topic * and the consumer would not receive the data * * so we need flush() */ produer.send(record);
/** * use flush() to wait sending complete */ produer.flush(); produer.close();
for(int i = 0; i < 10; i++) { // create a producer record ProducerRecord<String, String> record = new ProducerRecord<String, String>("first_topic", "message from java" + Integer.toString(i));
produer.send(record, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { // execute every time a record is successfully sent or an exception is thrown if (e == null) { // the record is sent successfully logger.info("Received new metadata. \n" + "Topic: " + recordMetadata.topic() + "\n" + "Partition: " + recordMetadata.partition() + "\n" + "Offset: " + recordMetadata.offset() + "\n" + "Timestamp: " + recordMetadata.timestamp()); } else { LogUtils.logger.error("Error while producing", e); } } }); }
send ProducerRecord with key:
String key = “id_” + Integer.toString(i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key,”message from java” + Integer.toString(i));