Merge branch 'master' into extended-easysetup
[iotivity.git] / cloud / messagequeue / src / main / java / org / iotivity / cloud / mqserver / kafka / KafkaProducerWrapper.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.mqserver.kafka;
23
24 import java.util.Properties;
25
26 import org.apache.kafka.clients.producer.KafkaProducer;
27 import org.apache.kafka.clients.producer.Producer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.iotivity.cloud.util.Log;
30
31 public class KafkaProducerWrapper {
32
33     private String                   mTopicName = null;
34     private String                   mBroker    = null;
35
36     private Producer<byte[], byte[]> mProducer  = null;
37
38     public KafkaProducerWrapper(String brokerAddress, String topic) {
39
40         mTopicName = topic.replace("/", ".");
41
42         mBroker = brokerAddress;
43
44         mProducer = new KafkaProducer<>(buildPropertiesForPublish());
45     }
46
47     // TODO handle exception
48     public boolean publishMessage(byte[] message) {
49
50         Log.d("kafka publishMessage - " + mTopicName);
51
52         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
53                 mTopicName, message);
54
55         mProducer.send(record);
56         mProducer.flush();
57
58         return true;
59     }
60
61     public void closeConnection() {
62
63         mProducer.close();
64     }
65
66     private Properties buildPropertiesForPublish() {
67
68         // TODO check property settings
69         Properties props = new Properties();
70
71         props.put("bootstrap.servers", mBroker);
72         props.put("acks", "all");
73         props.put("retries", 0);
74         props.put("batch.size", 16384);
75         props.put("linger.ms", 1);
76         props.put("buffer.memory", 33554432);
77         props.put("key.serializer",
78                 "org.apache.kafka.common.serialization.ByteArraySerializer");
79         props.put("value.serializer",
80                 "org.apache.kafka.common.serialization.ByteArraySerializer");
81
82         return props;
83     }
84
85 }