IOT-3143
[iotivity.git] / cloud / interface / src / main / java / org / iotivity / cloud / ciserver / DeviceServerSystem.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.ciserver;
23
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.iotivity.cloud.base.OICConstants;
31 import org.iotivity.cloud.base.ServerSystem;
32 import org.iotivity.cloud.base.connector.ConnectorPool;
33 import org.iotivity.cloud.base.device.CoapDevice;
34 import org.iotivity.cloud.base.device.Device;
35 import org.iotivity.cloud.base.device.IRequestChannel;
36 import org.iotivity.cloud.base.exception.ClientException;
37 import org.iotivity.cloud.base.exception.ServerException;
38 import org.iotivity.cloud.base.exception.ServerException.BadOptionException;
39 import org.iotivity.cloud.base.exception.ServerException.BadRequestException;
40 import org.iotivity.cloud.base.exception.ServerException.InternalServerErrorException;
41 import org.iotivity.cloud.base.exception.ServerException.UnAuthorizedException;
42 import org.iotivity.cloud.base.protocols.MessageBuilder;
43 import org.iotivity.cloud.base.protocols.coap.CoapRequest;
44 import org.iotivity.cloud.base.protocols.coap.CoapResponse;
45 import org.iotivity.cloud.base.protocols.coap.CoapSignaling;
46 import org.iotivity.cloud.base.protocols.enums.ContentFormat;
47 import org.iotivity.cloud.base.protocols.enums.RequestMethod;
48 import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
49 import org.iotivity.cloud.base.protocols.enums.SignalingMethod;
50 import org.iotivity.cloud.base.server.CoapServer;
51 import org.iotivity.cloud.base.server.HttpServer;
52 import org.iotivity.cloud.base.server.Server;
53 import org.iotivity.cloud.base.server.WebSocketServer;
54 import org.iotivity.cloud.util.Bytes;
55 import org.iotivity.cloud.util.Cbor;
56
57 import io.netty.channel.ChannelDuplexHandler;
58 import io.netty.channel.ChannelHandler.Sharable;
59 import io.netty.channel.ChannelHandlerContext;
60 import io.netty.channel.ChannelInboundHandlerAdapter;
61 import io.netty.channel.ChannelPromise;
62
63 /**
64  *
65  * This class provides a set of APIs to manage all requests
66  *
67  */
68
69 public class DeviceServerSystem extends ServerSystem {
70     private final static Logger                           Log     = LoggerFactory.getLogger(DeviceServerSystem.class);
71     private Cbor<HashMap<String, Object>>                 mCbor   = new Cbor<HashMap<String, Object>>();
72     private HashMap<ChannelHandlerContext, CoapSignaling> mCsmMap = new HashMap<>();
73
74     /**
75      *
76      * This class provides a set of APIs to manage device pool.
77      *
78      */
79     public static class CoapDevicePool {
80         HashMap<String, Device> mMapDevice = new HashMap<>();
81
82         /**
83          * API for adding device information into pool.
84          * 
85          * @param device
86          *            device to be added
87          */
88         public void addDevice(Device device) {
89             String deviceId = ((CoapDevice) device).getDeviceId();
90             synchronized (mMapDevice) {
91                 mMapDevice.put(deviceId, device);
92             }
93         }
94
95         /**
96          * API for removing device information into pool.
97          * 
98          * @param device
99          *            device to be removed
100          */
101         public void removeDevice(Device device) throws ClientException {
102             String deviceId = ((CoapDevice) device).getDeviceId();
103             synchronized (mMapDevice) {
104                 if (mMapDevice.get(deviceId) == device) {
105                     mMapDevice.remove(deviceId);
106                 }
107             }
108             removeObserveDevice(device);
109         }
110
111         private void removeObserveDevice(Device device) throws ClientException {
112             synchronized (mMapDevice) {
113                 Iterator<String> iterator = mMapDevice.keySet().iterator();
114                 while (iterator.hasNext()) {
115                     String deviceId = iterator.next();
116                     CoapDevice getDevice = (CoapDevice) queryDevice(deviceId);
117                     getDevice.removeObserveChannel(
118                             ((CoapDevice) device).getRequestChannel());
119                 }
120             }
121         }
122
123         /**
124          * API for getting device information.
125          * 
126          * @param deviceId
127          *            device id to get device
128          */
129         public Device queryDevice(String deviceId) {
130             Device device = null;
131             synchronized (mMapDevice) {
132                 device = mMapDevice.get(deviceId);
133             }
134             return device;
135         }
136     }
137
138     CoapDevicePool mDevicePool = new CoapDevicePool();
139
140     /**
141      *
142      * This class provides a set of APIs to manage life cycle of coap message.
143      *
144      */
145     @Sharable
146     class CoapLifecycleHandler extends ChannelDuplexHandler {
147         @Override
148         public void channelRead(ChannelHandlerContext ctx, Object msg) {
149
150             if (msg instanceof CoapRequest) {
151                 try {
152                     CoapDevice coapDevice = (CoapDevice) ctx.channel()
153                             .attr(keyDevice).get();
154
155                     if (coapDevice.isExpiredTime()) {
156                         throw new UnAuthorizedException("token is expired");
157                     }
158
159                     CoapRequest coapRequest = (CoapRequest) msg;
160                     IRequestChannel targetChannel = null;
161                     String urlPath = coapRequest.getUriPath();
162
163                     if (urlPath == null) {
164                         throw new InternalServerErrorException(
165                                 "request uriPath is null");
166                     }
167
168                     if (urlPath.contains(Constants.ROUTE_FULL_URI)) {
169
170                         int RouteResourcePathSize = Constants.ROUTE_FULL_URI
171                                 .split("/").length;
172                         List<String> uriPath = coapRequest.getUriPathSegments();
173                         if (uriPath != null && !uriPath.isEmpty()) {
174                             CoapDevice targetDevice = (CoapDevice) mDevicePool
175                                     .queryDevice(uriPath
176                                             .get(RouteResourcePathSize - 1));
177                             targetChannel = targetDevice.getRequestChannel();
178                         }
179
180                         switch (coapRequest.getObserve()) {
181                             case SUBSCRIBE:
182                                 coapDevice.addObserveRequest(
183                                         Bytes.bytesToLong(
184                                                 coapRequest.getToken()),
185                                         coapRequest);
186                                 coapDevice.addObserveChannel(targetChannel);
187                                 break;
188                             case UNSUBSCRIBE:
189                                 coapDevice.removeObserveChannel(targetChannel);
190                                 coapDevice.removeObserveRequest(Bytes
191                                         .bytesToLong(coapRequest.getToken()));
192                                 break;
193                             default:
194                                 break;
195                         }
196                     }
197                 } catch (Throwable t) {
198                     Log.error("[{}] channel error", ctx.channel().id().asLongText().substring(26), t);
199                     ResponseStatus responseStatus = t instanceof ServerException
200                             ? ((ServerException) t).getErrorResponse()
201                             : ResponseStatus.INTERNAL_SERVER_ERROR;
202                     ctx.writeAndFlush(MessageBuilder
203                             .createResponse((CoapRequest) msg, responseStatus));
204                     ctx.close();
205                 }
206             }
207             ctx.fireChannelRead(msg);
208         }
209
210         @Override
211         public void write(ChannelHandlerContext ctx, Object msg,
212                 ChannelPromise promise) throws Exception {
213
214             boolean bCloseConnection = false;
215
216             if (msg instanceof CoapResponse) {
217                 // This is CoapResponse
218                 // Once the response is valid, add this to deviceList
219                 CoapResponse response = (CoapResponse) msg;
220                 if (!response.getStatus().isSuccess()) {
221                     ctx.writeAndFlush(msg);
222                     return;
223                 }
224
225                 String urlPath = response.getUriPath();
226                 if (urlPath == null) {
227                     throw new InternalServerErrorException(
228                             "request uriPath is null");
229                 }
230
231                 switch (urlPath) {
232                     case OICConstants.ACCOUNT_SESSION_FULL_URI:
233                         if (response.getStatus() != ResponseStatus.CHANGED) {
234                             bCloseConnection = true;
235                         }
236                         break;
237                     case OICConstants.ACCOUNT_FULL_URI:
238                         if (response.getStatus() == ResponseStatus.DELETED) {
239                             bCloseConnection = true;
240                         }
241                         break;
242                 }
243             }
244
245             ctx.writeAndFlush(msg);
246
247             if (bCloseConnection == true) {
248                 ctx.close();
249             }
250         }
251
252         @Override
253         public void channelActive(ChannelHandlerContext ctx) {
254             Device device = ctx.channel().attr(keyDevice).get();
255             // Authenticated device connected
256             Log.debug("Device: {} online", device.getDeviceId());
257             try {
258                 sendDevicePresence(device.getDeviceId(), "on");
259             } catch (ServerException.ServiceUnavailableException e) {
260                 Log.warn(e.getMessage());
261                 ctx.close();
262             }
263             mDevicePool.addDevice(device);
264             device.onConnected();
265         }
266
267         @Override
268         public void channelInactive(ChannelHandlerContext ctx)
269                 throws ClientException {
270             Device device = ctx.channel().attr(keyDevice).get();
271
272             // Some cases, this event occurs after new device connected using
273             // same di.
274             // So compare actual value, and remove if same.
275             if (device != null) {
276                 Log.debug("Device: {} offline ", device.getDeviceId());
277                 try {
278                     sendDevicePresence(device.getDeviceId(), "off");
279                 } catch (ServerException.ServiceUnavailableException e) {
280                     Log.warn(e.getMessage());
281                     ctx.close();
282                 } finally {
283                     device.onDisconnected();
284                     mDevicePool.removeDevice(device);
285                     ctx.channel().attr(keyDevice).remove();
286                 }
287             }
288         }
289
290         /**
291          * API for sending state to resource directory
292          * 
293          * @param deviceId
294          *            device id to be sent to resource directory
295          * @param state
296          *            device state to be sent to resource directory
297          */
298         public void sendDevicePresence(String deviceId, String state) {
299
300             Cbor<HashMap<String, Object>> cbor = new Cbor<>();
301             HashMap<String, Object> payload = new HashMap<String, Object>();
302             payload.put(Constants.REQ_DEVICE_ID, deviceId);
303             payload.put(Constants.PRESENCE_STATE, state);
304             StringBuffer uriPath = new StringBuffer();
305             uriPath.append("/" + Constants.PREFIX_OIC);
306             uriPath.append("/" + Constants.DEVICE_PRESENCE_URI);
307             ConnectorPool.getConnection("rd")
308                     .sendRequest(MessageBuilder.createRequest(
309                             RequestMethod.POST, uriPath.toString(), null,
310                             ContentFormat.APPLICATION_CBOR,
311                             cbor.encodingPayloadToCbor(payload)), null);
312         }
313     }
314
315     CoapLifecycleHandler mLifeCycleHandler = new CoapLifecycleHandler();
316
317     @Sharable
318     class CoapAuthHandler extends ChannelDuplexHandler {
319
320         @Override
321         public void channelActive(ChannelHandlerContext ctx) {
322             // Actual channel active should decided after authentication.
323         }
324
325         @Override
326         public void write(ChannelHandlerContext ctx, Object msg,
327                 ChannelPromise promise) {
328             try {
329
330                 if (!(msg instanceof CoapResponse)) {
331                     // throw new BadRequestException(
332                     // "this msg type is not CoapResponse");
333
334                     // TODO check websocket handshake response
335                     ctx.writeAndFlush(msg);
336                     return;
337                 }
338                 // This is CoapResponse
339                 // Once the response is valid, add this to deviceList
340
341                 CoapResponse response = (CoapResponse) msg;
342                 if (!response.getStatus().isSuccess()) {
343                     ctx.writeAndFlush(msg);
344                     return;
345                 }
346
347                 String urlPath = response.getUriPath();
348                 if (urlPath == null) {
349                     throw new InternalServerErrorException(
350                             "request uriPath is null");
351                 }
352                 switch (urlPath) {
353                     /*
354                      * case OICConstants.ACCOUNT_FULL_URI:
355                      * ctx.writeAndFlush(msg); ctx.close(); return;
356                      */
357
358                     case OICConstants.ACCOUNT_SESSION_FULL_URI:
359                         HashMap<String, Object> payloadData = mCbor
360                                 .parsePayloadFromCbor(response.getPayload(),
361                                         HashMap.class);
362
363                         if (response.getStatus() != ResponseStatus.CHANGED) {
364                             throw new UnAuthorizedException();
365                         }
366
367                         if (payloadData == null) {
368                             throw new BadRequestException("payload is empty");
369                         }
370                         int remainTime = (int) payloadData
371                                 .get(Constants.EXPIRES_IN);
372
373                         Device device = ctx.channel().attr(keyDevice).get();
374                         ((CoapDevice) device).setExpiredPolicy(remainTime);
375
376                         // Remove current auth handler and replace to
377                         // LifeCycleHandle
378                         ctx.channel().pipeline().replace(this,
379                                 "LifeCycleHandler", mLifeCycleHandler);
380
381                         // Raise event that we have Authenticated device
382                         ctx.fireChannelActive();
383
384                         break;
385                 }
386
387                 ctx.writeAndFlush(msg);
388
389             } catch (Throwable t) {
390                 Log.error("[{}] channel error", ctx.channel().id().asLongText().substring(26), t);
391                 ctx.writeAndFlush(msg);
392                 ctx.close();
393             }
394         }
395
396         @Override
397         public void channelRead(ChannelHandlerContext ctx, Object msg) {
398             try {
399                 if (!(msg instanceof CoapRequest)) {
400                     throw new BadRequestException(
401                             "this msg type is not CoapRequest");
402                 }
403
404                 // And check first response is VALID then add or cut
405                 CoapRequest request = (CoapRequest) msg;
406
407                 String urlPath = request.getUriPath();
408
409                 if (urlPath == null) {
410                     throw new InternalServerErrorException(
411                             "request uriPath is null");
412                 }
413
414                 switch (urlPath) {
415                     // Check whether request is about account
416                     case OICConstants.ACCOUNT_FULL_URI:
417                     case OICConstants.ACCOUNT_TOKENREFRESH_FULL_URI:
418
419                         if (ctx.channel().attr(keyDevice).get() == null) {
420                             // Create device first and pass to upperlayer
421                             Device device = new CoapDevice(ctx);
422                             ctx.channel().attr(keyDevice).set(device);
423                         }
424
425                         break;
426
427                     case OICConstants.ACCOUNT_SESSION_FULL_URI:
428
429                         HashMap<String, Object> authPayload = mCbor
430                                 .parsePayloadFromCbor(request.getPayload(),
431                                         HashMap.class);
432
433                         Device device = ctx.channel().attr(keyDevice).get();
434
435                         if (device == null) {
436                             device = new CoapDevice(ctx);
437                             ctx.channel().attr(keyDevice).set(device);
438                         }
439
440                         if (authPayload == null) {
441                             throw new BadRequestException("payload is empty");
442                         }
443
444                         ((CoapDevice) device).updateDevice(
445                                 (String) authPayload.get(Constants.DEVICE_ID),
446                                 (String) authPayload.get(Constants.USER_ID),
447                                 (String) authPayload
448                                         .get(Constants.ACCESS_TOKEN));
449
450                         break;
451
452                     case OICConstants.KEEP_ALIVE_FULL_URI:
453                         // TODO: Pass ping request to upper layer
454                         break;
455
456                     default:
457                         throw new UnAuthorizedException(
458                                 "authentication required first");
459                 }
460
461                 ctx.fireChannelRead(msg);
462
463             } catch (Throwable t) {
464                 ResponseStatus responseStatus = t instanceof ServerException
465                         ? ((ServerException) t).getErrorResponse()
466                         : ResponseStatus.UNAUTHORIZED;
467                 ctx.writeAndFlush(MessageBuilder
468                         .createResponse((CoapRequest) msg, responseStatus));
469                 Log.error("[{}] channel error", ctx.channel().id().asLongText().substring(26), t);
470             }
471         }
472     }
473
474     @Sharable
475     static class HttpAuthHandler extends ChannelDuplexHandler {
476         @Override
477         public void channelActive(ChannelHandlerContext ctx) throws Exception {
478             // After current channel authenticated, raise to upper layer
479         }
480     }
481
482     @Sharable
483     class CoapSignalingHandler extends ChannelInboundHandlerAdapter {
484
485         @Override
486         public void channelInactive(ChannelHandlerContext ctx)
487                 throws Exception {
488             // delete csm information from the map
489             mCsmMap.remove(ctx);
490             ctx.fireChannelInactive();
491         }
492
493         @Override
494         public void channelRead(ChannelHandlerContext ctx, Object msg) {
495             try {
496                 if (msg instanceof CoapSignaling) {
497                     if (mCsmMap.get(ctx) == null) {
498                         // In the server, the CSM message is sent to the device
499                         // once
500                         CoapSignaling inicialCsm = (CoapSignaling) MessageBuilder
501                                 .createSignaling(SignalingMethod.CSM);
502                         inicialCsm.setCsmMaxMessageSize(4294967295L);
503                         ctx.writeAndFlush(inicialCsm);
504                     }
505                     CoapSignaling signaling = (CoapSignaling) msg;
506                     switch (signaling.getSignalingMethod()) {
507                         case CSM:
508                             // get existing CSM from the map
509                             CoapSignaling existingCsm = mCsmMap.get(ctx);
510                             if (existingCsm == null) {
511                                 existingCsm = signaling;
512                             } else {
513                                 // replace and cumulate CSM options
514                                 existingCsm.setCsmBlockWiseTransfer(
515                                         signaling.getCsmBlockWiseTransfer());
516                                 existingCsm.setCsmMaxMessageSize(
517                                         signaling.getCsmMaxMessageSize());
518                                 existingCsm.setCsmServerName(
519                                         signaling.getCsmServerName());
520                             }
521                             mCsmMap.put(ctx, existingCsm);
522                             break;
523                         case PING:
524                             // TODO process PING signaling option
525                             break;
526                         case PONG:
527                             // TODO process PONG signaling option
528                             break;
529                         case RELEASE:
530                         case ABORT:
531                             mCsmMap.remove(ctx);
532                             ctx.close();
533                             break;
534                         default:
535                             throw new BadOptionException(
536                                     "unsupported CoAP Signaling option");
537                     }
538
539                     ctx.fireChannelRead(msg);
540                 } else {
541                     ctx.fireChannelRead(msg);
542                     // TODO annotated codes must be removed to follow
543                     // the CSM specification of draft-ietf-core-coap-tcp-tls-05
544
545                     // if (mCsmMap.get(ctx) != null) {
546                     // ctx.fireChannelRead(msg);
547                     // } else {
548                     // // send ABORT signaling and close the connection
549                     // ctx.writeAndFlush(MessageBuilder.createSignaling(
550                     // SignalingMethod.ABORT,
551                     // new String(
552                     // "Capability and Settings message (CSM) is not received
553                     // yet")
554                     // .getBytes()));
555                     // ctx.close();
556                     // }
557                 }
558             } catch (Throwable t) {
559                 ResponseStatus responseStatus = t instanceof ServerException
560                         ? ((ServerException) t).getErrorResponse()
561                         : ResponseStatus.BAD_OPTION;
562                 if (msg instanceof CoapRequest) {
563                     ctx.writeAndFlush(MessageBuilder
564                             .createResponse((CoapRequest) msg, responseStatus));
565                 } else if (msg instanceof CoapSignaling) {
566                     ctx.writeAndFlush(MessageBuilder.createSignalingResponse(
567                             (CoapSignaling) msg, responseStatus));
568                 }
569                 Log.error("[{}] channel error", ctx.channel().id().asLongText().substring(26), t);
570             }
571         }
572
573     }
574
575     @Override
576     public void addServer(Server server) {
577         if (server instanceof CoapServer) {
578             server.addHandler(new CoapSignalingHandler());
579             server.addHandler(new CoapAuthHandler());
580         }
581
582         if (server instanceof WebSocketServer) {
583             server.addHandler(new CoapAuthHandler());
584         }
585
586         if (server instanceof HttpServer) {
587             server.addHandler(new HttpAuthHandler());
588         }
589
590         super.addServer(server);
591     }
592
593     public CoapDevicePool getDevicePool() {
594         return mDevicePool;
595     }
596 }