Add CoAP over Websocket interface in cloud 31/17331/8
authorMinji Park <minjii.park@samsung.com>
Fri, 17 Feb 2017 06:48:47 +0000 (15:48 +0900)
committerJee Hyeok Kim <jihyeok13.kim@samsung.com>
Tue, 21 Feb 2017 05:49:19 +0000 (05:49 +0000)
- websocket handler added in cloud stack to support coap-websocket.

Change-Id: I57a8e1beb0e7f76391875b0461732bf30f9535fb
Signed-off-by: Minji Park <minjii.park@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/17331
Tested-by: jenkins-iotivity <jenkins@iotivity.org>
Reviewed-by: Jee Hyeok Kim <jihyeok13.kim@samsung.com>
cloud/interface/README
cloud/interface/src/main/java/org/iotivity/cloud/ciserver/CloudInterfaceServer.java
cloud/interface/src/main/java/org/iotivity/cloud/ciserver/DeviceServerSystem.java
cloud/stack/src/main/java/org/iotivity/cloud/base/ServerSystem.java
cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/coap/CoapDecoder.java
cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/coap/CoapEncoder.java
cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/coap/CoapLogHandler.java
cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/coap/websocket/WebSocketFrameHandler.java [new file with mode: 0644]
cloud/stack/src/main/java/org/iotivity/cloud/base/server/Server.java
cloud/stack/src/main/java/org/iotivity/cloud/base/server/WebSocketServer.java [new file with mode: 0644]
cloud/stack/src/main/java/org/iotivity/cloud/util/JSONUtil.java

index d0edf41..6a13d59 100644 (file)
@@ -20,8 +20,9 @@ Build and Run
 4) Run .jar file
 
        go to "target" folder
-       $ java -jar CloudInterface-0.0.1-SNAPSHOT.jar arg1(CI CoAP Server Port) arg2(RD CoAP Server IP) arg3(RD CoAP Server Port) arg4(Account Server IP) arg5(Account Server Port) arg6(MQBroker IP) arg7(MQBroker Port) arg8(TLS mode required)
-       e.g. java -jar CloudInterface-0.0.1-SNAPSHOT.jar 5683 127.0.0.1 5684 127.0.0.1 5685 127.0.0.1 5686 0
+       $ java -jar CloudInterface-0.0.1-SNAPSHOT.jar arg1(CI CoAP Server Port) arg2(RD CoAP Server IP) arg3(RD CoAP Server Port) arg4(Account Server IP) arg5(Account Server Port) arg6(MQBroker IP) arg7(MQBroker Port)
+               arg8(CI Http-CoAP proxy server port, '0' if unnecessary) arg9(CI CoAP websocket server port, '0' if unnecessary) arg10(TLS mode <0|1> required) arg11(web log IP, optional) and arg12(web log port, optional)
+       e.g. java -jar CloudInterface-0.0.1-SNAPSHOT.jar 5683 127.0.0.1 5684 127.0.0.1 5685 127.0.0.1 5686 80 8080 0
 
        - Before running a CI server, you should run a RD server, Account server and MQBroker first.
        - If you want to operate on TLS mode, "CLOUD_CERT_FILE(iotivitycloud.crt)", "CLOUD_KEY_FILE(iotivitycloud.key)" and ROOT_CERT_FILE(rootca.crt) files should be placed in the "target" folder.
index bbfa7de..b45a1fc 100755 (executable)
@@ -27,6 +27,7 @@ import java.util.Scanner;
 import org.iotivity.cloud.base.connector.ConnectorPool;
 import org.iotivity.cloud.base.server.CoapServer;
 import org.iotivity.cloud.base.server.HttpServer;
+import org.iotivity.cloud.base.server.WebSocketServer;
 import org.iotivity.cloud.ciserver.DeviceServerSystem.CoapDevicePool;
 import org.iotivity.cloud.ciserver.resources.KeepAliveResource;
 import org.iotivity.cloud.ciserver.resources.RouteResource;
@@ -52,46 +53,37 @@ public class CloudInterfaceServer {
 
         System.out.println("-----CI SERVER-------");
 
-        if (!(args.length == 8 || args.length == 9 || args.length == 10
-                || args.length == 11)) {
-            Log.e("\nCoAP-server <Port> and RD-server <Address> <Port> Account-server <Address> <Port> MQ-broker <Address> <Port> HC-proxy [HTTP-port] and TLS-mode <0|1> are required.\n"
+        if (args.length < 8 || args.length > 12) {
+            Log.e("\nCoAP-server <Port> and RD-server <Address> <Port> Account-server <Address> <Port> "
+                    + "MQ-broker <Address> <Port> HC-proxy <HTTP-port> Websocket-server <Port> and TLS-mode <0|1> are required.\n"
                     + "and WebSocketLog-Server <Address> <Port> (optional)\n"
-                    + "ex) 5683 127.0.0.1 5684 127.0.0.1 5685 127.0.0.1 5686 80 0 127.0.0.1 8080\n");
+                    + "ex) 5683 127.0.0.1 5684 127.0.0.1 5685 127.0.0.1 5686 80 8000 0 127.0.0.1 8080\n");
             return;
         }
 
-        boolean hcProxyMode = false;
-        if (args.length == 9) {
-            hcProxyMode = true;
-        }
+        // CoAP-TCP server port
+        int coapPort = Integer.parseInt(args[0]);
+        // HTTP-CoAP proxy server port
+        int hcProxyPort = Integer.parseInt(args[7]);
+        // CoAP-Websocket server port
+        int websocketPort = Integer.parseInt(args[8]);
 
-        boolean tlsMode = false;
-        if (hcProxyMode) {
-            tlsMode = Integer.parseInt(args[8]) == 1;
+        boolean hcProxyMode = hcProxyPort > 0;
+        boolean websocketMode = websocketPort > 0;
 
-        } else {
-            tlsMode = Integer.parseInt(args[7]) == 1;
-        }
+        boolean tlsMode = Integer.parseInt(args[9]) == 1;
 
-        if (args.length == 10 || args.length == 11) {
-            if (hcProxyMode) {
-                Log.InitWebLog(args[9], args[10],
-                        CloudInterfaceServer.class.getSimpleName().toString());
-            } else {
-                Log.InitWebLog(args[8], args[9],
-                        CloudInterfaceServer.class.getSimpleName().toString());
-            }
+        if (args.length >= 11) {
+            Log.InitWebLog(args[10], args[11], CloudInterfaceServer.class
+                    .getSimpleName().toString());
         }
 
-        ConnectorPool.addConnection("rd",
-                new InetSocketAddress(args[1], Integer.parseInt(args[2])),
-                tlsMode);
-        ConnectorPool.addConnection("account",
-                new InetSocketAddress(args[3], Integer.parseInt(args[4])),
-                tlsMode);
-        ConnectorPool.addConnection("mq",
-                new InetSocketAddress(args[5], Integer.parseInt(args[6])),
-                tlsMode);
+        ConnectorPool.addConnection("rd", new InetSocketAddress(args[1],
+                Integer.parseInt(args[2])), tlsMode);
+        ConnectorPool.addConnection("account", new InetSocketAddress(args[3],
+                Integer.parseInt(args[4])), tlsMode);
+        ConnectorPool.addConnection("mq", new InetSocketAddress(args[5],
+                Integer.parseInt(args[6])), tlsMode);
 
         DeviceServerSystem deviceServer = new DeviceServerSystem();
 
@@ -140,13 +132,17 @@ public class CloudInterfaceServer {
 
         deviceServer.addResource(new RouteResource(devicePool));
 
-        deviceServer.addServer(new CoapServer(
-                new InetSocketAddress(Integer.parseInt(args[0]))));
+        deviceServer.addServer(new CoapServer(new InetSocketAddress(coapPort)));
 
         // Add HTTP Server for HTTP-to-CoAP Proxy
         if (hcProxyMode) {
-            deviceServer.addServer(new HttpServer(
-                    new InetSocketAddress(Integer.valueOf(args[7]))));
+            deviceServer.addServer(new HttpServer(new InetSocketAddress(
+                    hcProxyPort)));
+        }
+
+        if (websocketMode) {
+            deviceServer.addServer(new WebSocketServer(new InetSocketAddress(
+                    websocketPort)));
         }
 
         deviceServer.startSystem(tlsMode);
index 0a86fc5..23c4575 100644 (file)
@@ -46,6 +46,7 @@ import org.iotivity.cloud.base.protocols.enums.SignalingMethod;
 import org.iotivity.cloud.base.server.CoapServer;
 import org.iotivity.cloud.base.server.HttpServer;
 import org.iotivity.cloud.base.server.Server;
+import org.iotivity.cloud.base.server.WebSocketServer;
 import org.iotivity.cloud.util.Bytes;
 import org.iotivity.cloud.util.Cbor;
 import org.iotivity.cloud.util.Log;
@@ -299,8 +300,12 @@ public class DeviceServerSystem extends ServerSystem {
             try {
 
                 if (!(msg instanceof CoapResponse)) {
-                    throw new BadRequestException(
-                            "this msg type is not CoapResponse");
+                    // throw new BadRequestException(
+                    // "this msg type is not CoapResponse");
+
+                    // TODO check websocket handshake response
+                    ctx.writeAndFlush(msg);
+                    return;
                 }
                 // This is CoapResponse
                 // Once the response is valid, add this to deviceList
@@ -526,6 +531,10 @@ public class DeviceServerSystem extends ServerSystem {
             server.addHandler(new CoapAuthHandler());
         }
 
+        if (server instanceof WebSocketServer) {
+            server.addHandler(new CoapAuthHandler());
+        }
+
         if (server instanceof HttpServer) {
             server.addHandler(new HttpAuthHandler());
         }
index 298fa7a..ddd8481 100644 (file)
  */
 package org.iotivity.cloud.base;
 
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.AttributeKey;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,6 +46,7 @@ import org.iotivity.cloud.base.resource.ResourceManager;
 import org.iotivity.cloud.base.server.CoapServer;
 import org.iotivity.cloud.base.server.HttpServer;
 import org.iotivity.cloud.base.server.Server;
+import org.iotivity.cloud.base.server.WebSocketServer;
 import org.iotivity.cloud.util.Log;
 
 import io.netty.channel.ChannelHandler.Sharable;
@@ -146,7 +152,7 @@ public class ServerSystem extends ResourceManager {
     }
 
     public void addServer(Server server) {
-        if (server instanceof CoapServer) {
+        if (server instanceof CoapServer || server instanceof WebSocketServer) {
             server.addHandler(new PersistentPacketReceiver());
         } else if (server instanceof HttpServer) {
             server.addHandler(new NonPersistentPacketReceiver());
index 9b9d5b0..aef3355 100644 (file)
@@ -166,6 +166,10 @@ public class CoapDecoder extends ByteToMessageDecoder {
         }
     }
 
+    public void decode(ByteBuf in, List<Object> out) throws Exception {
+        decode(null, in, out);
+    }
+
     private int parseOptions(CoapMessage coapMessage, ByteBuf byteBuf,
             int maxLength) {
 
@@ -207,7 +211,7 @@ public class CoapDecoder extends ByteToMessageDecoder {
             }
         }
 
-        /// return option length
+        // return option length
         return byteBuf.readerIndex() - startPos;
     }
 }
index 53c3cc4..d65c02c 100644 (file)
@@ -70,6 +70,10 @@ public class CoapEncoder extends MessageToByteEncoder<CoapMessage> {
         }
     }
 
+    public void encode(CoapMessage msg, ByteBuf out) throws Exception {
+        encode(null, msg, out);
+    }
+
     private void calcShimHeader(CoapMessage coapMessage, ByteBuf byteBuf,
             long length) {
         if (length < 13) {
index 37622b9..be43be5 100644 (file)
@@ -77,13 +77,14 @@ public class CoapLogHandler extends ChannelDuplexHandler {
             log = composeCoapRequest(
                     ctx.channel().id().asLongText().substring(26),
                     (CoapRequest) msg);
-        } else {
-            log = composeCoapResponse(
-                    ctx.channel().id().asLongText().substring(26),
-                    (CoapResponse) msg);
+        } else if (msg instanceof CoapResponse) {
+            log = composeCoapResponse(ctx.channel().id().asLongText()
+                    .substring(26), (CoapResponse) msg);
         }
 
-        Log.v(log);
+        if (log != null) {
+            Log.v(log);
+        }
 
         ctx.writeAndFlush(msg);
     }
@@ -98,13 +99,14 @@ public class CoapLogHandler extends ChannelDuplexHandler {
             log = composeCoapRequest(
                     ctx.channel().id().asLongText().substring(26),
                     (CoapRequest) msg);
-        } else {
-            log = composeCoapResponse(
-                    ctx.channel().id().asLongText().substring(26),
-                    (CoapResponse) msg);
+        } else if (msg instanceof CoapResponse) {
+            log = composeCoapResponse(ctx.channel().id().asLongText()
+                    .substring(26), (CoapResponse) msg);
         }
 
-        Log.v(log);
+        if (log != null) {
+            Log.v(log);
+        }
 
         ctx.fireChannelRead(msg);
     }
diff --git a/cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/coap/websocket/WebSocketFrameHandler.java b/cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/coap/websocket/WebSocketFrameHandler.java
new file mode 100644 (file)
index 0000000..a87b4e0
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * //******************************************************************
+ * //
+ * // Copyright 2017 Samsung Electronics All Rights Reserved.
+ * //
+ * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+ * //
+ * // Licensed under the Apache License, Version 2.0 (the "License");
+ * // you may not use this file except in compliance with the License.
+ * // You may obtain a copy of the License at
+ * //
+ * //      http://www.apache.org/licenses/LICENSE-2.0
+ * //
+ * // Unless required by applicable law or agreed to in writing, software
+ * // distributed under the License is distributed on an "AS IS" BASIS,
+ * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * // See the License for the specific language governing permissions and
+ * // limitations under the License.
+ * //
+ * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+ */
+package org.iotivity.cloud.base.protocols.coap.websocket;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.iotivity.cloud.base.exception.ServerException.BadRequestException;
+import org.iotivity.cloud.base.exception.ServerException.InternalServerErrorException;
+import org.iotivity.cloud.base.protocols.coap.CoapDecoder;
+import org.iotivity.cloud.base.protocols.coap.CoapEncoder;
+import org.iotivity.cloud.base.protocols.coap.CoapMessage;
+import org.iotivity.cloud.base.protocols.enums.ContentFormat;
+import org.iotivity.cloud.util.Cbor;
+import org.iotivity.cloud.util.JSONUtil;
+import org.iotivity.cloud.util.Log;
+
+public class WebSocketFrameHandler extends ChannelDuplexHandler {
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        Log.v(ctx.channel().id().asLongText().substring(26)
+                + " WebSocket Connected, Address: "
+                + ctx.channel().remoteAddress().toString());
+
+        ctx.fireChannelActive();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        Log.v(ctx.channel().id().asLongText().substring(26)
+                + " WebSocket Disconnected, Address: "
+                + ctx.channel().remoteAddress().toString());
+
+        ctx.fireChannelInactive();
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+            throws Exception {
+        // TODO check ping pong
+
+        if (msg instanceof BinaryWebSocketFrame) {
+
+            List<Object> messages = new ArrayList<>();
+            new CoapDecoder().decode(((BinaryWebSocketFrame) msg).content(),
+                    messages);
+
+            for (Object message : messages) {
+                if (message instanceof CoapMessage) {
+                    CoapMessage coapMessage = (CoapMessage) message;
+
+                    // convert content format to cbor if content format is json.
+                    if (coapMessage.getPayloadSize() != 0
+                            && coapMessage.getContentFormat().equals(
+                                    ContentFormat.APPLICATION_JSON)) {
+                        byte[] payload = coapMessage.getPayload();
+                        coapMessage.setPayload(convertJsonToCbor(payload));
+                        coapMessage
+                                .setContentFormat(ContentFormat.APPLICATION_CBOR);
+                    }
+                    ctx.fireChannelRead(coapMessage);
+                }
+            }
+        } else {
+            throw new BadRequestException("invalid request message type");
+        }
+    }
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg,
+            ChannelPromise promise) throws Exception {
+        Object newMsg = msg;
+
+        if (msg instanceof DefaultFullHttpResponse) {
+
+            ChannelFuture ch = ctx.writeAndFlush(newMsg);
+            ch.addListener(new ChannelFutureListener() {
+
+                @Override
+                public void operationComplete(ChannelFuture future)
+                        throws Exception {
+                    Log.v(future.channel().id().asLongText().substring(26)
+                            + " WebSocket Handshake done, Address: "
+                            + future.channel().remoteAddress().toString());
+
+                    // remove http encoder/decoder after handshake done.
+                    future.channel().pipeline().remove(HttpServerCodec.class);
+                    future.channel().pipeline()
+                            .remove(HttpObjectAggregator.class);
+                }
+            });
+
+            return;
+        }
+        if (msg instanceof CoapMessage) {
+
+            CoapMessage coapMessage = (CoapMessage) msg;
+
+            // covert content format to json.
+            if (coapMessage.getPayloadSize() != 0) {
+                byte[] payload = coapMessage.getPayload();
+                coapMessage.setPayload(convertCborToJson(payload));
+                coapMessage.setContentFormat(ContentFormat.APPLICATION_JSON);
+            }
+
+            ByteBuf encodedBytes = Unpooled.buffer();
+            new CoapEncoder().encode((CoapMessage) msg, encodedBytes);
+            WebSocketFrame frame = new BinaryWebSocketFrame(encodedBytes);
+            newMsg = frame;
+        } else {
+            throw new InternalServerErrorException(
+                    "invalid response message type");
+        }
+
+        ctx.writeAndFlush(newMsg);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+            throws Exception {
+
+        cause.printStackTrace();
+    }
+
+    private byte[] convertJsonToCbor(byte[] jsonData) {
+
+        JSONUtil<HashMap<String, Object>> json = new JSONUtil<>();
+        HashMap<String, Object> parsedData = json.parseJSON(jsonData,
+                HashMap.class);
+
+        Cbor<HashMap<String, Object>> cbor = new Cbor<>();
+        return cbor.encodingPayloadToCbor(parsedData);
+    }
+
+    private byte[] convertCborToJson(byte[] cborData) {
+
+        Cbor<Object> cbor = new Cbor<>();
+        Object parsedData = cbor.parsePayloadFromCbor(cborData, Object.class);
+
+        JSONUtil<String> json = new JSONUtil<>();
+        return json.writeJSON(parsedData).getBytes();
+    }
+}
\ No newline at end of file
index 0f920eb..3272fe8 100644 (file)
@@ -68,7 +68,7 @@ public abstract class Server {
         }
 
         @Override
-        public void initChannel(SocketChannel ch) {
+        public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
 
             if (mSslContext != null) {
@@ -112,6 +112,9 @@ public abstract class Server {
             b.childHandler(mServerInitializer);
 
             b.bind(mInetSocketAddress).sync();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
         } finally {
         }
     }
diff --git a/cloud/stack/src/main/java/org/iotivity/cloud/base/server/WebSocketServer.java b/cloud/stack/src/main/java/org/iotivity/cloud/base/server/WebSocketServer.java
new file mode 100644 (file)
index 0000000..f4b8b54
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * //******************************************************************
+ * //
+ * // Copyright 2017 Samsung Electronics All Rights Reserved.
+ * //
+ * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+ * //
+ * // Licensed under the Apache License, Version 2.0 (the "License");
+ * // you may not use this file except in compliance with the License.
+ * // You may obtain a copy of the License at
+ * //
+ * //      http://www.apache.org/licenses/LICENSE-2.0
+ * //
+ * // Unless required by applicable law or agreed to in writing, software
+ * // distributed under the License is distributed on an "AS IS" BASIS,
+ * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * // See the License for the specific language governing permissions and
+ * // limitations under the License.
+ * //
+ * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+ */
+package org.iotivity.cloud.base.server;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
+
+import java.net.InetSocketAddress;
+
+import org.iotivity.cloud.base.protocols.coap.CoapLogHandler;
+import org.iotivity.cloud.base.protocols.coap.websocket.WebSocketFrameHandler;
+
+public class WebSocketServer extends Server {
+
+    private final int    MAX_CONTENT_LENGTH = 65536;
+    private final String PATH               = "/.well-known/coap";
+    private final String SUBPROTOCOL        = "coap";
+
+    public WebSocketServer(InetSocketAddress inetSocketAddress) {
+        super(inetSocketAddress);
+    }
+
+    @Override
+    protected ChannelHandler[] onQueryDefaultHandler() {
+
+        return new ChannelHandler[] { new HttpServerCodec(),
+                new HttpObjectAggregator(MAX_CONTENT_LENGTH),
+                new WebSocketServerCompressionHandler(),
+                new WebSocketServerProtocolHandler(PATH, SUBPROTOCOL, true),
+                new WebSocketFrameHandler(), new CoapLogHandler() };
+    }
+}
\ No newline at end of file
index a1aace5..db18162 100644 (file)
@@ -22,7 +22,6 @@
 package org.iotivity.cloud.util;
 
 import java.io.IOException;
-import java.util.HashMap;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -72,7 +71,7 @@ public class JSONUtil<T> {
         return parsedData;
     }
 
-    public T writeJSON(HashMap<Object, Object> data) {
+    public T writeJSON(Object data) {
         if (data == null)
             return null;