b59331a0ba6c4e1f923bfe557f4a2daa41ce1cfc
[iotivity.git] / cloud / interface / src / main / java / org / iotivity / cloud / ciserver / DeviceServerSystem.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.ciserver;
23
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.iotivity.cloud.base.OICConstants;
31 import org.iotivity.cloud.base.ServerSystem;
32 import org.iotivity.cloud.base.connector.ConnectorPool;
33 import org.iotivity.cloud.base.device.CoapDevice;
34 import org.iotivity.cloud.base.device.Device;
35 import org.iotivity.cloud.base.device.IRequestChannel;
36 import org.iotivity.cloud.base.exception.ClientException;
37 import org.iotivity.cloud.base.exception.ServerException;
38 import org.iotivity.cloud.base.exception.ServerException.BadOptionException;
39 import org.iotivity.cloud.base.exception.ServerException.BadRequestException;
40 import org.iotivity.cloud.base.exception.ServerException.InternalServerErrorException;
41 import org.iotivity.cloud.base.exception.ServerException.UnAuthorizedException;
42 import org.iotivity.cloud.base.protocols.MessageBuilder;
43 import org.iotivity.cloud.base.protocols.coap.CoapRequest;
44 import org.iotivity.cloud.base.protocols.coap.CoapResponse;
45 import org.iotivity.cloud.base.protocols.coap.CoapSignaling;
46 import org.iotivity.cloud.base.protocols.enums.ContentFormat;
47 import org.iotivity.cloud.base.protocols.enums.RequestMethod;
48 import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
49 import org.iotivity.cloud.base.protocols.enums.SignalingMethod;
50 import org.iotivity.cloud.base.server.CoapServer;
51 import org.iotivity.cloud.base.server.HttpServer;
52 import org.iotivity.cloud.base.server.Server;
53 import org.iotivity.cloud.base.server.WebSocketServer;
54 import org.iotivity.cloud.util.Bytes;
55 import org.iotivity.cloud.util.Cbor;
56
57 import io.netty.channel.ChannelDuplexHandler;
58 import io.netty.channel.ChannelHandler.Sharable;
59 import io.netty.channel.ChannelHandlerContext;
60 import io.netty.channel.ChannelInboundHandlerAdapter;
61 import io.netty.channel.ChannelPromise;
62
63 /**
64  *
65  * This class provides a set of APIs to manage all of request
66  *
67  */
68
69 public class DeviceServerSystem extends ServerSystem {
70     private final static Logger                           Log     = LoggerFactory.getLogger(DeviceServerSystem.class);
71     private Cbor<HashMap<String, Object>>                 mCbor   = new Cbor<HashMap<String, Object>>();
72     private HashMap<ChannelHandlerContext, CoapSignaling> mCsmMap = new HashMap<>();
73
74     /**
75      *
76      * This class provides a set of APIs to manage device pool.
77      *
78      */
79     public static class CoapDevicePool {
80         HashMap<String, Device> mMapDevice = new HashMap<>();
81
82         /**
83          * API for adding device information into pool.
84          * 
85          * @param device
86          *            device to be added
87          */
88         public void addDevice(Device device) {
89             String deviceId = ((CoapDevice) device).getDeviceId();
90             synchronized (mMapDevice) {
91                 mMapDevice.put(deviceId, device);
92             }
93         }
94
95         /**
96          * API for removing device information into pool.
97          * 
98          * @param device
99          *            device to be removed
100          */
101         public void removeDevice(Device device) throws ClientException {
102             String deviceId = ((CoapDevice) device).getDeviceId();
103             synchronized (mMapDevice) {
104                 if (mMapDevice.get(deviceId) == device) {
105                     mMapDevice.remove(deviceId);
106                 }
107             }
108             removeObserveDevice(device);
109         }
110
111         private void removeObserveDevice(Device device) throws ClientException {
112             synchronized (mMapDevice) {
113                 Iterator<String> iterator = mMapDevice.keySet().iterator();
114                 while (iterator.hasNext()) {
115                     String deviceId = iterator.next();
116                     CoapDevice getDevice = (CoapDevice) queryDevice(deviceId);
117                     getDevice.removeObserveChannel(
118                             ((CoapDevice) device).getRequestChannel());
119                 }
120             }
121         }
122
123         /**
124          * API for getting device information.
125          * 
126          * @param deviceId
127          *            device id to get device
128          */
129         public Device queryDevice(String deviceId) {
130             Device device = null;
131             synchronized (mMapDevice) {
132                 device = mMapDevice.get(deviceId);
133             }
134             return device;
135         }
136     }
137
    /**
     * Pool of connected devices. CoapLifecycleHandler adds a device on
     * channelActive and removes it on channelInactive; callers obtain the
     * pool through getDevicePool().
     */
    CoapDevicePool mDevicePool = new CoapDevicePool();
139
140     /**
141      *
142      * This class provides a set of APIs to manage life cycle of coap message.
143      *
144      */
145     @Sharable
146     class CoapLifecycleHandler extends ChannelDuplexHandler {
147         @Override
148         public void channelRead(ChannelHandlerContext ctx, Object msg) {
149
150             if (msg instanceof CoapRequest) {
151                 try {
152                     CoapDevice coapDevice = (CoapDevice) ctx.channel()
153                             .attr(keyDevice).get();
154
155                     if (coapDevice.isExpiredTime()) {
156                         throw new UnAuthorizedException("token is expired");
157                     }
158
159                     CoapRequest coapRequest = (CoapRequest) msg;
160                     IRequestChannel targetChannel = null;
161                     String urlPath = coapRequest.getUriPath();
162
163                     if (urlPath == null) {
164                         throw new InternalServerErrorException(
165                                 "request uriPath is null");
166                     }
167
168                     if (urlPath.contains(Constants.ROUTE_FULL_URI)) {
169
170                         int RouteResourcePathSize = Constants.ROUTE_FULL_URI
171                                 .split("/").length;
172                         List<String> uriPath = coapRequest.getUriPathSegments();
173                         if (uriPath != null && !uriPath.isEmpty()) {
174                             CoapDevice targetDevice = (CoapDevice) mDevicePool
175                                     .queryDevice(uriPath
176                                             .get(RouteResourcePathSize - 1));
177                             targetChannel = targetDevice.getRequestChannel();
178                         }
179
180                         switch (coapRequest.getObserve()) {
181                             case SUBSCRIBE:
182                                 coapDevice.addObserveRequest(
183                                         Bytes.bytesToLong(
184                                                 coapRequest.getToken()),
185                                         coapRequest);
186                                 coapDevice.addObserveChannel(targetChannel);
187                                 break;
188                             case UNSUBSCRIBE:
189                                 coapDevice.removeObserveChannel(targetChannel);
190                                 coapDevice.removeObserveRequest(Bytes
191                                         .bytesToLong(coapRequest.getToken()));
192                                 break;
193                             default:
194                                 break;
195                         }
196                     }
197                 } catch (Throwable t) {
198                     Log.error("[{}] channel error", ctx.channel().id().asLongText().substring(26), t);
199                     ResponseStatus responseStatus = t instanceof ServerException
200                             ? ((ServerException) t).getErrorResponse()
201                             : ResponseStatus.INTERNAL_SERVER_ERROR;
202                     ctx.writeAndFlush(MessageBuilder
203                             .createResponse((CoapRequest) msg, responseStatus));
204                     ctx.close();
205                 }
206             }
207             ctx.fireChannelRead(msg);
208         }
209
210         @Override
211         public void write(ChannelHandlerContext ctx, Object msg,
212                 ChannelPromise promise) throws Exception {
213
214             boolean bCloseConnection = false;
215
216             if (msg instanceof CoapResponse) {
217                 // This is CoapResponse
218                 // Once the response is valid, add this to deviceList
219                 CoapResponse response = (CoapResponse) msg;
220
221                 String urlPath = response.getUriPath();
222
223                 if (urlPath == null) {
224                     throw new InternalServerErrorException(
225                             "request uriPath is null");
226                 }
227
228                 switch (urlPath) {
229                     case OICConstants.ACCOUNT_SESSION_FULL_URI:
230                         if (response.getStatus() != ResponseStatus.CHANGED) {
231                             bCloseConnection = true;
232                         }
233                         break;
234                     case OICConstants.ACCOUNT_FULL_URI:
235                         if (response.getStatus() == ResponseStatus.DELETED) {
236                             bCloseConnection = true;
237                         }
238                         break;
239                 }
240             }
241
242             ctx.writeAndFlush(msg);
243
244             if (bCloseConnection == true) {
245                 ctx.close();
246             }
247         }
248
249         @Override
250         public void channelActive(ChannelHandlerContext ctx) {
251             Device device = ctx.channel().attr(keyDevice).get();
252             // Authenticated device connected
253
254             sendDevicePresence(device.getDeviceId(), "on");
255             mDevicePool.addDevice(device);
256
257             device.onConnected();
258         }
259
260         @Override
261         public void channelInactive(ChannelHandlerContext ctx)
262                 throws ClientException {
263             Device device = ctx.channel().attr(keyDevice).get();
264
265             // Some cases, this event occurs after new device connected using
266             // same di.
267             // So compare actual value, and remove if same.
268             if (device != null) {
269                 sendDevicePresence(device.getDeviceId(), "off");
270
271                 device.onDisconnected();
272
273                 mDevicePool.removeDevice(device);
274                 ctx.channel().attr(keyDevice).remove();
275
276             }
277         }
278
279         /**
280          * API for sending state to resource directory
281          * 
282          * @param deviceId
283          *            device id to be sent to resource directory
284          * @param state
285          *            device state to be sent to resource directory
286          */
287         public void sendDevicePresence(String deviceId, String state) {
288
289             Cbor<HashMap<String, Object>> cbor = new Cbor<>();
290             HashMap<String, Object> payload = new HashMap<String, Object>();
291             payload.put(Constants.REQ_DEVICE_ID, deviceId);
292             payload.put(Constants.PRESENCE_STATE, state);
293             StringBuffer uriPath = new StringBuffer();
294             uriPath.append("/" + Constants.PREFIX_OIC);
295             uriPath.append("/" + Constants.DEVICE_PRESENCE_URI);
296             ConnectorPool.getConnection("rd")
297                     .sendRequest(MessageBuilder.createRequest(
298                             RequestMethod.POST, uriPath.toString(), null,
299                             ContentFormat.APPLICATION_CBOR,
300                             cbor.encodingPayloadToCbor(payload)), null);
301         }
302     }
303
    /**
     * Lifecycle handler instance that CoapAuthHandler installs in a
     * channel's pipeline (under the name "LifeCycleHandler") in place of
     * itself once a session response with status CHANGED is written.
     */
    CoapLifecycleHandler mLifeCycleHandler = new CoapLifecycleHandler();
305
306     @Sharable
307     class CoapAuthHandler extends ChannelDuplexHandler {
308
309         @Override
310         public void channelActive(ChannelHandlerContext ctx) {
311             // Actual channel active should decided after authentication.
312         }
313
314         @Override
315         public void write(ChannelHandlerContext ctx, Object msg,
316                 ChannelPromise promise) {
317             try {
318
319                 if (!(msg instanceof CoapResponse)) {
320                     // throw new BadRequestException(
321                     // "this msg type is not CoapResponse");
322
323                     // TODO check websocket handshake response
324                     ctx.writeAndFlush(msg);
325                     return;
326                 }
327                 // This is CoapResponse
328                 // Once the response is valid, add this to deviceList
329
330                 CoapResponse response = (CoapResponse) msg;
331
332                 String urlPath = response.getUriPath();
333
334                 if (urlPath == null) {
335                     throw new InternalServerErrorException(
336                             "request uriPath is null");
337                 }
338
339                 switch (urlPath) {
340                     /*
341                      * case OICConstants.ACCOUNT_FULL_URI:
342                      * ctx.writeAndFlush(msg); ctx.close(); return;
343                      */
344
345                     case OICConstants.ACCOUNT_SESSION_FULL_URI:
346                         HashMap<String, Object> payloadData = mCbor
347                                 .parsePayloadFromCbor(response.getPayload(),
348                                         HashMap.class);
349
350                         if (response.getStatus() != ResponseStatus.CHANGED) {
351                             throw new UnAuthorizedException();
352                         }
353
354                         if (payloadData == null) {
355                             throw new BadRequestException("payload is empty");
356                         }
357                         int remainTime = (int) payloadData
358                                 .get(Constants.EXPIRES_IN);
359
360                         Device device = ctx.channel().attr(keyDevice).get();
361                         ((CoapDevice) device).setExpiredPolicy(remainTime);
362
363                         // Remove current auth handler and replace to
364                         // LifeCycleHandle
365                         ctx.channel().pipeline().replace(this,
366                                 "LifeCycleHandler", mLifeCycleHandler);
367
368                         // Raise event that we have Authenticated device
369                         ctx.fireChannelActive();
370
371                         break;
372                 }
373
374                 ctx.writeAndFlush(msg);
375
376             } catch (Throwable t) {
377                 Log.error("[{}] channel error", ctx.channel().id().asLongText().substring(26), t);
378                 ctx.writeAndFlush(msg);
379                 ctx.close();
380             }
381         }
382
383         @Override
384         public void channelRead(ChannelHandlerContext ctx, Object msg) {
385             try {
386                 if (!(msg instanceof CoapRequest)) {
387                     throw new BadRequestException(
388                             "this msg type is not CoapRequest");
389                 }
390
391                 // And check first response is VALID then add or cut
392                 CoapRequest request = (CoapRequest) msg;
393
394                 String urlPath = request.getUriPath();
395
396                 if (urlPath == null) {
397                     throw new InternalServerErrorException(
398                             "request uriPath is null");
399                 }
400
401                 switch (urlPath) {
402                     // Check whether request is about account
403                     case OICConstants.ACCOUNT_FULL_URI:
404                     case OICConstants.ACCOUNT_TOKENREFRESH_FULL_URI:
405
406                         if (ctx.channel().attr(keyDevice).get() == null) {
407                             // Create device first and pass to upperlayer
408                             Device device = new CoapDevice(ctx);
409                             ctx.channel().attr(keyDevice).set(device);
410                         }
411
412                         break;
413
414                     case OICConstants.ACCOUNT_SESSION_FULL_URI:
415
416                         HashMap<String, Object> authPayload = mCbor
417                                 .parsePayloadFromCbor(request.getPayload(),
418                                         HashMap.class);
419
420                         Device device = ctx.channel().attr(keyDevice).get();
421
422                         if (device == null) {
423                             device = new CoapDevice(ctx);
424                             ctx.channel().attr(keyDevice).set(device);
425                         }
426
427                         if (authPayload == null) {
428                             throw new BadRequestException("payload is empty");
429                         }
430
431                         ((CoapDevice) device).updateDevice(
432                                 (String) authPayload.get(Constants.DEVICE_ID),
433                                 (String) authPayload.get(Constants.USER_ID),
434                                 (String) authPayload
435                                         .get(Constants.ACCESS_TOKEN));
436
437                         break;
438
439                     case OICConstants.KEEP_ALIVE_FULL_URI:
440                         // TODO: Pass ping request to upper layer
441                         break;
442
443                     default:
444                         throw new UnAuthorizedException(
445                                 "authentication required first");
446                 }
447
448                 ctx.fireChannelRead(msg);
449
450             } catch (Throwable t) {
451                 ResponseStatus responseStatus = t instanceof ServerException
452                         ? ((ServerException) t).getErrorResponse()
453                         : ResponseStatus.UNAUTHORIZED;
454                 ctx.writeAndFlush(MessageBuilder
455                         .createResponse((CoapRequest) msg, responseStatus));
456                 Log.error("[{}] channel error", ctx.channel().id().asLongText().substring(26), t);
457             }
458         }
459     }
460
    /**
     * Handler added to HTTP servers by addServer. It only overrides
     * channelActive, with an empty body, so the activation event is not
     * passed on to later handlers by this handler.
     */
    @Sharable
    static class HttpAuthHandler extends ChannelDuplexHandler {
        /**
         * Intentionally does nothing when the channel becomes active.
         *
         * @param ctx
         *            context of the activated channel
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // After current channel authenticated, raise to upper layer
        }
    }
468
469     @Sharable
470     class CoapSignalingHandler extends ChannelInboundHandlerAdapter {
471
472         @Override
473         public void channelInactive(ChannelHandlerContext ctx)
474                 throws Exception {
475             // delete csm information from the map
476             mCsmMap.remove(ctx);
477             ctx.fireChannelInactive();
478         }
479
480         @Override
481         public void channelRead(ChannelHandlerContext ctx, Object msg) {
482             try {
483                 if (msg instanceof CoapSignaling) {
484                     if (mCsmMap.get(ctx) == null) {
485                         // In the server, the CSM message is sent to the device
486                         // once
487                         CoapSignaling inicialCsm = (CoapSignaling) MessageBuilder
488                                 .createSignaling(SignalingMethod.CSM);
489                         inicialCsm.setCsmMaxMessageSize(4294967295L);
490                         ctx.writeAndFlush(inicialCsm);
491                     }
492                     CoapSignaling signaling = (CoapSignaling) msg;
493                     switch (signaling.getSignalingMethod()) {
494                         case CSM:
495                             // get existing CSM from the map
496                             CoapSignaling existingCsm = mCsmMap.get(ctx);
497                             if (existingCsm == null) {
498                                 existingCsm = signaling;
499                             } else {
500                                 // replace and cumulate CSM options
501                                 existingCsm.setCsmBlockWiseTransfer(
502                                         signaling.getCsmBlockWiseTransfer());
503                                 existingCsm.setCsmMaxMessageSize(
504                                         signaling.getCsmMaxMessageSize());
505                                 existingCsm.setCsmServerName(
506                                         signaling.getCsmServerName());
507                             }
508                             mCsmMap.put(ctx, existingCsm);
509                             break;
510                         case PING:
511                             // TODO process PING signaling option
512                             break;
513                         case PONG:
514                             // TODO process PONG signaling option
515                             break;
516                         case RELEASE:
517                         case ABORT:
518                             mCsmMap.remove(ctx);
519                             ctx.close();
520                             break;
521                         default:
522                             throw new BadOptionException(
523                                     "unsupported CoAP Signaling option");
524                     }
525
526                     ctx.fireChannelRead(msg);
527                 } else {
528                     ctx.fireChannelRead(msg);
529                     // TODO annotated codes must be removed to follow
530                     // the CSM specification of draft-ietf-core-coap-tcp-tls-05
531
532                     // if (mCsmMap.get(ctx) != null) {
533                     // ctx.fireChannelRead(msg);
534                     // } else {
535                     // // send ABORT signaling and close the connection
536                     // ctx.writeAndFlush(MessageBuilder.createSignaling(
537                     // SignalingMethod.ABORT,
538                     // new String(
539                     // "Capability and Settings message (CSM) is not received
540                     // yet")
541                     // .getBytes()));
542                     // ctx.close();
543                     // }
544                 }
545             } catch (Throwable t) {
546                 ResponseStatus responseStatus = t instanceof ServerException
547                         ? ((ServerException) t).getErrorResponse()
548                         : ResponseStatus.BAD_OPTION;
549                 if (msg instanceof CoapRequest) {
550                     ctx.writeAndFlush(MessageBuilder
551                             .createResponse((CoapRequest) msg, responseStatus));
552                 } else if (msg instanceof CoapSignaling) {
553                     ctx.writeAndFlush(MessageBuilder.createSignalingResponse(
554                             (CoapSignaling) msg, responseStatus));
555                 }
556                 Log.error("[{}] channel error", ctx.channel().id().asLongText().substring(26), t);
557             }
558         }
559
560     }
561
562     @Override
563     public void addServer(Server server) {
564         if (server instanceof CoapServer) {
565             server.addHandler(new CoapSignalingHandler());
566             server.addHandler(new CoapAuthHandler());
567         }
568
569         if (server instanceof WebSocketServer) {
570             server.addHandler(new CoapAuthHandler());
571         }
572
573         if (server instanceof HttpServer) {
574             server.addHandler(new HttpAuthHandler());
575         }
576
577         super.addServer(server);
578     }
579
580     public CoapDevicePool getDevicePool() {
581         return mDevicePool;
582     }
583 }