Fix concurrent problem during observation
[iotivity.git] / cloud / stack / src / main / java / org / iotivity / cloud / base / device / CoapDevice.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.base.device;
23
24 import java.util.ArrayList;
25 import java.util.Date;
26 import java.util.Iterator;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29
30 import org.iotivity.cloud.base.connector.CoapClient;
31 import org.iotivity.cloud.base.connector.ConnectorPool;
32 import org.iotivity.cloud.base.exception.ClientException;
33 import org.iotivity.cloud.base.protocols.IRequest;
34 import org.iotivity.cloud.base.protocols.IResponse;
35 import org.iotivity.cloud.base.protocols.MessageBuilder;
36 import org.iotivity.cloud.base.protocols.coap.CoapRequest;
37 import org.iotivity.cloud.base.protocols.coap.CoapResponse;
38 import org.iotivity.cloud.base.protocols.enums.Observe;
39 import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
40 import org.iotivity.cloud.util.Bytes;
41 import org.iotivity.cloud.util.Log;
42
43 import io.netty.channel.ChannelHandlerContext;
44
45 public class CoapDevice extends Device {
46     private CoapClient                 mCoapClient         = null;
47     private String                     mUserId             = null;
48     private String                     mDeviceId           = null;
49     private String                     mAccessToken        = null;
50     private Date                       mIssuedTime         = null;
51     private int                        mExpiredPolicy      = 0;
52     private ArrayList<IRequestChannel> mObserveChannelList = new ArrayList<>();
53     private ConcurrentMap<Long, IRequest>    mObserveRequestList = new ConcurrentHashMap<>();
54
55     private static final int           INFINITE_TIME       = -1;
56
57     public CoapDevice(ChannelHandlerContext ctx) {
58         super(ctx);
59     }
60
61     public void updateDevice(String did, String uid, String accesstoken) {
62         mDeviceId = did;
63         mUserId = uid;
64         mAccessToken = accesstoken;
65     }
66
67     @Override
68     public String getDeviceId() {
69         return mDeviceId;
70     }
71
72     @Override
73     public String getUserId() {
74         return mUserId;
75     }
76
77     @Override
78     public String getAccessToken() {
79         return mAccessToken;
80     }
81
82     public Date getIssuedTime() {
83         return mIssuedTime;
84     }
85
86     public int getExpiredPolicy() {
87         return mExpiredPolicy;
88     }
89
90     public void setExpiredPolicy(int expiredPolicy) {
91         mIssuedTime = new Date();
92         this.mExpiredPolicy = expiredPolicy;
93     }
94
95     public void setUerId(String userId) {
96         this.mUserId = userId;
97     }
98
99     public void setAccessToken(String accessToken) {
100         this.mAccessToken = accessToken;
101     }
102
103     public void addObserveChannel(IRequestChannel channel) {
104         mObserveChannelList.add(channel);
105     }
106
107     public void removeObserveChannel(IRequestChannel channel)
108             throws ClientException {
109         if (mObserveChannelList.contains(channel)) {
110
111             Iterator<Long> iterator = mObserveRequestList.keySet().iterator();
112             while (iterator.hasNext()) {
113                 Long token = iterator.next();
114                 CoapClient coapClient = (CoapClient) channel;
115                 if (coapClient.isObserveRequest(token) != null) {
116                     coapClient.removeObserve(token);
117                     CoapRequest coapRequest = (CoapRequest) mObserveRequestList
118                             .get(token);
119                     coapRequest.setObserve(Observe.UNSUBSCRIBE);
120                     IResponse response = MessageBuilder.createResponse(
121                             coapRequest, ResponseStatus.CONTENT, null, null);
122                     sendResponse(response);
123                 }
124             }
125             mObserveChannelList.remove(channel);
126         }
127     }
128
129     public void addObserveRequest(Long token, IRequest request) {
130
131         mObserveRequestList.put(token, request);
132     }
133
134     public void removeObserveRequest(Long token) {
135
136         mObserveRequestList.remove(token);
137     }
138
139     // This is called by cloud resource model
140     @Override
141     public void sendResponse(IResponse response) {
142         // This message must converted to CoapResponse
143         CoapResponse coapResponse = (CoapResponse) response;
144
145         Iterator<Long> iterator = mObserveRequestList.keySet().iterator();
146         while (iterator.hasNext()) {
147             Long token = iterator.next();
148             Long respToken = Bytes.bytesToLong(coapResponse.getToken());
149             if (respToken.equals(token)
150                     && coapResponse.getObserve() == Observe.NOTHING) {
151                 iterator.remove();
152             }
153         }
154         ctx.channel().writeAndFlush(response);
155     }
156
157     public IRequestChannel getRequestChannel() {
158         if (mCoapClient == null) {
159             mCoapClient = new CoapClient(ctx.channel());
160         }
161
162         return mCoapClient;
163     }
164
165     public boolean isExpiredTime() {
166
167         if (mExpiredPolicy == INFINITE_TIME) {
168             return false;
169         }
170
171         Date currentTime = new Date();
172         long difference = currentTime.getTime() - mIssuedTime.getTime();
173         long remainTime = mExpiredPolicy - difference / 1000;
174
175         if (remainTime < 0) {
176
177             Log.w("accessToken is expired..");
178             return true;
179         }
180
181         return false;
182     }
183
184     @Override
185     public void onConnected() {
186         mObserveChannelList.addAll(ConnectorPool.getConnectionList());
187     }
188
189     @Override
190     public void onDisconnected() {
191         Iterator<Long> iterator = mObserveRequestList.keySet().iterator();
192         while (iterator.hasNext()) {
193             Long token = iterator.next();
194             for (IRequestChannel serverChannel : mObserveChannelList) {
195                 CoapClient coapClient = (CoapClient) serverChannel;
196                 if (coapClient != null
197                         && coapClient.isObserveRequest(token) != null) {
198                     CoapRequest coapRequest = (CoapRequest) mObserveRequestList
199                             .get(token);
200                     coapRequest.setObserve(Observe.UNSUBSCRIBE);
201                     coapRequest.setToken(Bytes.longTo8Bytes(token));
202                     serverChannel.sendRequest(MessageBuilder.modifyRequest(
203                             coapRequest, null, null, null, null), this);
204                 }
205             }
206         }
207     }
208 }