a32f7d7ff7ba1bf1e62db65d52278cedcaa972b6
[iotivity.git] / service / notification / src / provider / NSProviderTopic.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "NSProviderTopic.h"
22 #include "oic_string.h"
23 #include "oic_malloc.h"
24 #include <pthread.h>
25
26 NSResult NSSendTopicUpdation();
27
28 NSResult NSInitTopicList()
29 {
30     NS_LOG(DEBUG, "NSInitTopicList - IN");
31
32     consumerTopicList = NSProviderStorageCreate();
33     NS_VERIFY_NOT_NULL(consumerTopicList, NS_FAIL);
34     consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
35
36     registeredTopicList = NSProviderStorageCreate();
37     NS_VERIFY_NOT_NULL(registeredTopicList, NS_FAIL);
38     registeredTopicList->cacheType = NS_PROVIDER_CACHE_REGISTER_TOPIC;
39
40     NS_LOG(DEBUG, "NSInitTopicList - OUT");
41     return NS_OK;
42 }
43
44 size_t NSProviderGetTopicListSize(NSTopicLL * firstElement)
45 {
46     if (!firstElement)
47     {
48         return 0;
49     }
50
51     int cnt = 0;
52
53     NSTopicLL * iter = firstElement;
54
55     while (iter)
56     {
57         cnt++;
58         iter = iter->next;
59     }
60
61     return cnt;
62 }
63
64 NSResult NSRegisterTopic(const char * topicName)
65 {
66     NS_LOG(DEBUG, "NSWriteTopicsToStorage()");
67
68     NSCacheTopicData * data = (NSCacheTopicData *) OICMalloc(sizeof(NSCacheTopicData));
69     NS_VERIFY_NOT_NULL(data, NS_FAIL);
70     data->topicName = (char *) topicName;
71     data->state = NS_TOPIC_UNSUBSCRIBED;
72
73     NSCacheElement * element = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
74     if (!element)
75     {
76         NSOICFree(data->topicName);
77         NSOICFree(data);
78         return NS_FAIL;
79     }
80
81     element->data = (void *) data;
82     element->next = NULL;
83
84     if (NSProviderStorageWrite(registeredTopicList, element) != NS_OK)
85     {
86         NS_LOG(DEBUG, "fail to write cache");
87         return NS_FAIL;
88     }
89
90     NSSendTopicUpdation();
91     NS_LOG(DEBUG, "NSWriteTopicsToStorage() NS_OK");
92     return NS_OK;
93 }
94
95 NSResult NSUnregisterTopic(const char * topicName)
96 {
97     NS_LOG(DEBUG, "NSDeleteTopics()");
98     NSResult result = NS_OK;
99
100     if (!topicName)
101     {
102         NS_LOG(ERROR, "topicName is NULL");
103         return NS_ERROR;
104     }
105
106     result = NSProviderStorageDelete(registeredTopicList, topicName);
107
108     while (NSProviderStorageDelete(consumerTopicList, topicName) != NS_FAIL)
109     {
110     }
111
112     if (result == NS_OK)
113     {
114         NSSendTopicUpdation();
115     }
116
117     return result;
118 }
119
120 NSResult NSSendTopicUpdation()
121 {
122     NS_LOG(DEBUG, "NSSendTopicUpdation - IN");
123
124     OCRepPayload* payload = OCRepPayloadCreate();
125
126     if (!payload)
127     {
128         NS_LOG(ERROR, "fail to create playload");
129         return NS_ERROR;
130     }
131
132     OCResourceHandle rHandle = NULL;
133     if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
134     {
135         NS_LOG(ERROR, "Fail to put message resource");
136         OCRepPayloadDestroy(payload);
137         return NS_ERROR;
138     }
139
140     OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
141     OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
142     OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
143
144     OCObservationId obArray[255] =
145     { 0, };
146     size_t obCount = 0;
147
148     NSCacheElement * it = consumerSubList->head;
149
150     while (it)
151     {
152         NSCacheSubData * subData = (NSCacheSubData *) it->data;
153
154         if (subData->isWhite)
155         {
156             if (subData->messageObId != 0)
157             {
158                 obArray[obCount++] = subData->messageObId;
159             }
160         }
161
162         it = it->next;
163     }
164
165     if (!obCount)
166     {
167         NS_LOG(ERROR, "observer count is zero");
168         OCRepPayloadDestroy(payload);
169         return NS_ERROR;
170     }
171
172     if (OCNotifyListOfObservers(rHandle, obArray, obCount, payload, OC_HIGH_QOS) != OC_STACK_OK)
173     {
174         NS_LOG(ERROR, "fail to send topic updation");
175         OCRepPayloadDestroy(payload);
176         return NS_ERROR;
177
178     }
179     OCRepPayloadDestroy(payload);
180
181     NS_LOG(DEBUG, "NSSendTopicUpdation - OUT");
182     return NS_OK;
183 }
184
185 NSResult NSSendTopicUpdationToConsumer(char *consumerId)
186 {
187     NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - IN");
188
189     OCRepPayload* payload = OCRepPayloadCreate();
190
191     if (!payload)
192     {
193         NS_LOG(ERROR, "fail to create playload");
194         return NS_ERROR;
195     }
196
197     OCResourceHandle rHandle = NULL;
198     if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
199     {
200         NS_LOG(ERROR, "Fail to put message resource");
201         OCRepPayloadDestroy(payload);
202         return NS_ERROR;
203     }
204
205     OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
206     OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
207     OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
208
209     NSCacheElement * element = NSProviderStorageRead(consumerSubList, consumerId);
210
211     if (element == NULL)
212     {
213         NS_LOG(ERROR, "element is NULL");
214         OCRepPayloadDestroy(payload);
215         return NS_ERROR;
216     }
217
218     NSCacheSubData * subData = (NSCacheSubData*) element->data;
219
220     if (OCNotifyListOfObservers(rHandle, (OCObservationId*) &subData->messageObId, 1, payload,
221             OC_HIGH_QOS) != OC_STACK_OK)
222     {
223         NS_LOG(ERROR, "fail to send topic updation");
224         OCRepPayloadDestroy(payload);
225         return NS_ERROR;
226     }
227
228     OCRepPayloadDestroy(payload);
229
230     NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - OUT");
231     return NS_OK;
232 }
233
234 NSResult NSSendTopicList(OCEntityHandlerRequest * entityHandlerRequest)
235 {
236     NS_LOG(DEBUG, "NSSendTopicList - IN");
237
238     char * copyReq = OICStrdup(entityHandlerRequest->query);
239     char * id = NSGetValueFromQuery(copyReq, NS_QUERY_CONSUMER_ID);
240     NSTopicLL * topics = NULL;
241
242     if (!id)
243     {
244         NS_LOG(DEBUG, "Send registered topic list");
245         topics = NSProviderGetTopicsCacheData(registeredTopicList);
246     }
247     else
248     {
249         NS_LOG(DEBUG, "Send subscribed topic list to consumer");
250         topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList, consumerTopicList, id);
251         if (!topics)
252         {
253             topics = NSProviderGetTopicsCacheData(registeredTopicList);
254         }
255     }
256
257     // make response for the Get Request
258     OCEntityHandlerResponse response;
259     response.numSendVendorSpecificHeaderOptions = 0;
260     memset(response.sendVendorSpecificHeaderOptions, 0,
261             sizeof response.sendVendorSpecificHeaderOptions);
262     memset(response.resourceUri, 0, sizeof response.resourceUri);
263
264     OCRepPayload* payload = OCRepPayloadCreate();
265     if (!payload)
266     {
267         NS_LOG(ERROR, "payload is NULL");
268         NSOICFree(copyReq);
269         return NS_ERROR;
270     }
271
272     OCRepPayloadSetUri(payload, NS_COLLECTION_TOPIC_URI);
273     if (id)
274     {
275         OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, id);
276     }
277     OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
278     NSOICFree(copyReq);
279
280     if (topics)
281     {
282         NS_LOG(DEBUG, "topicList is NULL");
283         size_t dimensionSize = (size_t) NSProviderGetTopicListSize(topics);
284         NS_LOG_V(DEBUG, "dimensionSize = %d", (int)dimensionSize);
285
286         if (!dimensionSize)
287         {
288             return NS_ERROR;
289         }
290
291         OCRepPayload** payloadTopicArray = (OCRepPayload **) OICMalloc(
292                 sizeof(OCRepPayload *) * dimensionSize);
293         NS_VERIFY_NOT_NULL(payloadTopicArray, NS_ERROR);
294
295         size_t dimensions[3] = { dimensionSize, 0, 0 };
296
297         for (int i = 0; i < (int) dimensionSize; i++)
298         {
299             NS_LOG_V(DEBUG, "topicName = %s", topics->topicName);
300             NS_LOG_V(DEBUG, "topicState = %d",(int) topics->state);
301
302             payloadTopicArray[i] = OCRepPayloadCreate();
303             NS_VERIFY_NOT_NULL(payloadTopicArray[i], NS_ERROR);
304             OCRepPayloadSetPropString(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_NAME,
305                     topics->topicName);
306             OCRepPayloadSetPropInt(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_SELECTION,
307                     (int) topics->state);
308
309             NSTopicLL * next = topics->next;
310             NSOICFree(topics->topicName);
311             NSOICFree(topics);
312             topics = next;
313         }
314
315         OCRepPayloadSetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST,
316                 (const OCRepPayload**) (payloadTopicArray), dimensions);
317         for (int i = 0; i < (int) dimensionSize; ++i)
318         {
319             OCRepPayloadDestroy(payloadTopicArray[i]);
320         }
321         NSOICFree(payloadTopicArray);
322     }
323     else
324     {
325         size_t dimensions[3] = { 0, 0, 0 };
326
327         OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
328                 (OCRepPayload **) NULL, dimensions);
329     }
330
331     copyReq = OICStrdup(entityHandlerRequest->query);
332     char * reqInterface = NSGetValueFromQuery(copyReq, NS_QUERY_INTERFACE);
333
334     if (reqInterface && strcmp(reqInterface, NS_INTERFACE_BASELINE) == 0)
335     {
336         OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_BASELINE);
337         OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_READ);
338         OCResourcePayloadAddStringLL(&payload->types, NS_ROOT_TYPE);
339     }
340
341     NSOICFree(copyReq);
342     response.requestHandle = entityHandlerRequest->requestHandle;
343     response.resourceHandle = entityHandlerRequest->resource;
344     response.persistentBufferFlag = 0;
345     response.ehResult = OC_EH_OK;
346     response.payload = (OCPayload *) payload;
347
348     if (OCDoResponse(&response) != OC_STACK_OK)
349     {
350         NS_LOG(ERROR, "Fail to response topic list");
351         OCRepPayloadDestroy(payload);
352         return NS_ERROR;
353     }
354
355     OCRepPayloadDestroy(payload);
356     NS_LOG(DEBUG, "NSSendTopicList - OUT");
357     return NS_OK;
358 }
359
360 NSResult NSPostConsumerTopics(OCEntityHandlerRequest * entityHandlerRequest)
361 {
362     NS_LOG(DEBUG, "NSPostConsumerTopics() - IN");
363
364     char * consumerId = NULL;
365     OCRepPayload * payload = (OCRepPayload *) entityHandlerRequest->payload;
366     OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, &consumerId);
367
368     if (!consumerId)
369     {
370         NS_LOG(DEBUG, "Invalid consumer ID");
371         return NS_FAIL;
372     }
373
374     NS_LOG_V(INFO_PRIVATE, "TOPIC consumer ID = %s", consumerId);
375
376     consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_CID;
377
378     while (NSProviderStorageDelete(consumerTopicList, consumerId) != NS_FAIL)
379     {
380     }
381
382     consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
383     OCRepPayload ** topicListPayload = NULL;
384     OCRepPayloadValue * payloadValue = NULL;
385     payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);
386     size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);
387     size_t dimensions[3] = { dimensionSize, 0, 0 };
388     OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, &topicListPayload, dimensions);
389
390     for (int i = 0; i < (int) dimensionSize; i++)
391     {
392         char * topicName = NULL;
393         int64_t topicState = 0;
394
395         OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);
396         OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &topicState);
397         NS_LOG_V(DEBUG, "Topic Name(state):  %s(%d)", topicName, (int)topicState);
398
399         if (NS_TOPIC_SUBSCRIBED == (NSTopicState) topicState)
400         {
401             NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *) OICMalloc(
402                     sizeof(NSCacheTopicSubData));
403             NS_VERIFY_NOT_NULL(topicSubData, NS_FAIL);
404
405             OICStrcpy(topicSubData->id, NS_UUID_STRING_SIZE, consumerId);
406             topicSubData->topicName = topicName;
407
408             NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
409
410             if (!newObj)
411             {
412                 NSOICFree(topicSubData->topicName);
413                 NSOICFree(topicSubData);
414                 NSOICFree(consumerId);
415                 return NS_FAIL;
416             }
417
418             newObj->data = (NSCacheData *) topicSubData;
419             newObj->next = NULL;
420
421             NSProviderStorageWrite(consumerTopicList, newObj);
422         }
423     }
424     NSSendTopicUpdationToConsumer(consumerId);
425     NSOICFree(consumerId);
426     NS_LOG(DEBUG, "NSPostConsumerTopics() - OUT");
427     return NS_OK;
428 }
429
430 void * NSTopicSchedule(void * ptr)
431 {
432     if (ptr == NULL)
433     {
434         NS_LOG(DEBUG, "Create NSTopicSchedule");
435     }
436
437     while (NSIsRunning[TOPIC_SCHEDULER])
438     {
439         sem_wait(&NSSemaphore[TOPIC_SCHEDULER]);
440         pthread_mutex_lock(&NSMutex[TOPIC_SCHEDULER]);
441
442         if (NSHeadMsg[TOPIC_SCHEDULER] != NULL)
443         {
444             NSTask *node = NSHeadMsg[TOPIC_SCHEDULER];
445             NSHeadMsg[TOPIC_SCHEDULER] = node->nextTask;
446
447             switch (node->taskType)
448             {
449                 case TASK_SEND_TOPICS:
450                     NS_LOG(DEBUG, "CASE TASK_SEND_TOPICS : ");
451                     NSSendTopicList((OCEntityHandlerRequest*) node->taskData);
452                     NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
453                     break;
454                 case TASK_SUBSCRIBE_TOPIC:
455                 {
456                     NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");
457                     NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
458                     pthread_mutex_lock(topicSyncResult->mutex);
459                     NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
460                     NSCacheTopicSubData * subData =
461                             (NSCacheTopicSubData *) topicSyncResult->topicData;
462                     if (!newObj)
463                     {
464                         NSOICFree(subData->topicName);
465                         NSOICFree(subData);
466                         pthread_cond_signal(topicSyncResult->condition);
467                         pthread_mutex_unlock(topicSyncResult->mutex);
468                     }
469                     else
470                     {
471                         if (NSProviderStorageRead(registeredTopicList, subData->topicName))
472                         {
473                             newObj->data = topicSyncResult->topicData;
474                             newObj->next = NULL;
475
476                             if (NSProviderStorageWrite(consumerTopicList, newObj) == NS_OK)
477                             {
478                                 NSSendTopicUpdationToConsumer(subData->id);
479                                 topicSyncResult->result = NS_OK;
480                             }
481                         }
482                         else
483                         {
484                             NSOICFree(subData->topicName);
485                             NSOICFree(subData);
486                             NSOICFree(newObj);
487                         }
488                     }
489                     pthread_cond_signal(topicSyncResult->condition);
490                     pthread_mutex_unlock(topicSyncResult->mutex);
491                 }
492                     break;
493                 case TASK_UNSUBSCRIBE_TOPIC:
494                 {
495                     NS_LOG(DEBUG, "CASE TASK_UNSUBSCRIBE_TOPIC : ");
496                     NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
497                     pthread_mutex_lock(topicSyncResult->mutex);
498                     NSCacheTopicSubData * topicSubData =
499                             (NSCacheTopicSubData *) topicSyncResult->topicData;
500
501                     if (NSProviderDeleteConsumerTopic(consumerTopicList, topicSubData) == NS_OK)
502                     {
503                         NSSendTopicUpdationToConsumer(topicSubData->id);
504                         topicSyncResult->result = NS_OK;
505                     }
506
507                     NSOICFree(topicSubData->topicName);
508                     NSOICFree(topicSubData);
509                     pthread_cond_signal(topicSyncResult->condition);
510                     pthread_mutex_unlock(topicSyncResult->mutex);
511
512                 }
513                     break;
514                 case TASK_REGISTER_TOPIC:
515                 {
516                     NS_LOG(DEBUG, "CASE TASK_ADD_TOPIC : ");
517                     NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
518
519                     pthread_mutex_lock(topicSyncResult->mutex);
520                     topicSyncResult->result = NSRegisterTopic(
521                             (const char *) topicSyncResult->topicData);
522                     pthread_cond_signal(topicSyncResult->condition);
523                     pthread_mutex_unlock(topicSyncResult->mutex);
524                 }
525                     break;
526                 case TASK_UNREGISTER_TOPIC:
527                 {
528                     NS_LOG(DEBUG, "CASE_TASK_DELETE_TOPIC : ");
529                     NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
530                     pthread_mutex_lock(topicSyncResult->mutex);
531                     topicSyncResult->result = NSUnregisterTopic(
532                             (const char *) topicSyncResult->topicData);
533                     NSOICFree(topicSyncResult->topicData);
534                     pthread_cond_signal(topicSyncResult->condition);
535                     pthread_mutex_unlock(topicSyncResult->mutex);
536                 }
537                     break;
538                 case TASK_POST_TOPIC:
539                 {
540                     NS_LOG(DEBUG, "TASK_POST_TOPIC : ");
541                     NSPostConsumerTopics((OCEntityHandlerRequest*) node->taskData);
542                     NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
543                 }
544                     break;
545                 case TASK_GET_TOPICS:
546                 {
547                     NS_LOG(DEBUG, "TASK_GET_TOPICS : ");
548                     NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
549                     pthread_mutex_lock(topicSync->mutex);
550                     NSTopicLL * topics = NSProviderGetTopicsCacheData(registeredTopicList);
551                     topicSync->topics = topics;
552                     pthread_cond_signal(topicSync->condition);
553                     pthread_mutex_unlock(topicSync->mutex);
554                 }
555                     break;
556                 case TAST_GET_CONSUMER_TOPICS:
557                 {
558                     NS_LOG(DEBUG, "TASK_GET_CONSUMER_TOPICS : ");
559                     NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
560                     pthread_mutex_lock(topicSync->mutex);
561                     NSTopicLL * topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList,
562                             consumerTopicList, topicSync->consumerId);
563                     topicSync->topics = topics;
564                     pthread_cond_signal(topicSync->condition);
565                     pthread_mutex_unlock(topicSync->mutex);
566                 }
567                     break;
568                 default:
569                     break;
570             }
571
572             NSOICFree(node);
573         }
574
575         pthread_mutex_unlock(&NSMutex[TOPIC_SCHEDULER]);
576     }
577
578     NS_LOG(DEBUG, "Destroy NSTopicSchedule");
579     return NULL;
580 }