164baf95ca5994641fa7b7141cda892b2eff0159
[iotivity.git] / cloud / messagequeue / src / main / java / org / iotivity / cloud / mqserver / kafka / KafkaProducerWrapper.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.mqserver.kafka;
23
24 import java.util.Properties;
25
26 import org.apache.kafka.clients.producer.KafkaProducer;
27 import org.apache.kafka.clients.producer.Producer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.iotivity.cloud.util.Log;
30
31 /**
32  *
33  * This class provides a set of APIs to use Kafka producer APIs for publishing
34  * messages.
35  *
36  */
37 public class KafkaProducerWrapper {
38
39     private String                   mTopicName = null;
40     private String                   mBroker    = null;
41
42     private Producer<byte[], byte[]> mProducer  = null;
43
44     public KafkaProducerWrapper(String brokerAddress, String topic) {
45
46         mTopicName = topic.replace("/", ".");
47
48         mBroker = brokerAddress;
49
50         mProducer = new KafkaProducer<>(buildPropertiesForPublish());
51     }
52
53     /**
54      * API to publish message to Kafka topic
55      * 
56      * @param message
57      *            message to publish
58      * 
59      * @return returns true if the message is successfully published, otherwise
60      *         false
61      */
62     public boolean publishMessage(byte[] message) {
63
64         Log.d("kafka publishMessage - " + mTopicName);
65
66         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
67                 mTopicName, message);
68
69         mProducer.send(record);
70         mProducer.flush();
71
72         return true;
73     }
74
75     /**
76      * API to close Kafka producer connection
77      */
78     public void closeConnection() {
79
80         mProducer.close();
81     }
82
83     private Properties buildPropertiesForPublish() {
84
85         // TODO check property settings
86         Properties props = new Properties();
87
88         props.put("bootstrap.servers", mBroker);
89         props.put("acks", "all");
90         props.put("retries", 0);
91         props.put("batch.size", 16384);
92         props.put("linger.ms", 1);
93         props.put("buffer.memory", 33554432);
94         props.put("key.serializer",
95                 "org.apache.kafka.common.serialization.ByteArraySerializer");
96         props.put("value.serializer",
97                 "org.apache.kafka.common.serialization.ByteArraySerializer");
98
99         return props;
100     }
101
102 }