d70deebce7de1007600f358441bf94c03f97a6e3
[iotivity.git] / cloud / messagequeue / src / main / java / org / iotivity / cloud / mqserver / topic / Topic.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.mqserver.topic;
23
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.iotivity.cloud.base.device.Device;
import org.iotivity.cloud.base.exception.ServerException.ForbiddenException;
import org.iotivity.cloud.base.exception.ServerException.InternalServerErrorException;
import org.iotivity.cloud.base.exception.ServerException.NotFoundException;
import org.iotivity.cloud.base.exception.ServerException.PreconditionFailedException;
import org.iotivity.cloud.base.protocols.IRequest;
import org.iotivity.cloud.base.protocols.IResponse;
import org.iotivity.cloud.base.protocols.MessageBuilder;
import org.iotivity.cloud.base.protocols.enums.ContentFormat;
import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
import org.iotivity.cloud.mqserver.kafka.KafkaConsumerWrapper;
import org.iotivity.cloud.mqserver.kafka.KafkaProducerWrapper;
import org.iotivity.cloud.util.Cbor;
40
41 /**
42  * 
43  * This class provides a set of APIs to handle requests to MessageQueue Topic.
44  *
45  */
46 public class Topic {
47
48     private TopicManager           mTopicManager = null;
49
50     private String                 mName         = null;
51     private String                 mType         = null;
52     private HashMap<String, Topic> mSubtopics    = null;
53
54     private byte[]                 mLatestData   = null;
55
56     private class TopicSubscriber {
57         TopicSubscriber(Device subscriber, IRequest request) {
58             mSubscriber = subscriber;
59             mRequest = request;
60         }
61
62         public Device   mSubscriber;
63         public IRequest mRequest;
64     }
65
66     private HashMap<String, TopicSubscriber> mSubscribers           = null;
67
68     private KafkaProducerWrapper             mKafkaProducerOperator = null;
69     private KafkaConsumerWrapper             mKafkaConsumerOperator = null;
70
71     Cbor<HashMap<String, Object>>            mCbor                  = new Cbor<>();
72
73     public Topic(String name, String type, TopicManager topicManager) {
74
75         mTopicManager = topicManager;
76         mName = name;
77         mType = type;
78
79         mSubtopics = new HashMap<>();
80         mSubscribers = new HashMap<>();
81
82         String kafka_zookeeper = topicManager.getKafkaZookeeper();
83         String kafka_broker = topicManager.getKafkaBroker();
84
85         mKafkaProducerOperator = new KafkaProducerWrapper(kafka_broker, name);
86         mKafkaConsumerOperator = new KafkaConsumerWrapper(kafka_zookeeper,
87                 kafka_broker, this);
88
89         HashMap<String, Object> data = new HashMap<>();
90         mLatestData = mCbor.encodingPayloadToCbor(data);
91     }
92
93     /**
94      * API to get name of the topic
95      * 
96      * @return name of the topic
97      */
98     public String getName() {
99         return mName;
100     }
101
102     /**
103      * API to get type of the topic
104      * 
105      * @return type of the topic
106      */
107     public String getType() {
108         return mType;
109     }
110
111     /**
112      * API to handle request to create subtopic
113      * 
114      * @param request
115      *            received request for subtopic creation
116      * 
117      * @return response of subtopic creation
118      */
119     public IResponse handleCreateSubtopic(IRequest request) {
120
121         String newTopicName = request.getUriPathSegments()
122                 .get(request.getUriPathSegments().size() - 1);
123
124         String newTopicType = new String();
125
126         if (request.getUriQueryMap() != null) {
127             newTopicType = request.getUriQueryMap().get("rt").get(0);
128         }
129
130         if (getSubtopic(newTopicName) != null) {
131             throw new ForbiddenException("topic already exist");
132         }
133
134         Topic newTopic = new Topic(mName + "/" + newTopicName, newTopicType,
135                 mTopicManager);
136
137         if (mTopicManager.createTopic(newTopic) == false) {
138             throw new InternalServerErrorException("create topic falied");
139         }
140
141         synchronized (mSubtopics) {
142             mSubtopics.put(newTopicName, newTopic);
143         }
144
145         IResponse response = MessageBuilder.createResponse(request,
146                 ResponseStatus.CREATED);
147         response.setLocationPath(request.getUriPath());
148         return response;
149     }
150
151     /**
152      * API to handle request to remove subtopic
153      * 
154      * @param request
155      *            received request for subtopic removal
156      * @param topicName
157      *            subtopic name to remove
158      * 
159      * @return response of subtopic removal
160      */
161     public IResponse handleRemoveSubtopic(IRequest request, String topicName) {
162
163         Topic targetTopic = getSubtopic(topicName);
164
165         if (targetTopic == null) {
166             throw new NotFoundException("topic doesn't exist");
167         }
168
169         targetTopic.cleanup();
170
171         if (mTopicManager.removeTopic(targetTopic) == false) {
172             throw new InternalServerErrorException("remove topic failed");
173         }
174
175         synchronized (mSubtopics) {
176             mSubtopics.remove(topicName);
177         }
178
179         return MessageBuilder.createResponse(request, ResponseStatus.DELETED);
180     }
181
182     /**
183      * API to handle request to subscribe the topic
184      * 
185      * @param srcDevice
186      *            device that sent request for topic subscription
187      * @param request
188      *            received request for topic subscription
189      * 
190      * @return response of topic subscription
191      */
192     public IResponse handleSubscribeTopic(Device srcDevice, IRequest request) {
193
194         // get latest data from kafka if consumer started for the first time
195         if (mKafkaConsumerOperator.consumerStarted() == false) {
196
197             ArrayList<byte[]> data = mKafkaConsumerOperator.getMessages();
198
199             if (data != null && !data.isEmpty()) {
200                 mLatestData = data.get(data.size() - 1);
201             }
202         }
203
204         if (mKafkaConsumerOperator.subscribeTopic() == false) {
205             throw new InternalServerErrorException("subscribe topic failed");
206         }
207
208         synchronized (mSubscribers) {
209             mSubscribers.put(request.getRequestId(),
210                     new TopicSubscriber(srcDevice, request));
211         }
212
213         return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
214                 ContentFormat.APPLICATION_CBOR, mLatestData);
215     }
216
217     /**
218      * API to handle request to unsubscribe the topic
219      * 
220      * @param request
221      *            received request for topic unsubscription
222      * 
223      * @return response of topic unsubscription
224      */
225     public IResponse handleUnsubscribeTopic(IRequest request) {
226
227         synchronized (mSubscribers) {
228
229             TopicSubscriber subscriber = mSubscribers
230                     .get(request.getRequestId());
231
232             mSubscribers.remove(subscriber.mRequest.getRequestId());
233
234             // if there's no more subscriber, stop subscribing topic
235             // with kafka consumer
236             if (mSubscribers.isEmpty()) {
237                 mKafkaConsumerOperator.unsubscribeTopic();
238             }
239         }
240
241         return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
242                 ContentFormat.APPLICATION_CBOR, mLatestData);
243     }
244
245     /**
246      * API to handle request to publish message to the topic
247      * 
248      * @param request
249      *            received request for message publication
250      * 
251      * @return response of message publication
252      */
253     public IResponse handlePublishMessage(IRequest request) {
254         byte[] payload = request.getPayload();
255
256         if (payload == null) {
257             throw new PreconditionFailedException("payload is null");
258         }
259
260         HashMap<String, Object> message = mCbor.parsePayloadFromCbor(payload,
261                 HashMap.class);
262
263         if (message == null || message.isEmpty()) {
264             throw new PreconditionFailedException("message is not included");
265         }
266
267         if (mKafkaProducerOperator.publishMessage(payload) == false) {
268             throw new InternalServerErrorException("publish message failed");
269         }
270
271         return MessageBuilder.createResponse(request, ResponseStatus.CHANGED);
272     }
273
274     /**
275      * API to handle request to read latest message in the topic
276      * 
277      * @param request
278      *            received request for reading latest message in topic
279      * 
280      * @return response of reading latest message in topic
281      */
282     public IResponse handleReadMessage(IRequest request) {
283         // if consumer is not started, get data from kafka broker
284         if (mKafkaConsumerOperator.consumerStarted() == false) {
285
286             ArrayList<byte[]> data = mKafkaConsumerOperator.getMessages();
287
288             if (data != null && !data.isEmpty()) {
289                 mLatestData = data.get(data.size() - 1);
290             }
291         }
292
293         return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
294                 ContentFormat.APPLICATION_CBOR, mLatestData);
295     }
296
297     /**
298      * API to close connection of Kafka producer and consumer
299      */
300     public void cleanup() {
301
302         mKafkaProducerOperator.closeConnection();
303         mKafkaConsumerOperator.closeConnection();
304     }
305
306     /**
307      * callback from Kafka Consumer to get published message
308      * 
309      * @param message
310      *            published message
311      */
312     public void onMessagePublished(byte[] message) {
313
314         mLatestData = message;
315
316         notifyPublishedMessage();
317     }
318
319     private Topic getSubtopic(String topicName) {
320
321         Topic topic = null;
322
323         synchronized (mSubtopics) {
324             topic = mSubtopics.get(topicName);
325         }
326
327         return topic;
328     }
329
330     private void notifyPublishedMessage() {
331         synchronized (mSubscribers) {
332             for (TopicSubscriber subscriber : mSubscribers.values()) {
333
334                 subscriber.mSubscriber.sendResponse(
335                         MessageBuilder.createResponse(subscriber.mRequest,
336                                 ResponseStatus.CONTENT,
337                                 ContentFormat.APPLICATION_CBOR, mLatestData));
338             }
339         }
340     }
341 }