modified observe exception, when channel is disconnected
[iotivity.git] / cloud / interface / src / main / java / org / iotivity / cloud / ciserver / DeviceServerSystem.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.ciserver;
23
24 import java.util.HashMap;
25 import java.util.Iterator;
26
27 import org.iotivity.cloud.base.OICConstants;
28 import org.iotivity.cloud.base.ServerSystem;
29 import org.iotivity.cloud.base.connector.ConnectorPool;
30 import org.iotivity.cloud.base.device.CoapDevice;
31 import org.iotivity.cloud.base.device.Device;
32 import org.iotivity.cloud.base.device.IRequestChannel;
33 import org.iotivity.cloud.base.exception.ClientException;
34 import org.iotivity.cloud.base.exception.ServerException;
35 import org.iotivity.cloud.base.exception.ServerException.BadRequestException;
36 import org.iotivity.cloud.base.exception.ServerException.UnAuthorizedException;
37 import org.iotivity.cloud.base.protocols.MessageBuilder;
38 import org.iotivity.cloud.base.protocols.coap.CoapRequest;
39 import org.iotivity.cloud.base.protocols.coap.CoapResponse;
40 import org.iotivity.cloud.base.protocols.enums.ContentFormat;
41 import org.iotivity.cloud.base.protocols.enums.RequestMethod;
42 import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
43 import org.iotivity.cloud.base.server.CoapServer;
44 import org.iotivity.cloud.base.server.HttpServer;
45 import org.iotivity.cloud.base.server.Server;
46 import org.iotivity.cloud.util.Bytes;
47 import org.iotivity.cloud.util.Cbor;
48 import org.iotivity.cloud.util.Log;
49
50 import io.netty.channel.ChannelDuplexHandler;
51 import io.netty.channel.ChannelHandler.Sharable;
52 import io.netty.channel.ChannelHandlerContext;
53 import io.netty.channel.ChannelPromise;
54
55 public class DeviceServerSystem extends ServerSystem {
56
57     IRequestChannel mRDServer = null;
58
59     public DeviceServerSystem() {
60         mRDServer = ConnectorPool.getConnection("rd");
61     }
62
63     public class CoapDevicePool {
64         HashMap<String, Device> mMapDevice = new HashMap<>();
65
66         public void addDevice(Device device) {
67             String deviceId = ((CoapDevice) device).getDeviceId();
68             synchronized (mMapDevice) {
69                 mMapDevice.put(deviceId, device);
70             }
71         }
72
73         public void removeDevice(Device device) throws ClientException {
74             String deviceId = ((CoapDevice) device).getDeviceId();
75             synchronized (mMapDevice) {
76                 if (mMapDevice.get(deviceId) == device) {
77                     mMapDevice.remove(deviceId);
78                 }
79             }
80             removeObserveDevice(device);
81         }
82
83         private void removeObserveDevice(Device device) throws ClientException {
84             Iterator<String> iterator = mMapDevice.keySet().iterator();
85             while (iterator.hasNext()) {
86                 String deviceId = iterator.next();
87                 CoapDevice getDevice = (CoapDevice) mDevicePool
88                         .queryDevice(deviceId);
89                 getDevice.removeObserveChannel(
90                         ((CoapDevice) device).getRequestChannel());
91             }
92         }
93
94         public Device queryDevice(String deviceId) {
95             Device device = null;
96             synchronized (mMapDevice) {
97                 device = mMapDevice.get(deviceId);
98             }
99             return device;
100         }
101     }
102
103     CoapDevicePool mDevicePool = new CoapDevicePool();
104
105     @Sharable
106     class CoapLifecycleHandler extends ChannelDuplexHandler {
107         @Override
108         public void channelRead(ChannelHandlerContext ctx, Object msg) {
109
110             if (msg instanceof CoapRequest) {
111                 try {
112                     CoapDevice coapDevice = (CoapDevice) ctx.channel()
113                             .attr(keyDevice).get();
114
115                     if (coapDevice.isExpiredTime()) {
116                         throw new UnAuthorizedException("token is expired");
117                     }
118
119                     CoapRequest coapRequest = (CoapRequest) msg;
120                     IRequestChannel targetChannel = null;
121                     if (coapRequest.getUriPathSegments()
122                             .contains(Constants.REQ_DEVICE_ID)) {
123                         CoapDevice targetDevice = (CoapDevice) mDevicePool
124                                 .queryDevice(coapRequest.getUriPathSegments()
125                                         .get(1));
126                         targetChannel = targetDevice.getRequestChannel();
127                     }
128                     switch (coapRequest.getObserve()) {
129                         case SUBSCRIBE:
130                             coapDevice.addObserveRequest(
131                                     Bytes.bytesToLong(coapRequest.getToken()),
132                                     coapRequest);
133                             coapDevice.addObserveChannel(targetChannel);
134                             break;
135                         case UNSUBSCRIBE:
136                             coapDevice.removeObserveChannel(targetChannel);
137                             coapDevice.removeObserveRequest(
138                                     Bytes.bytesToLong(coapRequest.getToken()));
139                             break;
140                         default:
141                             break;
142                     }
143
144                 } catch (Throwable t) {
145                     Log.f(ctx.channel(), t);
146                     ResponseStatus responseStatus = t instanceof ServerException
147                             ? ((ServerException) t).getErrorResponse()
148                             : ResponseStatus.INTERNAL_SERVER_ERROR;
149                     ctx.writeAndFlush(MessageBuilder
150                             .createResponse((CoapRequest) msg, responseStatus));
151                     ctx.close();
152                 }
153             }
154
155             ctx.fireChannelRead(msg);
156         }
157
158         @Override
159         public void channelActive(ChannelHandlerContext ctx) {
160             Device device = ctx.channel().attr(keyDevice).get();
161             // Authenticated device connected
162
163             sendDevicePresence(device.getDeviceId(), "on");
164             mDevicePool.addDevice(device);
165
166             device.onConnected();
167         }
168
169         @Override
170         public void channelInactive(ChannelHandlerContext ctx)
171                 throws ClientException {
172             Device device = ctx.channel().attr(keyDevice).get();
173             // Some cases, this event occurs after new device connected using
174             // same di.
175             // So compare actual value, and remove if same.
176             if (device != null) {
177                 sendDevicePresence(device.getDeviceId(), "off");
178
179                 device.onDisconnected();
180
181                 mDevicePool.removeDevice(device);
182                 ctx.channel().attr(keyDevice).remove();
183
184             }
185         }
186
187         public void sendDevicePresence(String deviceId, String state) {
188
189             Cbor<HashMap<String, Object>> cbor = new Cbor<>();
190             HashMap<String, Object> payload = new HashMap<String, Object>();
191             payload.put(Constants.REQ_DEVICE_ID, deviceId);
192             payload.put(Constants.PRESENCE_STATE, state);
193             StringBuffer uriPath = new StringBuffer();
194             uriPath.append("/" + Constants.PREFIX_OIC);
195             uriPath.append("/" + Constants.DEVICE_PRESENCE_URI);
196             mRDServer.sendRequest(MessageBuilder.createRequest(
197                     RequestMethod.POST, uriPath.toString(), null,
198                     ContentFormat.APPLICATION_CBOR,
199                     cbor.encodingPayloadToCbor(payload)), null);
200         }
201     }
202
203     CoapLifecycleHandler mLifeCycleHandler = new CoapLifecycleHandler();
204
205     @Sharable
206     class CoapAuthHandler extends ChannelDuplexHandler {
207         private Cbor<HashMap<String, Object>> mCbor = new Cbor<HashMap<String, Object>>();
208
209         @Override
210         public void channelActive(ChannelHandlerContext ctx) {
211             // Actual channel active should decided after authentication.
212         }
213
214         @Override
215         public void write(ChannelHandlerContext ctx, Object msg,
216                 ChannelPromise promise) {
217
218             try {
219
220                 if (!(msg instanceof CoapResponse)) {
221                     throw new BadRequestException(
222                             "this msg type is not CoapResponse");
223                 }
224                 // This is CoapResponse
225                 // Once the response is valid, add this to deviceList
226                 CoapResponse response = (CoapResponse) msg;
227
228                 switch (response.getUriPath()) {
229
230                     case OICConstants.ACCOUNT_SESSION_FULL_URI:
231                         HashMap<String, Object> payloadData = mCbor
232                                 .parsePayloadFromCbor(response.getPayload(),
233                                         HashMap.class);
234
235                         if (response.getStatus() != ResponseStatus.CHANGED) {
236                             throw new UnAuthorizedException();
237                         }
238
239                         if (payloadData == null) {
240                             throw new BadRequestException("payload is empty");
241                         }
242                         int remainTime = (int) payloadData
243                                 .get(Constants.EXPIRES_IN);
244
245                         Device device = ctx.channel().attr(keyDevice).get();
246                         ((CoapDevice) device).setExpiredPolicy(remainTime);
247
248                         // Remove current auth handler and replace to
249                         // LifeCycleHandler
250                         ctx.channel().pipeline().replace(this,
251                                 "LifeCycleHandler", mLifeCycleHandler);
252
253                         // Raise event that we have Authenticated device
254                         ctx.fireChannelActive();
255
256                         break;
257                 }
258
259                 ctx.writeAndFlush(msg);
260
261             } catch (Throwable t) {
262                 Log.f(ctx.channel(), t);
263                 ctx.writeAndFlush(msg);
264                 ctx.close();
265             }
266         }
267
268         @Override
269         public void channelRead(ChannelHandlerContext ctx, Object msg) {
270
271             try {
272                 if (!(msg instanceof CoapRequest)) {
273                     throw new BadRequestException(
274                             "this msg type is not CoapRequest");
275                 }
276
277                 // And check first response is VALID then add or cut
278                 CoapRequest request = (CoapRequest) msg;
279
280                 switch (request.getUriPath()) {
281                     // Check whether request is about account
282                     case OICConstants.ACCOUNT_FULL_URI:
283                     case OICConstants.ACCOUNT_TOKENREFRESH_FULL_URI:
284
285                         if (ctx.channel().attr(keyDevice).get() == null) {
286                             // Create device first and pass to upperlayer
287                             Device device = new CoapDevice(ctx);
288                             ctx.channel().attr(keyDevice).set(device);
289                         }
290
291                         break;
292
293                     case OICConstants.ACCOUNT_SESSION_FULL_URI:
294
295                         HashMap<String, Object> authPayload = mCbor
296                                 .parsePayloadFromCbor(request.getPayload(),
297                                         HashMap.class);
298
299                         Device device = ctx.channel().attr(keyDevice).get();
300
301                         if (device == null) {
302                             device = new CoapDevice(ctx);
303                             ctx.channel().attr(keyDevice).set(device);
304                         }
305
306                         if (authPayload == null) {
307                             throw new BadRequestException("payload is empty");
308                         }
309
310                         ((CoapDevice) device).updateDevice(
311                                 (String) authPayload.get(Constants.DEVICE_ID),
312                                 (String) authPayload.get(Constants.USER_ID),
313                                 (String) authPayload
314                                         .get(Constants.ACCESS_TOKEN));
315
316                         break;
317
318                     case OICConstants.KEEP_ALIVE_FULL_URI:
319                         // TODO: Pass ping request to upper layer
320                         break;
321
322                     default:
323                         throw new UnAuthorizedException(
324                                 "authentication required first");
325                 }
326
327                 ctx.fireChannelRead(msg);
328
329             } catch (Throwable t) {
330                 ResponseStatus responseStatus = t instanceof ServerException
331                         ? ((ServerException) t).getErrorResponse()
332                         : ResponseStatus.UNAUTHORIZED;
333                 ctx.writeAndFlush(MessageBuilder
334                         .createResponse((CoapRequest) msg, responseStatus));
335                 Log.f(ctx.channel(), t);
336             }
337         }
338     }
339
340     @Sharable
341     class HttpAuthHandler extends ChannelDuplexHandler {
342         @Override
343         public void channelActive(ChannelHandlerContext ctx) throws Exception {
344             // After current channel authenticated, raise to upper layer
345         }
346     }
347
348     @Override
349     public void addServer(Server server) {
350         if (server instanceof CoapServer) {
351             server.addHandler(new CoapAuthHandler());
352         }
353
354         if (server instanceof HttpServer) {
355             server.addHandler(new HttpAuthHandler());
356         }
357
358         super.addServer(server);
359     }
360
361     public CoapDevicePool getDevicePool() {
362         return mDevicePool;
363     }
364 }