IOT-3215 Update RD PRESENCE_TABLE 43/27243/1
authorPeter Rafaj <peter.rafaj@kistler.com>
Tue, 18 Sep 2018 13:13:24 +0000 (15:13 +0200)
committerPeter Rafaj <peter.rafaj@kistler.com>
Tue, 16 Oct 2018 07:03:34 +0000 (09:03 +0200)
Change-Id: I30577905978c91cbb2e2f223f0ea1cbb1ce12a0e
Signed-off-by: Peter Rafaj <peter.rafaj@kistler.com>
cloud/interface/src/main/java/org/iotivity/cloud/ciserver/CloudInterfaceServer.java
cloud/interface/src/main/java/org/iotivity/cloud/ciserver/Constants.java
cloud/interface/src/main/java/org/iotivity/cloud/ciserver/resources/DevicePresenter.java [new file with mode: 0644]
cloud/interface/src/main/java/org/iotivity/cloud/ciserver/resources/KeepAliveResource.java
cloud/interface/src/main/java/org/iotivity/cloud/ciserver/resources/UpdateDeviceStateListener.java [new file with mode: 0644]
cloud/resourcedirectory/src/main/java/org/iotivity/cloud/rdserver/db/DBManager.java
cloud/resourcedirectory/src/main/java/org/iotivity/cloud/rdserver/db/MongoDB.java
cloud/resourcedirectory/src/main/java/org/iotivity/cloud/rdserver/resources/presence/device/DevicePresenceResource.java
cloud/stack/src/main/java/org/iotivity/cloud/base/connector/CoapConnector.java
cloud/stack/src/main/java/org/iotivity/cloud/base/connector/ConnectionEstablishedListener.java [new file with mode: 0644]
cloud/stack/src/main/java/org/iotivity/cloud/base/connector/ConnectorPool.java

index a31d4b2..c13b37c 100644 (file)
@@ -24,6 +24,7 @@ package org.iotivity.cloud.ciserver;
 import java.net.InetSocketAddress;
 import java.util.Scanner;
 
+import org.iotivity.cloud.ciserver.resources.UpdateDeviceStateListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.iotivity.cloud.base.connector.ConnectorPool;
@@ -79,9 +80,11 @@ public class CloudInterfaceServer {
             return;
         }
 
+        final KeepAliveResource resKeepAlive = new KeepAliveResource(deviceKeepAliveMinutes);
+        final UpdateDeviceStateListener updatePresenceState = new UpdateDeviceStateListener(resKeepAlive);
         ConnectorPool.requestConnection("rd",
                 new InetSocketAddress(resourceDirectoryAddress, resourceDirectoryPort),
-                tlsMode, keepAlive);
+                tlsMode, keepAlive, updatePresenceState);
         ConnectorPool.requestConnection("account",
                 new InetSocketAddress(accountServerAddress, accountServerPort),
                 tlsMode, keepAlive);
@@ -132,8 +135,6 @@ public class CloudInterfaceServer {
 
         deviceServer.addResource(crlHandler);
 
-        KeepAliveResource resKeepAlive = new KeepAliveResource(deviceKeepAliveMinutes);
-
         deviceServer.addResource(resKeepAlive);
 
         deviceServer.addResource(new RouteResource(devicePool));
index 59164a4..97a8092 100644 (file)
@@ -35,7 +35,6 @@ public class Constants extends OICConstants {
 
     public static final String USER_ID                         = "uid";
     public static final String DEVICE_ID                       = "di";
-    public static final String PRESENCE_STATE                  = "state";
 
     public static final String REQ_LOGIN                       = "login";
 
@@ -67,4 +66,11 @@ public class Constants extends OICConstants {
     public static final String REQ_LINKS                       = "links";
     public static final String REQ_HREF                        = "href";
     public static final String REQ_CRL                         = "crl";
+
+
+    /** '/oic/prs' resource property */
+    public static final String PRESENCE_STATE          = "state";
+    public static final String PRESENCE_ON             = "on";
+    public static final String PRESENCE_LIST           = "prslist";
+
 }
diff --git a/cloud/interface/src/main/java/org/iotivity/cloud/ciserver/resources/DevicePresenter.java b/cloud/interface/src/main/java/org/iotivity/cloud/ciserver/resources/DevicePresenter.java
new file mode 100644 (file)
index 0000000..2de38af
--- /dev/null
@@ -0,0 +1,12 @@
+package org.iotivity.cloud.ciserver.resources;
+
+import java.util.Set;
+
+/**
+ *  Component for selecting actively connected devices to cloud
+ */
+public interface DevicePresenter {
+
+    Set<String> getDeviceIds();
+
+}
index 0f16594..087a301 100644 (file)
  */
 package org.iotivity.cloud.ciserver.resources;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import org.iotivity.cloud.base.device.Device;
@@ -53,7 +46,7 @@ import org.slf4j.LoggerFactory;
  * connection.
  *
  */
-public class KeepAliveResource extends Resource {
+public class KeepAliveResource extends Resource  implements DevicePresenter {
     private final static Logger Log = LoggerFactory.getLogger(KeepAliveResource.class);
     private int[]                         mIntervals      = null;
     private Timer                         mTimer          = new Timer();
@@ -129,6 +122,16 @@ public class KeepAliveResource extends Resource {
         return MessageBuilder.createResponse(request, ResponseStatus.VALID);
     }
 
+
+    @Override
+    public Set<String> getDeviceIds() {
+        Map<Device, Long> map = Collections
+                .synchronizedMap(mConnectionPool);
+        synchronized (map){
+            return new HashSet<>(map.keySet().stream().map(device -> device.getDeviceId()).collect(Collectors.toSet()));
+        }
+    }
+
     /**
      * API for managing session
      */
diff --git a/cloud/interface/src/main/java/org/iotivity/cloud/ciserver/resources/UpdateDeviceStateListener.java b/cloud/interface/src/main/java/org/iotivity/cloud/ciserver/resources/UpdateDeviceStateListener.java
new file mode 100644 (file)
index 0000000..ca78c4d
--- /dev/null
@@ -0,0 +1,57 @@
+package org.iotivity.cloud.ciserver.resources;
+
+import org.iotivity.cloud.base.connector.ConnectionEstablishedListener;
+import org.iotivity.cloud.base.connector.ConnectorPool;
+import org.iotivity.cloud.base.device.IRequestChannel;
+import org.iotivity.cloud.base.protocols.MessageBuilder;
+import org.iotivity.cloud.base.protocols.enums.ContentFormat;
+import org.iotivity.cloud.base.protocols.enums.RequestMethod;
+import org.iotivity.cloud.ciserver.Constants;
+import org.iotivity.cloud.util.Cbor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class UpdateDeviceStateListener implements ConnectionEstablishedListener {
+
+    private final static Logger Log = LoggerFactory.getLogger(UpdateDeviceStateListener.class);
+
+    private final DevicePresenter devicePresenter;
+
+    public UpdateDeviceStateListener(DevicePresenter devicePresenter) {
+        this.devicePresenter = devicePresenter;
+    }
+
+    @Override
+    public void onConnectionEstablished(IRequestChannel requestChannel) {
+
+        final Set<String> deviceIds = devicePresenter.getDeviceIds().stream()
+                .filter(Objects::nonNull).collect(Collectors.toSet());
+        if(deviceIds == null || deviceIds.size() == 0){
+            return;
+        }
+        Log.debug("RD is connected. Update presence table of actually connected device");
+        Cbor<HashMap<String, Object>> cbor = new Cbor<>();
+        HashMap<String, Object> payload = new HashMap<String, Object>();
+        List<Map<String,Object>> devicePresenceList = new LinkedList<>();
+        payload.put(Constants.PRESENCE_LIST, devicePresenceList);
+        deviceIds.stream().forEach(deviceId -> {
+            final HashMap<String, Object> deviceStatusRecord = new HashMap<>();
+            deviceStatusRecord.put(Constants.DEVICE_ID, deviceId);
+            deviceStatusRecord.put(Constants.PRESENCE_STATE, Constants.PRESENCE_ON);
+            devicePresenceList.add(deviceStatusRecord);
+        });
+
+        Log.debug("Payload for presence update: {} ", payload);
+        StringBuffer uriPath = new StringBuffer();
+        uriPath.append("/" + Constants.PREFIX_OIC);
+        uriPath.append("/" + Constants.DEVICE_PRESENCE_URI);
+        requestChannel.sendRequest(MessageBuilder.createRequest(
+                        RequestMethod.PUT, uriPath.toString(), null,
+                        ContentFormat.APPLICATION_CBOR,
+                        cbor.encodingPayloadToCbor(payload)), null);
+        Log.debug("Presence table of RD successfully updated");
+    }
+}
index 1180078..be3b567 100644 (file)
@@ -31,6 +31,8 @@ import java.util.Set;
 import org.bson.Document;
 import org.iotivity.cloud.base.exception.ServerException.InternalServerErrorException;
 import org.iotivity.cloud.rdserver.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *
@@ -39,6 +41,8 @@ import org.iotivity.cloud.rdserver.Constants;
  */
 public class DBManager {
 
+    private final static Logger Log             = LoggerFactory.getLogger(DBManager.class);
+
     private static DBManager                   mDBManager;
     private MongoDB                            mMongoDB   = null;
     private HashMap<String, ArrayList<String>> mKeyField  = new HashMap<>();
@@ -82,10 +86,8 @@ public class DBManager {
 
     private void createTables() {
         mMongoDB.createTable(Constants.RD_TABLE);
-
-        // TODO cannot be drop in case of component scaling
-        mMongoDB.dropTable(Constants.PRESENCE_TABLE);
         mMongoDB.createTable(Constants.PRESENCE_TABLE);
+        updatePresenceTable();
     }
 
     private void createIndexes() {
@@ -105,6 +107,30 @@ public class DBManager {
 
     }
 
+    private void updatePresenceTable(){
+        Log.debug("Start update presence table. Update all devices to offline state");
+        final HashMap<String,Object> emptyCondition = new HashMap<>();
+        final HashMap<String,Object> updateEntry = new HashMap<>();
+        final HashMap<String,Object> stateUpdate = new HashMap<>();
+        stateUpdate.put(Constants.PRESENCE_STATE, Constants.PRESENCE_OFF);
+        updateEntry.put("$set",stateUpdate);
+        selectAndUpdate(Constants.PRESENCE_TABLE, emptyCondition, updateEntry);
+        Log.debug("Presence table was successfully updated");
+    }
+
+    /**
+     * API for selecting specific record and update in one operation
+     *
+     * @param tableName - table name to be updated
+     * @param condition - condition to match record
+     * @param update - data to be updated
+     */
+    public void selectAndUpdate(final String tableName, final HashMap<String, Object> condition, final HashMap<String, Object> update){
+        if (!_selectAndUpdate(tableName, condition, update))
+            throw new InternalServerErrorException(
+                    "Database record insert failed");
+    }
+
     /**
      * API for inserting a record into DB table. the record will not be inserted
      * if duplicated one.
@@ -216,6 +242,13 @@ public class DBManager {
 
     }
 
+    private Boolean _selectAndUpdate(final String tableName, final HashMap<String, Object> condition, final HashMap<String, Object> update) {
+
+        final Document updateDoc = createDocument(update);
+        final Document filterDoc = createDocument(condition);
+        return mMongoDB.updateMany(tableName, filterDoc, updateDoc);
+    }
+
     private Boolean _insertRecord(String tableName,
             HashMap<String, Object> record) {
 
index 80d2869..50e566c 100644 (file)
@@ -88,6 +88,20 @@ public class MongoDB {
             db.getCollection(tableName).drop();
     }
 
+    public boolean updateMany(final String tableName, final Document filter, final Document update){
+
+        if (tableName == null || filter == null || update == null)
+            return false;
+        final MongoCollection<Document> collection = db.getCollection(tableName);
+        try {
+            collection.updateMany(filter,update);
+            return true;
+        } catch (Exception e) {
+            Log.error("Error update many record", e);
+            return false;
+        }
+    }
+
     /**
      * API for creating index
      *
index 301ec2c..56ce332 100644 (file)
@@ -33,8 +33,11 @@ import org.iotivity.cloud.base.protocols.enums.ContentFormat;
 import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
 import org.iotivity.cloud.base.resource.Resource;
 import org.iotivity.cloud.rdserver.Constants;
+import org.iotivity.cloud.rdserver.resources.directory.rd.ResourceDirectoryResource;
 import org.iotivity.cloud.rdserver.resources.presence.PresenceManager;
 import org.iotivity.cloud.util.Cbor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * 
@@ -43,6 +46,7 @@ import org.iotivity.cloud.util.Cbor;
  */
 public class DevicePresenceResource extends Resource {
 
+    private final static Logger Log                  = LoggerFactory.getLogger(DevicePresenceResource.class);
     private Cbor<HashMap<String, Object>> mCbor = new Cbor<>();
 
     public DevicePresenceResource() {
@@ -66,6 +70,10 @@ public class DevicePresenceResource extends Resource {
                 response = handlePostRequest(request);
                 break;
 
+            case PUT:
+                response = handlePutRequest(request);
+                break;
+
             default:
                 throw new BadRequestException(
                         request.getMethod() + " request type is not supported");
@@ -125,4 +133,29 @@ public class DevicePresenceResource extends Resource {
         return MessageBuilder.createResponse(request, ResponseStatus.CHANGED);
 
     }
+
+    private IResponse handlePutRequest(IRequest request)
+            throws ServerException {
+        // check payload
+        byte[] payload = request.getPayload();
+        Log.debug("Update presence table");
+        HashMap<String, Object> parsedPayload = mCbor
+                .parsePayloadFromCbor(payload, HashMap.class);
+
+        checkPayloadException(
+                Arrays.asList(Constants.PRESENCE_LIST),
+                parsedPayload);
+        ArrayList<HashMap<String, String>> prsList = (ArrayList<HashMap<String, String>>) parsedPayload
+                .get(Constants.PRESENCE_LIST);
+
+        for(final Map<String,String> record: prsList) {
+            // store db
+            Log.debug("Update device presence state: {}", record);
+            PresenceManager.getInstance().updateDevicePresence(new HashMap<>(record));
+        }
+
+        return MessageBuilder.createResponse(request, ResponseStatus.CHANGED);
+
+    }
+
 }
index 64b76c5..a7c45a8 100644 (file)
@@ -160,7 +160,7 @@ public class CoapConnector {
     Timer                        mTimer          = new Timer();
 
     public void connect(final String connectionName, final InetSocketAddress inetSocketAddress,
-            boolean tlsMode, boolean keepAlive) {
+            boolean tlsMode, boolean keepAlive, final ConnectionEstablishedListener listener) {
 
         CoapConnectorInitializer initializer = new CoapConnectorInitializer();
 
@@ -173,18 +173,19 @@ public class CoapConnector {
         initializer.setKeepAlive(keepAlive);
         initializer.addHandler(new CoapPacketHandler());
         mBootstrap.handler(initializer);
-        doConnect(connectionName, inetSocketAddress, tlsMode);
+        doConnect(connectionName, inetSocketAddress, tlsMode, listener);
     }
 
-    private void doConnect(final String connectionName, final InetSocketAddress inetSocketAddress, final boolean tlsMode) {
+    private void doConnect(final String connectionName, final InetSocketAddress inetSocketAddress, final boolean tlsMode,
+                           final ConnectionEstablishedListener listener) {
         mBootstrap.connect(inetSocketAddress).addListener(new ChannelFutureListener() {
                 @Override public void operationComplete(ChannelFuture future) throws Exception {
                     if(!future.isSuccess()) {
                         Log.debug("Connection to " + inetSocketAddress.getHostString() + " was not successful. Retrying...");
                         future.channel().close();
-                        scheduleConnect(connectionName, inetSocketAddress, tlsMode, 5000);
+                        scheduleConnect(connectionName, inetSocketAddress, tlsMode, 5000, listener);
                     } else {
-                        connectionEstablished(connectionName, future.channel());
+                        connectionEstablished(connectionName, future.channel(), listener);
                         addCloseDetectListener(future.channel());
                     }
                 }
@@ -193,25 +194,29 @@ public class CoapConnector {
                 channel.closeFuture().addListener((ChannelFutureListener) future -> {
                     ConnectorPool.removeConnection(connectionName);
                     Log.debug("Connection to " + inetSocketAddress.getHostString() + " was lost. Retrying...");
-                    scheduleConnect(connectionName, inetSocketAddress, tlsMode, 5);
+                    scheduleConnect(connectionName, inetSocketAddress, tlsMode, 5, listener);
                 });
             }
         });
     }
 
-    private void scheduleConnect(String connectionName, InetSocketAddress inetSocketAddress, boolean tlsMode, long millis) {
+    private void scheduleConnect(String connectionName, InetSocketAddress inetSocketAddress, boolean tlsMode, long millis,
+                                 final ConnectionEstablishedListener listener) {
         mTimer.schedule( new TimerTask() {
             @Override
             public void run() {
-                doConnect(connectionName, inetSocketAddress, tlsMode);
+                doConnect(connectionName, inetSocketAddress, tlsMode, listener);
             }
         }, millis );
     }
 
-    public void connectionEstablished(String connectionName, Channel channel) {
+    public void connectionEstablished(String connectionName, Channel channel, final ConnectionEstablishedListener listener) {
         CoapClient coapClient = new CoapClient(channel);
         mChannelMap.put(channel, coapClient);
         ConnectorPool.addConnection(connectionName, coapClient);
+        if(listener != null){
+            listener.onConnectionEstablished(coapClient);
+        }
     }
 
     public void disconenct() throws Exception {
diff --git a/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/ConnectionEstablishedListener.java b/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/ConnectionEstablishedListener.java
new file mode 100644 (file)
index 0000000..2c6ef6b
--- /dev/null
@@ -0,0 +1,9 @@
+package org.iotivity.cloud.base.connector;
+
+import org.iotivity.cloud.base.device.IRequestChannel;
+
+public interface ConnectionEstablishedListener {
+
+    void onConnectionEstablished(final IRequestChannel requestChannel);
+
+}
index bf7047c..25fe3d6 100644 (file)
@@ -40,7 +40,12 @@ public class ConnectorPool {
 
     public static void requestConnection(String connectionName, InetSocketAddress inetAddr,
          boolean tlsMode, boolean keepAlive) throws InterruptedException {
-        mConnector.connect(connectionName, inetAddr, tlsMode, keepAlive);
+        mConnector.connect(connectionName, inetAddr, tlsMode, keepAlive, null);
+    }
+
+    public static void requestConnection(final String connectionName, final InetSocketAddress inetAddr,
+               final boolean tlsMode, final boolean keepAlive, final ConnectionEstablishedListener listener) throws InterruptedException {
+        mConnector.connect(connectionName, inetAddr, tlsMode, keepAlive, listener);
     }
 
     public static IRequestChannel getConnection(String name) {