73e1e5007d71fd19da6499922d0459ab563add32
[iotivity.git] / cloud / messagequeue / src / main / java / org / iotivity / cloud / mqserver / kafka / KafkaConsumerWrapper.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.mqserver.kafka;
23
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32
33 import kafka.admin.AdminUtils;
34 import kafka.api.FetchRequest;
35 import kafka.api.FetchRequestBuilder;
36 import kafka.api.PartitionOffsetRequestInfo;
37 import kafka.common.TopicAndPartition;
38 import kafka.consumer.Consumer;
39 import kafka.consumer.ConsumerConfig;
40 import kafka.consumer.KafkaStream;
41 import kafka.javaapi.FetchResponse;
42 import kafka.javaapi.OffsetResponse;
43 import kafka.javaapi.consumer.ConsumerConnector;
44 import kafka.javaapi.consumer.SimpleConsumer;
45 import kafka.javaapi.message.ByteBufferMessageSet;
46 import kafka.message.MessageAndMetadata;
47 import kafka.message.MessageAndOffset;
48 import kafka.utils.ZKStringSerializer$;
49 import kafka.utils.ZkUtils;
50
51 import org.I0Itec.zkclient.ZkClient;
52 import org.I0Itec.zkclient.ZkConnection;
53 import org.iotivity.cloud.mqserver.Constants;
54 import org.iotivity.cloud.mqserver.topic.Topic;
55 import org.iotivity.cloud.util.Log;
56
57 /**
58  *
59  * This class provides a set of APIs to use Kafka consumer APIs for receiving
60  * messages.
61  *
62  */
63 public class KafkaConsumerWrapper {
64
65     private String            mTopicName         = null;
66
67     private String            mZookeeper         = null;
68     private String            mBroker            = null;
69
70     private ZkClient          mZkClient          = null;
71     private ZkUtils           mZkUtils           = null;
72
73     private ConsumerConnector mConsumerConnector = null;
74     private ExecutorService   mConsumerExecutor  = null;
75
76     private Topic             mInternalConsumer  = null;
77
78     private boolean           mConsumerStarted   = false;
79
80     public KafkaConsumerWrapper(String zookeeperAddress, String brokerAddress,
81             Topic consumer) {
82
83         mTopicName = consumer.getName().replace("/", ".");
84
85         mZookeeper = zookeeperAddress;
86         mBroker = brokerAddress;
87         mInternalConsumer = consumer;
88
89         mZkClient = new ZkClient(zookeeperAddress,
90                 Constants.KAFKA_SESSION_TIMEOUT,
91                 Constants.KAFKA_CONNECT_TIMEOUT, ZKStringSerializer$.MODULE$);
92
93         mZkUtils = new ZkUtils(mZkClient, new ZkConnection(zookeeperAddress),
94                 false);
95     }
96
97     /**
98      * API to check if Kafka consumer is started
99      * 
100      * @return returns true if Kafka consumer started, otherwise false
101      */
102     public boolean consumerStarted() {
103         return mConsumerStarted;
104     }
105
106     /**
107      * API to subscribe Kafka topic to receive messages
108      * 
109      * @return returns true if the topic is successfully subscribed, otherwise
110      *         false
111      */
112     public boolean subscribeTopic() {
113
114         Log.d("kafka subscribeTopic - " + mTopicName);
115
116         if (mConsumerStarted == true) {
117             return true;
118         }
119
120         // remove consumer group info if already exist
121         List<String> subscribers = mZkClient.getChildren(ZkUtils
122                 .ConsumersPath());
123
124         if (subscribers.contains(mTopicName)) {
125             AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
126         }
127
128         ConsumerConfig consumerConfig = new ConsumerConfig(
129                 buildPropertiesForSubscribe());
130
131         mConsumerConnector = Consumer
132                 .createJavaConsumerConnector(consumerConfig);
133
134         Map<String, Integer> topicCountMap = new HashMap<>();
135         topicCountMap.put(mTopicName, Constants.KAFKA_CONSUMMER_THREADS);
136
137         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = mConsumerConnector
138                 .createMessageStreams(topicCountMap);
139
140         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(mTopicName);
141
142         mConsumerExecutor = Executors
143                 .newFixedThreadPool(Constants.KAFKA_CONSUMMER_THREADS);
144
145         for (final KafkaStream<byte[], byte[]> stream : streams) {
146
147             Log.d("kafka subscribe complete");
148
149             mConsumerExecutor.execute(new Runnable() {
150
151                 public void run() {
152
153                     for (final MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
154
155                         mInternalConsumer.onMessagePublished(messageAndMetadata
156                                 .message());
157                     }
158                 }
159             });
160         }
161
162         mConsumerStarted = true;
163
164         return true;
165     }
166
167     /**
168      * API to unsubscribe Kafka topic to stop receiving messages
169      * 
170      * @return returns true if the topic is successfully unsubscribed, otherwise
171      *         false
172      */
173     public boolean unsubscribeTopic() {
174
175         Log.d("kafka unsubscribeTopic - " + mTopicName);
176
177         // remove consumer group info in zookeeper
178         List<String> subscribers = mZkClient.getChildren(ZkUtils
179                 .ConsumersPath());
180
181         if (subscribers.contains(mTopicName)) {
182             AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
183         }
184
185         mConsumerExecutor.shutdown();
186         mConsumerConnector.shutdown();
187
188         mConsumerStarted = false;
189
190         return true;
191     }
192
193     /**
194      * API to close Kafka consumer connection
195      */
196     public void closeConnection() {
197
198         if (mConsumerStarted == true) {
199             unsubscribeTopic();
200         }
201
202         mZkUtils.close();
203         mZkClient.close();
204     }
205
206     /**
207      * API to get all messages from Kafka topic
208      * 
209      * @return returns the list of messages published to the topic
210      */
211     public ArrayList<byte[]> getMessages() {
212
213         Log.d("kafka get all messages - " + mTopicName);
214
215         String brokerHost = mBroker.substring(0, mBroker.indexOf(':'));
216         int brokerPort = Integer.parseInt(mBroker.substring(mBroker
217                 .indexOf(':') + 1));
218
219         Log.d("host " + brokerHost + ", port " + brokerPort);
220
221         // TODO check options - Timeout: Int, bufferSize: Int
222         SimpleConsumer simpleConsumer = new SimpleConsumer(brokerHost,
223                 brokerPort, 100000, 64 * 1024, mTopicName);
224
225         long lastOffset = getLastOffset(simpleConsumer, mTopicName, 0,
226                 kafka.api.OffsetRequest.EarliestTime(), mTopicName);
227
228         // TODO check option - fetch size
229         FetchRequest req = new FetchRequestBuilder().clientId(mTopicName)
230                 .addFetch(mTopicName, 0, lastOffset, 100000).build();
231
232         FetchResponse fetchResponse = simpleConsumer.fetch(req);
233
234         if (fetchResponse == null || fetchResponse.hasError()) {
235
236             Log.e("Error fetching data from the Broker");
237             return null;
238         }
239
240         ArrayList<byte[]> initialData = new ArrayList<>();
241
242         ByteBufferMessageSet messageSet = fetchResponse.messageSet(mTopicName,
243                 0);
244
245         if (messageSet != null) {
246             for (MessageAndOffset messageAndOffset : messageSet) {
247
248                 long currentOffset = messageAndOffset.offset();
249                 if (currentOffset < lastOffset) {
250                     Log.e("Found an old offset: " + currentOffset
251                             + " Expecting: " + lastOffset);
252                     continue;
253                 }
254
255                 lastOffset = messageAndOffset.nextOffset();
256                 ByteBuffer payload = messageAndOffset.message().payload();
257
258                 byte[] bytes = new byte[payload.limit()];
259                 payload.get(bytes);
260
261                 initialData.add(bytes);
262             }
263         }
264
265         simpleConsumer.close();
266
267         Log.d("kafka get all messages complete");
268
269         return initialData;
270     }
271
272     private Properties buildPropertiesForSubscribe() {
273
274         // TODO check property settings
275         Properties props = new Properties();
276
277         props.put("group.id", mTopicName);
278         props.put("zookeeper.connect", mZookeeper);
279         props.put("enable.auto.commit", "true");
280         props.put("auto.commit.interval.ms", Constants.KAFKA_COMMIT_INTERVAL);
281
282         return props;
283     }
284
285     private long getLastOffset(SimpleConsumer consumer, String topic,
286             int partition, long whichTime, String clientName) {
287
288         TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
289                 partition);
290
291         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
292         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
293                 whichTime, 1));
294
295         kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
296                 requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
297                 clientName);
298
299         OffsetResponse response = consumer.getOffsetsBefore(request);
300
301         if (response == null || response.hasError()) {
302             Log.e("Error fetching data Offset Data the Broker");
303             return 0;
304         }
305
306         long[] offsets = response.offsets(topic, partition);
307         return offsets[0];
308     }
309
310 }