Browse Source

1. Support for old 3.1.0 and new 3.1.1 OASIS specification;
2. Added session management (however without persistence) for supporting for 3.1.1 OASIS specification;
3. Keep Alive ping request disabled if keep alive timeout is set to zero;
4. Inflight queue size managed;
5. Improvement on receiving thread with some modification on underlying network channel;
6. Bug fixing from issues on CodePlex panel;
7. Released new version 4.0;

Paolo Patierno 9 years ago
parent
commit
32a5c6a25a

+ 32 - 3
M2Mqtt/Exceptions/MqttClientException.cs

@@ -51,9 +51,9 @@ namespace uPLibrary.Networking.M2Mqtt.Exceptions
     public enum MqttClientErrorCode
     {
         /// <summary>
-        /// Will topic length error
+        /// Will error (topic, message or QoS level)
         /// </summary>
-        WillTopicWrong = 1,
+        WillWrong = 1,
 
         /// <summary>
         /// Keep alive period too large
@@ -98,6 +98,35 @@ namespace uPLibrary.Networking.M2Mqtt.Exceptions
         /// <summary>
         /// Wrong Message Id
         /// </summary>
-        WrongMessageId
+        WrongMessageId,
+
+        /// <summary>
+        /// Inflight queue is full
+        /// </summary>
+        InflightQueueFull,
+
+        // [v3.1.1]
+        /// <summary>
+        /// Invalid flag bits received 
+        /// </summary>
+        InvalidFlagBits,
+
+        // [v3.1.1]
+        /// <summary>
+        /// Invalid connect flags received
+        /// </summary>
+        InvalidConnectFlags,
+
+        // [v3.1.1]
+        /// <summary>
+        /// Invalid client id
+        /// </summary>
+        InvalidClientId,
+
+        // [v3.1.1]
+        /// <summary>
+        /// Invalid protocol name
+        /// </summary>
+        InvalidProtocolName
     }
 }

+ 8 - 0
M2Mqtt/IMqttNetworkChannel.cs

@@ -36,6 +36,14 @@ namespace uPLibrary.Networking.M2Mqtt
         /// <returns>Number of bytes received</returns>
         int Receive(byte[] buffer);
 
+        /// <summary>
+        /// Receive data from the network channel with a specified timeout
+        /// </summary>
+        /// <param name="buffer">Data buffer for receiving data</param>
+        /// <param name="timeout">Timeout on receiving (in milliseconds)</param>
+        /// <returns>Number of bytes received</returns>
+        int Receive(byte[] buffer, int timeout);
+
         /// <summary>
         /// Send data on the network channel to the broker
         /// </summary>

+ 3 - 0
M2Mqtt/M2Mqtt.csproj

@@ -73,6 +73,9 @@
     <Compile Include="Net\MqttNetworkChannel.cs" />
     <Compile Include="MqttSettings.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Session\MqttBrokerSession.cs" />
+    <Compile Include="Session\MqttClientSession.cs" />
+    <Compile Include="Session\MqttSession.cs" />
     <Compile Include="Utility\Trace.cs" />
     <Compile Include="Utility\QueueExtension.cs" />
   </ItemGroup>

+ 10 - 7
M2Mqtt/M2MqttMono.csproj

@@ -37,10 +37,11 @@
     <Compile Include="Exceptions\MqttCommunicationException.cs" />
     <Compile Include="Exceptions\MqttConnectionException.cs" />
     <Compile Include="Exceptions\MqttTimeoutException.cs" />
+    <Compile Include="IMqttNetworkChannel.cs" />
     <Compile Include="Messages\MqttMsgBase.cs" />
     <Compile Include="Messages\MqttMsgConnack.cs" />
     <Compile Include="Messages\MqttMsgConnect.cs" />
-	<Compile Include="Messages\MqttMsgConnectEventArgs.cs" />
+    <Compile Include="Messages\MqttMsgConnectEventArgs.cs" />
     <Compile Include="Messages\MqttMsgContext.cs" />
     <Compile Include="Messages\MqttMsgDisconnect.cs" />
     <Compile Include="Messages\MqttMsgPingReq.cs" />
@@ -48,26 +49,28 @@
     <Compile Include="Messages\MqttMsgPuback.cs" />
     <Compile Include="Messages\MqttMsgPubcomp.cs" />
     <Compile Include="Messages\MqttMsgPublish.cs" />
-    <Compile Include="Messages\MqttMsgPublishEventArgs.cs" />
     <Compile Include="Messages\MqttMsgPublishedEventArgs.cs" />
+    <Compile Include="Messages\MqttMsgPublishEventArgs.cs" />
     <Compile Include="Messages\MqttMsgPubrec.cs" />
     <Compile Include="Messages\MqttMsgPubrel.cs" />
     <Compile Include="Messages\MqttMsgSuback.cs" />
     <Compile Include="Messages\MqttMsgSubscribe.cs" />
     <Compile Include="Messages\MqttMsgSubscribedEventArgs.cs" />
-	<Compile Include="Messages\MqttMsgSubscribeEventArgs.cs" />
+    <Compile Include="Messages\MqttMsgSubscribeEventArgs.cs" />
     <Compile Include="Messages\MqttMsgUnsuback.cs" />
     <Compile Include="Messages\MqttMsgUnsubscribe.cs" />
     <Compile Include="Messages\MqttMsgUnsubscribedEventArgs.cs" />
-	<Compile Include="Messages\MqttMsgUnsubscribeEventArgs.cs" />
+    <Compile Include="Messages\MqttMsgUnsubscribeEventArgs.cs" />
     <Compile Include="MqttClient.cs" />
-    <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Net\Fx.cs" />
     <Compile Include="Net\MqttNetworkChannel.cs" />
     <Compile Include="MqttSettings.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Session\MqttBrokerSession.cs" />
+    <Compile Include="Session\MqttClientSession.cs" />
+    <Compile Include="Session\MqttSession.cs" />
+    <Compile Include="Utility\Trace.cs" />
     <Compile Include="Utility\QueueExtension.cs" />
-    <Compile Include="IMqttNetworkChannel.cs" />
-	<Compile Include="Utility\Trace.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
 </Project>

+ 9 - 6
M2Mqtt/M2MqttNetCf35.csproj

@@ -62,8 +62,8 @@
     <Compile Include="Messages\MqttMsgBase.cs" />
     <Compile Include="Messages\MqttMsgConnack.cs" />
     <Compile Include="Messages\MqttMsgConnect.cs" />
-	<Compile Include="Messages\MqttMsgConnectEventArgs.cs" />
-	<Compile Include="Messages\MqttMsgContext.cs" />
+    <Compile Include="Messages\MqttMsgConnectEventArgs.cs" />
+    <Compile Include="Messages\MqttMsgContext.cs" />
     <Compile Include="Messages\MqttMsgDisconnect.cs" />
     <Compile Include="Messages\MqttMsgPingReq.cs" />
     <Compile Include="Messages\MqttMsgPingResp.cs" />
@@ -77,18 +77,21 @@
     <Compile Include="Messages\MqttMsgSuback.cs" />
     <Compile Include="Messages\MqttMsgSubscribe.cs" />
     <Compile Include="Messages\MqttMsgSubscribedEventArgs.cs" />
-	<Compile Include="Messages\MqttMsgSubscribeEventArgs.cs" />
+    <Compile Include="Messages\MqttMsgSubscribeEventArgs.cs" />
     <Compile Include="Messages\MqttMsgUnsuback.cs" />
     <Compile Include="Messages\MqttMsgUnsubscribe.cs" />
     <Compile Include="Messages\MqttMsgUnsubscribedEventArgs.cs" />
-	<Compile Include="Messages\MqttMsgUnsubscribeEventArgs.cs" />
+    <Compile Include="Messages\MqttMsgUnsubscribeEventArgs.cs" />
     <Compile Include="MqttClient.cs" />
-    <Compile Include="MqttSettings.cs" />
     <Compile Include="Net\Fx.cs" />
     <Compile Include="Net\MqttNetworkChannel.cs" />
+    <Compile Include="MqttSettings.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
-    <Compile Include="Utility\QueueExtension.cs" />
+    <Compile Include="Session\MqttBrokerSession.cs" />
+    <Compile Include="Session\MqttClientSession.cs" />
+    <Compile Include="Session\MqttSession.cs" />
     <Compile Include="Utility\Trace.cs" />
+    <Compile Include="Utility\QueueExtension.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CompactFramework.CSharp.targets" />
   <ProjectExtensions>

+ 5 - 2
M2Mqtt/M2MqttNetCf39.csproj

@@ -71,12 +71,15 @@
     <Compile Include="Messages\MqttMsgUnsubscribedEventArgs.cs" />
     <Compile Include="Messages\MqttMsgUnsubscribeEventArgs.cs" />
     <Compile Include="MqttClient.cs" />
-    <Compile Include="MqttSettings.cs" />
     <Compile Include="Net\Fx.cs" />
     <Compile Include="Net\MqttNetworkChannel.cs" />
+    <Compile Include="MqttSettings.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
-    <Compile Include="Utility\QueueExtension.cs" />
+    <Compile Include="Session\MqttBrokerSession.cs" />
+    <Compile Include="Session\MqttClientSession.cs" />
+    <Compile Include="Session\MqttSession.cs" />
     <Compile Include="Utility\Trace.cs" />
+    <Compile Include="Utility\QueueExtension.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildExtensionsPath)\Microsoft\$(TargetFrameworkIdentifier)\$(TargetFrameworkTargetsVersion)\Microsoft.$(TargetFrameworkIdentifier).CSharp.targets" />
   <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 

+ 4 - 0
M2Mqtt/M2MqttWinRT.csproj

@@ -72,10 +72,14 @@
     <Compile Include="MqttClient.cs" />
     <Compile Include="MqttSettings.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Session\MqttBrokerSession.cs" />
+    <Compile Include="Session\MqttClientSession.cs" />
+    <Compile Include="Session\MqttSession.cs" />
     <Compile Include="Utility\QueueExtension.cs" />
     <Compile Include="Utility\Trace.cs" />
     <Compile Include="WinRT\Fx.cs" />
     <Compile Include="WinRT\MqttNetworkChannel.cs" />
+    <Compile Include="WinRT\Hashtable.cs" />
     <Compile Include="WinRT\Queue.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildExtensionsPath32)\Microsoft\Portable\$(TargetFrameworkVersion)\Microsoft.Portable.CSharp.targets" />

+ 35 - 1
M2Mqtt/Messages/MqttMsgBase.cs

@@ -30,6 +30,9 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         internal const byte MSG_TYPE_MASK = 0xF0;
         internal const byte MSG_TYPE_OFFSET = 0x04;
         internal const byte MSG_TYPE_SIZE = 0x04;
+        internal const byte MSG_FLAG_BITS_MASK = 0x0F;      // [v3.1.1]
+        internal const byte MSG_FLAG_BITS_OFFSET = 0x00;    // [v3.1.1]
+        internal const byte MSG_FLAG_BITS_SIZE = 0x04;      // [v3.1.1]
         internal const byte DUP_FLAG_MASK = 0x08;
         internal const byte DUP_FLAG_OFFSET = 0x03;
         internal const byte DUP_FLAG_SIZE = 0x01;
@@ -56,11 +59,30 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         internal const byte MQTT_MSG_PINGRESP_TYPE = 0x0D;
         internal const byte MQTT_MSG_DISCONNECT_TYPE = 0x0E;
 
+        // [v3.1.1] MQTT flag bits
+        internal const byte MQTT_MSG_CONNECT_FLAG_BITS = 0x00;
+        internal const byte MQTT_MSG_CONNACK_FLAG_BITS = 0x00;
+        internal const byte MQTT_MSG_PUBLISH_FLAG_BITS = 0x00; // just defined as 0x00 but depends on publish props (dup, qos, retain) 
+        internal const byte MQTT_MSG_PUBACK_FLAG_BITS = 0x00;
+        internal const byte MQTT_MSG_PUBREC_FLAG_BITS = 0x00;
+        internal const byte MQTT_MSG_PUBREL_FLAG_BITS = 0x02;
+        internal const byte MQTT_MSG_PUBCOMP_FLAG_BITS = 0x00;
+        internal const byte MQTT_MSG_SUBSCRIBE_FLAG_BITS = 0x02;
+        internal const byte MQTT_MSG_SUBACK_FLAG_BITS = 0x00;
+        internal const byte MQTT_MSG_UNSUBSCRIBE_FLAG_BITS = 0x02;
+        internal const byte MQTT_MSG_UNSUBACK_FLAG_BITS = 0x00;
+        internal const byte MQTT_MSG_PINGREQ_FLAG_BITS = 0x00;
+        internal const byte MQTT_MSG_PINGRESP_FLAG_BITS = 0x00;
+        internal const byte MQTT_MSG_DISCONNECT_FLAG_BITS = 0x00;
+
         // QOS levels
         public const byte QOS_LEVEL_AT_MOST_ONCE = 0x00;
         public const byte QOS_LEVEL_AT_LEAST_ONCE = 0x01;
         public const byte QOS_LEVEL_EXACTLY_ONCE = 0x02;
 
+        // SUBSCRIBE QoS level granted failure [v3.1.1]
+        public const byte QOS_LEVEL_GRANTED_FAILURE = 0x80;
+
         internal const ushort MAX_TOPIC_LENGTH = 65535;
         internal const ushort MIN_TOPIC_LENGTH = 1;
         internal const byte MESSAGE_ID_SIZE = 2;
@@ -105,6 +127,15 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             set { this.retain = value; }
         }
 
+        /// <summary>
+        /// Message identifier for the message
+        /// </summary>
+        public ushort MessageId
+        {
+            get { return this.messageId; }
+            set { this.messageId = value; }
+        }
+
         #endregion
 
         // message type
@@ -115,12 +146,15 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         protected byte qosLevel;
         // retain flag
         protected bool retain;
+        // message identifier
+        protected ushort messageId;
 
         /// <summary>
         /// Returns message bytes rapresentation
         /// </summary>
+        /// <param name="protocolVersion">Protocol version</param>
         /// <returns>Bytes rapresentation</returns>
-        public abstract byte[] GetBytes();
+        public abstract byte[] GetBytes(byte protocolVersion);
         
         /// <summary>
         /// Encode remaining length and insert it into message buffer

+ 53 - 9
M2Mqtt/Messages/MqttMsgConnack.cs

@@ -15,6 +15,7 @@ Contributors:
 */
 
 using System;
+using uPLibrary.Networking.M2Mqtt.Exceptions;
 
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
@@ -35,6 +36,13 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
 
         private const byte TOPIC_NAME_COMP_RESP_BYTE_OFFSET = 0;
         private const byte TOPIC_NAME_COMP_RESP_BYTE_SIZE = 1;
+        // [v3.1.1] connect acknowledge flags replace "old" topic name compression respone (not used in 3.1)
+        private const byte CONN_ACK_FLAGS_BYTE_OFFSET = 0;
+        private const byte CONN_ACK_FLAGS_BYTE_SIZE = 1;
+        // [v3.1.1] session present flag
+        private const byte SESSION_PRESENT_FLAG_MASK = 0x01;
+        private const byte SESSION_PRESENT_FLAG_OFFSET = 0x00;
+        private const byte SESSION_PRESENT_FLAG_SIZE = 0x01;
         private const byte CONN_RETURN_CODE_BYTE_OFFSET = 1;
         private const byte CONN_RETURN_CODE_BYTE_SIZE = 1;
 
@@ -42,6 +50,16 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
 
         #region Properties...
 
+        // [v3.1.1] session present flag
+        /// <summary>
+        /// Session present flag
+        /// </summary>
+        public bool SessionPresent
+        {
+            get { return this.sessionPresent; }
+            set { this.sessionPresent = value; }
+        }
+
         /// <summary>
         /// Return Code
         /// </summary>
@@ -53,6 +71,9 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
 
         #endregion
 
+        // [v3.1.1] session present flag
+        private bool sessionPresent;
+
         // return code for CONNACK message
         private byte returnCode;
 
@@ -68,26 +89,39 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a CONNACK message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>CONNACK message instance</returns>
-        public static MqttMsgConnack Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgConnack Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             MqttMsgConnack msg = new MqttMsgConnack();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_CONNACK_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             buffer = new byte[remainingLength];
 
             // read bytes from socket...
             channel.Receive(buffer);
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] ... set session present flag ...
+                msg.sessionPresent = (buffer[CONN_ACK_FLAGS_BYTE_OFFSET] & SESSION_PRESENT_FLAG_MASK) != 0x00;
+            }
             // ...and set return code from broker
             msg.returnCode = buffer[CONN_RETURN_CODE_BYTE_OFFSET];
 
             return msg;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte ProtocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -96,8 +130,12 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             byte[] buffer;
             int index = 0;
 
-            // topic name compression response and connect return code
-            varHeaderSize += (TOPIC_NAME_COMP_RESP_BYTE_SIZE + CONN_RETURN_CODE_BYTE_SIZE);
+            if (ProtocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                // flags byte and connect return code
+                varHeaderSize += (CONN_ACK_FLAGS_BYTE_SIZE + CONN_RETURN_CODE_BYTE_SIZE);
+            else
+                // topic name compression response and connect return code
+                varHeaderSize += (TOPIC_NAME_COMP_RESP_BYTE_SIZE + CONN_RETURN_CODE_BYTE_SIZE);
 
             remainingLength += (varHeaderSize + payloadSize);
 
@@ -117,14 +155,20 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index] = (byte)(MQTT_MSG_CONNACK_TYPE << MSG_TYPE_OFFSET);
-            index++;
-
+            if (ProtocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_CONNACK_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_CONNACK_FLAG_BITS; // [v.3.1.1]
+            else
+                buffer[index++] = (byte)(MQTT_MSG_CONNACK_TYPE << MSG_TYPE_OFFSET);
+            
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
 
-            // topic name compression response (reserved values. not used);
-            buffer[index++] = 0x00;
+            if (ProtocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                // [v3.1.1] session present flag
+                buffer[index++] = this.sessionPresent ? (byte)(1 << SESSION_PRESENT_FLAG_OFFSET) : (byte)0x00;
+            else
+                // topic name compression response (reserved values. not used);
+                buffer[index++] = 0x00;
             
             // connect return code
             buffer[index++] = this.returnCode;

+ 105 - 48
M2Mqtt/Messages/MqttMsgConnect.cs

@@ -28,19 +28,22 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         #region Constants...
 
         // protocol name supported
-        internal const string PROTOCOL_NAME = "MQIsdp";
-
-        // max length for client id
+        internal const string PROTOCOL_NAME_V3_1 = "MQIsdp";
+        internal const string PROTOCOL_NAME_V3_1_1 = "MQTT"; // [v.3.1.1]
+        
+        // max length for client id (removed in 3.1.1)
         internal const int CLIENT_ID_MAX_LENGTH = 23;
 
         // variable header fields
         internal const byte PROTOCOL_NAME_LEN_SIZE = 2;
-        internal const byte PROTOCOL_NAME_SIZE = 6;
-        internal const byte PROTOCOL_VERSION_NUMBER_SIZE = 1;
+        internal const byte PROTOCOL_NAME_V3_1_SIZE = 6;
+        internal const byte PROTOCOL_NAME_V3_1_1_SIZE = 4; // [v.3.1.1]
+        internal const byte PROTOCOL_VERSION_SIZE = 1;
         internal const byte CONNECT_FLAGS_SIZE = 1;
         internal const byte KEEP_ALIVE_TIME_SIZE = 2;
 
-        internal const byte PROTOCOL_VERSION = 0x03;
+        internal const byte PROTOCOL_VERSION_V3_1 = 0x03;
+        internal const byte PROTOCOL_VERSION_V3_1_1 = 0x04; // [v.3.1.1]
         internal const ushort KEEP_ALIVE_PERIOD_DEFAULT = 60; // seconds
         internal const ushort MAX_KEEP_ALIVE = 65535; // 16 bit
 
@@ -63,6 +66,10 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         internal const byte CLEAN_SESSION_FLAG_MASK = 0x02;
         internal const byte CLEAN_SESSION_FLAG_OFFSET = 0x01;
         internal const byte CLEAN_SESSION_FLAG_SIZE = 0x01;
+        // [v.3.1.1] lsb (reserved) must be now 0
+        internal const byte RESERVED_FLAG_MASK = 0x01;
+        internal const byte RESERVED_FLAG_OFFSET = 0x00;
+        internal const byte RESERVED_FLAG_SIZE = 0x01;
 
         #endregion
 
@@ -209,9 +216,6 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         public MqttMsgConnect()
         {
             this.type = MQTT_MSG_CONNECT_TYPE;
-
-            this.protocolName = PROTOCOL_NAME;
-            this.protocolVersion = PROTOCOL_VERSION;
         }
 
         /// <summary>
@@ -219,7 +223,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// </summary>
         /// <param name="clientId">Client identifier</param>
         public MqttMsgConnect(string clientId) :
-            this(clientId, null, null, false, QOS_LEVEL_AT_LEAST_ONCE, false, null, null, true, KEEP_ALIVE_PERIOD_DEFAULT)
+            this(clientId, null, null, false, QOS_LEVEL_AT_LEAST_ONCE, false, null, null, true, KEEP_ALIVE_PERIOD_DEFAULT, PROTOCOL_VERSION_V3_1_1)
         {
         }
 
@@ -236,6 +240,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// <param name="willMessage">Will message</param>
         /// <param name="cleanSession">Clean sessione flag</param>
         /// <param name="keepAlivePeriod">Keep alive period</param>
+        /// <param name="protocolVersion">Protocol version</param>
         public MqttMsgConnect(string clientId, 
             string username, 
             string password,
@@ -245,14 +250,12 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             string willTopic,
             string willMessage,
             bool cleanSession,
-            ushort keepAlivePeriod
+            ushort keepAlivePeriod,
+            byte protocolVersion
             )
         {
             this.type = MQTT_MSG_CONNECT_TYPE;
 
-            this.protocolName = PROTOCOL_NAME;
-            this.protocolVersion = PROTOCOL_VERSION;
-
             this.clientId = clientId;
             this.username = username;
             this.password = password;
@@ -263,15 +266,19 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             this.willMessage = willMessage;
             this.cleanSession = cleanSession;
             this.keepAlivePeriod = keepAlivePeriod;
+            // [v.3.1.1] added new protocol name and version
+            this.protocolVersion = protocolVersion;
+            this.protocolName = (this.protocolVersion == PROTOCOL_VERSION_V3_1_1) ? PROTOCOL_NAME_V3_1_1 : PROTOCOL_NAME_V3_1;
         }
 
         /// <summary>
         /// Parse bytes for a CONNECT message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>CONNECT message instance</returns>
-        public static MqttMsgConnect Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgConnect Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
@@ -306,11 +313,20 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             index += protNameUtf8Length;
             msg.protocolName = new String(Encoding.UTF8.GetChars(protNameUtf8));
 
+            // [v3.1.1] wrong protocol name
+            if (!msg.protocolName.Equals(PROTOCOL_NAME_V3_1) && !msg.protocolName.Equals(PROTOCOL_NAME_V3_1_1))
+                throw new MqttClientException(MqttClientErrorCode.InvalidProtocolName);
+
             // protocol version
             msg.protocolVersion = buffer[index];
-            index += PROTOCOL_VERSION_NUMBER_SIZE;
+            index += PROTOCOL_VERSION_SIZE;
 
             // connect flags
+            // [v3.1.1] check lsb (reserved) must be 0
+            if ((msg.protocolVersion == PROTOCOL_VERSION_V3_1_1) &&
+                ((buffer[index] & RESERVED_FLAG_MASK) != 0x00))
+                throw new MqttClientException(MqttClientErrorCode.InvalidConnectFlags);
+
             isUsernameFlag = (buffer[index] & USERNAME_FLAG_MASK) != 0x00;
             isPasswordFlag = (buffer[index] & PASSWORD_FLAG_MASK) != 0x00;
             msg.willRetain = (buffer[index] & WILL_RETAIN_FLAG_MASK) != 0x00;
@@ -323,13 +339,16 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             msg.keepAlivePeriod = (ushort)((buffer[index++] << 8) & 0xFF00);
             msg.keepAlivePeriod |= buffer[index++];
 
-            // client identifier
+            // client identifier [v3.1.1] it may be zero bytes long (empty string)
             clientIdUtf8Length = ((buffer[index++] << 8) & 0xFF00);
             clientIdUtf8Length |= buffer[index++];
             clientIdUtf8 = new byte[clientIdUtf8Length];
             Array.Copy(buffer, index, clientIdUtf8, 0, clientIdUtf8Length);
             index += clientIdUtf8Length;
             msg.clientId = new String(Encoding.UTF8.GetChars(clientIdUtf8));
+            // [v3.1.1] if client identifier is zero bytes long, clean session must be true
+            if ((msg.protocolVersion == PROTOCOL_VERSION_V3_1_1) && (clientIdUtf8Length == 0) && (!msg.cleanSession))
+                throw new MqttClientException(MqttClientErrorCode.InvalidClientId);
 
             // will topic and will message
             if (msg.willFlag)
@@ -374,7 +393,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             return msg;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -384,21 +403,49 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             int index = 0;
 
             byte[] clientIdUtf8 = Encoding.UTF8.GetBytes(this.clientId);
-            byte[] willTopicUtf8 = (this.willTopic != null) ? Encoding.UTF8.GetBytes(this.willTopic) : null;
-            byte[] willMessageUtf8 = (this.willMessage != null) ? Encoding.UTF8.GetBytes(this.willMessage) : null;
-            byte[] usernameUtf8 = (this.username != null) ? Encoding.UTF8.GetBytes(this.username) : null;
-            byte[] passwordUtf8 = (this.password != null) ? Encoding.UTF8.GetBytes(this.password) : null;
-
-            // will flag set but will topic wrong
-            if (this.willFlag && (willTopicUtf8.Length == 0))
-                throw new MqttClientException(MqttClientErrorCode.WillTopicWrong);
+            byte[] willTopicUtf8 = (this.willFlag && (this.willTopic != null)) ? Encoding.UTF8.GetBytes(this.willTopic) : null;
+            byte[] willMessageUtf8 = (this.willFlag && (this.willMessage != null)) ? Encoding.UTF8.GetBytes(this.willMessage) : null;
+            byte[] usernameUtf8 = ((this.username != null) && (this.username.Length > 0)) ? Encoding.UTF8.GetBytes(this.username) : null;
+            byte[] passwordUtf8 = ((this.password != null) && (this.password.Length > 0)) ? Encoding.UTF8.GetBytes(this.password) : null;
+
+            // [v3.1.1]
+            if (this.protocolVersion == PROTOCOL_VERSION_V3_1_1)
+            {
+                // will flag set, will topic and will message MUST be present
+                if (this.willFlag &&  ((this.willQosLevel >= 0x03) ||
+                                       (willTopicUtf8 == null) || (willMessageUtf8 == null) ||
+                                       ((willTopicUtf8 != null) && (willTopicUtf8.Length == 0)) || 
+                                       ((willMessageUtf8 != null) && (willMessageUtf8.Length == 0))))
+                    throw new MqttClientException(MqttClientErrorCode.WillWrong);
+                // willflag not set, retain must be 0 and will topic and message MUST NOT be present
+                else if (!this.willFlag && ((this.willRetain) ||
+                                            (willTopicUtf8 != null) || (willMessageUtf8 != null) ||
+                                            ((willTopicUtf8 != null) && (willTopicUtf8.Length != 0)) || 
+                                            ((willMessageUtf8 != null) && (willMessageUtf8.Length != 0))))
+                    throw new MqttClientException(MqttClientErrorCode.WillWrong);
+            }
+
             if (this.keepAlivePeriod > MAX_KEEP_ALIVE)
                 throw new MqttClientException(MqttClientErrorCode.KeepAliveWrong);
 
+            // check on will QoS Level
+            if ((this.willQosLevel < MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE) ||
+                (this.willQosLevel > MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE))
+                throw new MqttClientException(MqttClientErrorCode.WillWrong);
+
             // protocol name field size
-            varHeaderSize += (PROTOCOL_NAME_LEN_SIZE + PROTOCOL_NAME_SIZE);
-            // protocol version number field size
-            varHeaderSize += PROTOCOL_VERSION_NUMBER_SIZE;
+            // MQTT version 3.1
+            if (this.protocolVersion == PROTOCOL_VERSION_V3_1)
+            {
+                varHeaderSize += (PROTOCOL_NAME_LEN_SIZE + PROTOCOL_NAME_V3_1_SIZE);
+            }
+            // MQTT version 3.1.1
+            else
+            {
+                varHeaderSize += (PROTOCOL_NAME_LEN_SIZE + PROTOCOL_NAME_V3_1_1_SIZE);
+            }
+            // protocol level field size
+            varHeaderSize += PROTOCOL_VERSION_SIZE;
             // connect flags field size
             varHeaderSize += CONNECT_FLAGS_SIZE;
             // keep alive timer field size
@@ -433,30 +480,40 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index++] = (MQTT_MSG_CONNECT_TYPE << MSG_TYPE_OFFSET);
+            buffer[index++] = (MQTT_MSG_CONNECT_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_CONNECT_FLAG_BITS; // [v.3.1.1]
 
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
 
             // protocol name
             buffer[index++] = 0; // MSB protocol name size
-            buffer[index++] = PROTOCOL_NAME_SIZE; // LSB protocol name size
-            buffer[index++] = (byte)'M';
-            buffer[index++] = (byte)'Q';
-            buffer[index++] = (byte)'I';
-            buffer[index++] = (byte)'s';
-            buffer[index++] = (byte)'d';
-            buffer[index++] = (byte)'p';
-
-            // protocol version
-            buffer[index++] = PROTOCOL_VERSION;
-
+            // MQTT version 3.1
+            if (this.protocolVersion == PROTOCOL_VERSION_V3_1)
+            {
+                buffer[index++] = PROTOCOL_NAME_V3_1_SIZE; // LSB protocol name size
+                Array.Copy(Encoding.UTF8.GetBytes(PROTOCOL_NAME_V3_1), 0, buffer, index, PROTOCOL_NAME_V3_1_SIZE);
+                index += PROTOCOL_NAME_V3_1_SIZE;
+                // protocol version
+                buffer[index++] = PROTOCOL_VERSION_V3_1;
+            }
+            // MQTT version 3.1.1
+            else
+            {
+                buffer[index++] = PROTOCOL_NAME_V3_1_1_SIZE; // LSB protocol name size
+                Array.Copy(Encoding.UTF8.GetBytes(PROTOCOL_NAME_V3_1_1), 0, buffer, index, PROTOCOL_NAME_V3_1_1_SIZE);
+                index += PROTOCOL_NAME_V3_1_1_SIZE;
+                // protocol version
+                buffer[index++] = PROTOCOL_VERSION_V3_1_1;
+            }
+            
             // connect flags
             byte connectFlags = 0x00;
-            connectFlags |= (this.username != null) ? (byte)(1 << USERNAME_FLAG_OFFSET) : (byte)0x00;
-            connectFlags |= (this.password != null) ? (byte)(1 << PASSWORD_FLAG_OFFSET) : (byte)0x00;
+            connectFlags |= (usernameUtf8 != null) ? (byte)(1 << USERNAME_FLAG_OFFSET) : (byte)0x00;
+            connectFlags |= (passwordUtf8 != null) ? (byte)(1 << PASSWORD_FLAG_OFFSET) : (byte)0x00;
             connectFlags |= (this.willRetain) ? (byte)(1 << WILL_RETAIN_FLAG_OFFSET) : (byte)0x00;
-            connectFlags |= (byte)(this.willQosLevel << WILL_QOS_FLAG_OFFSET);
+            // only if will flag is set, we have to use will QoS level (otherwise is MUST be 0)
+            if (this.willFlag)
+                connectFlags |= (byte)(this.willQosLevel << WILL_QOS_FLAG_OFFSET);
             connectFlags |= (this.willFlag) ? (byte)(1 << WILL_FLAG_OFFSET) : (byte)0x00;
             connectFlags |= (this.cleanSession) ? (byte)(1 << CLEAN_SESSION_FLAG_OFFSET) : (byte)0x00;
             buffer[index++] = connectFlags;
@@ -472,7 +529,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             index += clientIdUtf8.Length;
 
             // will topic
-            if (this.willFlag && (this.willTopic != null))
+            if (this.willFlag && (willTopicUtf8 != null))
             {
                 buffer[index++] = (byte)((willTopicUtf8.Length >> 8) & 0x00FF); // MSB
                 buffer[index++] = (byte)(willTopicUtf8.Length & 0x00FF); // LSB
@@ -481,7 +538,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             }
 
             // will message
-            if (this.willFlag && (this.willMessage != null))
+            if (this.willFlag && (willMessageUtf8 != null))
             {
                 buffer[index++] = (byte)((willMessageUtf8.Length >> 8) & 0x00FF); // MSB
                 buffer[index++] = (byte)(willMessageUtf8.Length & 0x00FF); // LSB
@@ -490,7 +547,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             }
 
             // username
-            if (this.username != null)
+            if (usernameUtf8 != null)
             {
                 buffer[index++] = (byte)((usernameUtf8.Length >> 8) & 0x00FF); // MSB
                 buffer[index++] = (byte)(usernameUtf8.Length & 0x00FF); // LSB
@@ -499,7 +556,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             }
 
             // password
-            if (this.password != null)
+            if (passwordUtf8 != null)
             {
                 buffer[index++] = (byte)((passwordUtf8.Length >> 8) & 0x00FF); // MSB
                 buffer[index++] = (byte)(passwordUtf8.Length & 0x00FF); // LSB

+ 12 - 0
M2Mqtt/Messages/MqttMsgContext.cs

@@ -126,6 +126,18 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// </summary>
         SendPuback,
 
+        // [v3.1.1] SUBSCRIBE isn't "officially" QOS = 1
+        /// <summary>
+        /// Send SUBSCRIBE message
+        /// </summary>
+        SendSubscribe,
+
+        // [v3.1.1] UNSUBSCRIBE isn't "officially" QOS = 1
+        /// <summary>
+        /// Send UNSUBSCRIBE message
+        /// </summary>
+        SendUnsubscribe,
+
         /// <summary>
         /// (QOS = 1), SUBSCRIBE sent, wait for SUBACK
         /// </summary>

+ 15 - 3
M2Mqtt/Messages/MqttMsgDisconnect.cs

@@ -14,6 +14,7 @@ Contributors:
    Paolo Patierno - initial API and implementation and/or initial documentation
 */
 
+using uPLibrary.Networking.M2Mqtt.Exceptions;
 
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
@@ -34,12 +35,20 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a DISCONNECT message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>DISCONNECT message instance</returns>
-        public static MqttMsgDisconnect Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgDisconnect Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             MqttMsgDisconnect msg = new MqttMsgDisconnect();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_DISCONNECT_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             // NOTE : remainingLength must be 0
@@ -47,13 +56,16 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             return msg;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             byte[] buffer = new byte[2];
             int index = 0;
 
             // first fixed header byte
-            buffer[index++] = (MQTT_MSG_DISCONNECT_TYPE << MSG_TYPE_OFFSET);
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_DISCONNECT_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_DISCONNECT_FLAG_BITS; // [v.3.1.1]
+            else
+                buffer[index++] = (MQTT_MSG_DISCONNECT_TYPE << MSG_TYPE_OFFSET);
             buffer[index++] = 0x00;
 
             return buffer;

+ 15 - 3
M2Mqtt/Messages/MqttMsgPingReq.cs

@@ -14,6 +14,7 @@ Contributors:
    Paolo Patierno - initial API and implementation and/or initial documentation
 */
 
+using uPLibrary.Networking.M2Mqtt.Exceptions;
 
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
@@ -30,13 +31,16 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             this.type = MQTT_MSG_PINGREQ_TYPE;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             byte[] buffer = new byte[2];
             int index = 0;
 
             // first fixed header byte
-            buffer[index++] = (MQTT_MSG_PINGREQ_TYPE << MSG_TYPE_OFFSET);
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_PINGREQ_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_PINGREQ_FLAG_BITS; // [v.3.1.1]
+            else
+                buffer[index++] = (MQTT_MSG_PINGREQ_TYPE << MSG_TYPE_OFFSET);
             buffer[index++] = 0x00;
 
             return buffer;
@@ -46,12 +50,20 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a PINGREQ message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>PINGREQ message instance</returns>
-        public static MqttMsgPingReq Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgPingReq Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             MqttMsgPingReq msg = new MqttMsgPingReq();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_PINGREQ_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // already know remaininglength is zero (MQTT specification),
             // so it isn't necessary to read other data from socket
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);

+ 15 - 3
M2Mqtt/Messages/MqttMsgPingResp.cs

@@ -15,6 +15,7 @@ Contributors:
 */
 
 using System;
+using uPLibrary.Networking.M2Mqtt.Exceptions;
 
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
@@ -35,12 +36,20 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a PINGRESP message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>PINGRESP message instance</returns>
-        public static MqttMsgPingResp Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgPingResp Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             MqttMsgPingResp msg = new MqttMsgPingResp();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_PINGRESP_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // already know remaininglength is zero (MQTT specification),
             // so it isn't necessary to read other data from socket
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
@@ -48,13 +57,16 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             return msg;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             byte[] buffer = new byte[2];
             int index = 0;
 
             // first fixed header byte
-            buffer[index++] = (MQTT_MSG_PINGRESP_TYPE << MSG_TYPE_OFFSET);
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_PINGRESP_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_PINGRESP_FLAG_BITS; // [v.3.1.1]
+            else
+                buffer[index++] = (MQTT_MSG_PINGRESP_TYPE << MSG_TYPE_OFFSET);
             buffer[index++] = 0x00;
 
             return buffer;

+ 16 - 20
M2Mqtt/Messages/MqttMsgPuback.cs

@@ -14,6 +14,8 @@ Contributors:
    Paolo Patierno - initial API and implementation and/or initial documentation
 */
 
+using uPLibrary.Networking.M2Mqtt.Exceptions;
+
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
     /// <summary>
@@ -21,23 +23,6 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
     /// </summary>
     public class MqttMsgPuback : MqttMsgBase
     {
-        #region Properties...
-
-        /// <summary>
-        /// Message identifier for the publish message
-        /// that is acknowledged
-        /// </summary>
-        public ushort MessageId
-        {
-            get { return this.messageId; }
-            set { this.messageId = value; }
-        }
-
-        #endregion
-
-        // message identifier
-        private ushort messageId;
-        
         /// <summary>
         /// Constructor
         /// </summary>
@@ -46,7 +31,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             this.type = MQTT_MSG_PUBACK_TYPE;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -76,7 +61,10 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index++] = (MQTT_MSG_PUBACK_TYPE << MSG_TYPE_OFFSET);
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_PUBACK_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_PUBACK_FLAG_BITS; // [v.3.1.1]
+            else
+                buffer[index++] = (MQTT_MSG_PUBACK_TYPE << MSG_TYPE_OFFSET);
                               
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
@@ -92,14 +80,22 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a PUBACK message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>PUBACK message instance</returns>
-        public static MqttMsgPuback Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgPuback Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
             MqttMsgPuback msg = new MqttMsgPuback();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_PUBACK_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             buffer = new byte[remainingLength];

+ 16 - 19
M2Mqtt/Messages/MqttMsgPubcomp.cs

@@ -14,6 +14,8 @@ Contributors:
    Paolo Patierno - initial API and implementation and/or initial documentation
 */
 
+using uPLibrary.Networking.M2Mqtt.Exceptions;
+
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
     /// <summary>
@@ -21,22 +23,6 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
     /// </summary>
     public class MqttMsgPubcomp : MqttMsgBase
     {
-        #region Properties...
-
-        /// <summary>
-        /// Message identifier for the acknowledged publish message
-        /// </summary>
-        public ushort MessageId
-        {
-            get { return this.messageId; }
-            set { this.messageId = value; }
-        }
-
-        #endregion
-
-        // message identifier
-        private ushort messageId;
-        
         /// <summary>
         /// Constructor
         /// </summary>
@@ -45,7 +31,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             this.type = MQTT_MSG_PUBCOMP_TYPE;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -75,7 +61,10 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index++] = (MQTT_MSG_PUBCOMP_TYPE << MSG_TYPE_OFFSET);
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_PUBCOMP_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_PUBCOMP_FLAG_BITS; // [v.3.1.1]
+            else
+                buffer[index++] = (MQTT_MSG_PUBCOMP_TYPE << MSG_TYPE_OFFSET);
 
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
@@ -91,14 +80,22 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a PUBCOMP message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>PUBCOMP message instance</returns>
-        public static MqttMsgPubcomp Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgPubcomp Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
             MqttMsgPubcomp msg = new MqttMsgPubcomp();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_PUBCOMP_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             buffer = new byte[remainingLength];

+ 11 - 14
M2Mqtt/Messages/MqttMsgPublish.cs

@@ -45,24 +45,13 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             set { this.message = value; }
         }
 
-        /// <summary>
-        /// Message identifier
-        /// </summary>
-        public ushort MessageId
-        {
-            get { return this.messageId; }
-            set { this.messageId = value; }
-        }
-
         #endregion
 
         // message topic
         private string topic;
         // message data
         private byte[] message;
-        // message identifier
-        ushort messageId;
-
+        
         /// <summary>
         /// Constructor
         /// </summary>
@@ -105,7 +94,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             this.messageId = 0;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -122,6 +111,10 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             if ((this.topic.Length < MIN_TOPIC_LENGTH) || (this.topic.Length > MAX_TOPIC_LENGTH))
                 throw new MqttClientException(MqttClientErrorCode.TopicLength);
 
+            // check wrong QoS level (both bits can't be set 1)
+            if (this.qosLevel > QOS_LEVEL_EXACTLY_ONCE)
+                throw new MqttClientException(MqttClientErrorCode.QosNotAllowed);
+
             byte[] topicUtf8 = Encoding.UTF8.GetBytes(this.topic);
 
             // topic name
@@ -198,9 +191,10 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a PUBLISH message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>PUBLISH message instance</returns>
-        public static MqttMsgPublish Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgPublish Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
@@ -225,6 +219,9 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
 
             // read QoS level from fixed header
             msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
+            // check wrong QoS level (both bits can't be set 1)
+            if (msg.qosLevel > QOS_LEVEL_EXACTLY_ONCE)
+                throw new MqttClientException(MqttClientErrorCode.QosNotAllowed);
             // read DUP flag from fixed header
             msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
             // read retain flag from fixed header

+ 16 - 19
M2Mqtt/Messages/MqttMsgPubrec.cs

@@ -14,6 +14,8 @@ Contributors:
    Paolo Patierno - initial API and implementation and/or initial documentation
 */
 
+using uPLibrary.Networking.M2Mqtt.Exceptions;
+
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
     /// <summary>
@@ -21,22 +23,6 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
     /// </summary>
     public class MqttMsgPubrec : MqttMsgBase
     {
-        #region Properties...
-
-        /// <summary>
-        /// Message identifier for the acknowledged publish message
-        /// </summary>
-        public ushort MessageId
-        {
-            get { return this.messageId; }
-            set { this.messageId = value; }
-        }
-
-        #endregion
-
-        // message identifier
-        private ushort messageId;
-        
         /// <summary>
         /// Constructor
         /// </summary>
@@ -45,7 +31,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             this.type = MQTT_MSG_PUBREC_TYPE;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -75,7 +61,10 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index++] = (MQTT_MSG_PUBREC_TYPE << MSG_TYPE_OFFSET);
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_PUBREC_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_PUBREC_FLAG_BITS; // [v.3.1.1]
+            else
+                buffer[index++] = (MQTT_MSG_PUBREC_TYPE << MSG_TYPE_OFFSET);
 
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
@@ -91,14 +80,22 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a PUBREC message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>PUBREC message instance</returns>
-        public static MqttMsgPubrec Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgPubrec Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
             MqttMsgPubrec msg = new MqttMsgPubrec();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_PUBREC_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             buffer = new byte[remainingLength];

+ 30 - 26
M2Mqtt/Messages/MqttMsgPubrel.cs

@@ -14,6 +14,8 @@ Contributors:
    Paolo Patierno - initial API and implementation and/or initial documentation
 */
 
+using uPLibrary.Networking.M2Mqtt.Exceptions;
+
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
     /// <summary>
@@ -21,33 +23,17 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
     /// </summary>
     public class MqttMsgPubrel : MqttMsgBase
     {
-        #region Properties...
-
-        /// <summary>
-        /// Message identifier for the acknowledged publish message
-        /// </summary>
-        public ushort MessageId
-        {
-            get { return this.messageId; }
-            set { this.messageId = value; }
-        }
-
-        #endregion
-
-        // message identifier
-        private ushort messageId;
-        
         /// <summary>
         /// Constructor
         /// </summary>
         public MqttMsgPubrel()
         {
             this.type = MQTT_MSG_PUBREL_TYPE;
-            // PUBREL message use QoS Level 1
+            // PUBREL message use QoS Level 1 (not "officially" in 3.1.1)
             this.qosLevel = QOS_LEVEL_AT_LEAST_ONCE;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -77,10 +63,15 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index] = (byte)((MQTT_MSG_PUBREL_TYPE << MSG_TYPE_OFFSET) |
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_PUBREL_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_PUBREL_FLAG_BITS; // [v.3.1.1]
+            else
+            {
+                buffer[index] = (byte)((MQTT_MSG_PUBREL_TYPE << MSG_TYPE_OFFSET) |
                                    (this.qosLevel << QOS_LEVEL_OFFSET));
-            buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
-            index++;
+                buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
+                index++;
+            }
             
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
@@ -96,14 +87,22 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a PUBREL message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>PUBREL message instance</returns>
-        public static MqttMsgPubrel Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgPubrel Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
             MqttMsgPubrel msg = new MqttMsgPubrel();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_PUBREL_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             buffer = new byte[remainingLength];
@@ -111,10 +110,15 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             // read bytes from socket...
             channel.Receive(buffer);
 
-            // read QoS level from fixed header (would be QoS Level 1)
-            msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
-            // read DUP flag from fixed header
-            msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1)
+            {
+                // only 3.1.0
+
+                // read QoS level from fixed header (would be QoS Level 1)
+                msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
+                // read DUP flag from fixed header
+                msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
+            }
 
             // message id
             msg.messageId = (ushort)((buffer[index++] << 8) & 0xFF00);

+ 16 - 17
M2Mqtt/Messages/MqttMsgSuback.cs

@@ -15,6 +15,7 @@ Contributors:
 */
 
 using System;
+using uPLibrary.Networking.M2Mqtt.Exceptions;
 
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
@@ -25,16 +26,6 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
     {
         #region Properties...
 
-        /// <summary>
-        /// Message identifier for the subscribe message
-        /// that is acknowledged
-        /// </summary>
-        public ushort MessageId
-        {
-            get { return this.messageId; }
-            set { this.messageId = value; }
-        }
-
         /// <summary>
         /// List of granted QOS Levels
         /// </summary>
@@ -46,8 +37,6 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
 
         #endregion
 
-        // message identifier
-        private ushort messageId;
         // granted QOS levels
         byte[] grantedQosLevels;
 
@@ -63,14 +52,22 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a SUBACK message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>SUBACK message instance</returns>
-        public static MqttMsgSuback Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgSuback Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
             MqttMsgSuback msg = new MqttMsgSuback();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_SUBACK_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             buffer = new byte[remainingLength];
@@ -93,7 +90,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             return msg;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -129,9 +126,11 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index] = (byte)(MQTT_MSG_SUBACK_TYPE << MSG_TYPE_OFFSET);
-            index++;
-
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_SUBACK_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_SUBACK_FLAG_BITS; // [v.3.1.1]
+            else
+                buffer[index++] = (byte)(MQTT_MSG_SUBACK_TYPE << MSG_TYPE_OFFSET);
+            
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
 

+ 32 - 25
M2Mqtt/Messages/MqttMsgSubscribe.cs

@@ -50,24 +50,13 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             set { this.qosLevels = value; }
         }
 
-        /// <summary>
-        /// Message identifier
-        /// </summary>
-        public ushort MessageId
-        {
-            get { return this.messageId; }
-            set { this.messageId = value; }
-        }
-
         #endregion
 
         // topics to subscribe
         string[] topics;
         // QOS levels related to topics
         byte[] qosLevels;
-        // message identifier
-        ushort messageId;
-
+        
         /// <summary>
         /// Constructor
         /// </summary>
@@ -88,7 +77,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             this.topics = topics;
             this.qosLevels = qosLevels;
 
-            // SUBSCRIBE message uses QoS Level 1
+            // SUBSCRIBE message uses QoS Level 1 (not "officially" in 3.1.1)
             this.qosLevel = QOS_LEVEL_AT_LEAST_ONCE;
         }
 
@@ -96,9 +85,10 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a SUBSCRIBE message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>SUBSCRIBE message instance</returns>
-        public static MqttMsgSubscribe Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgSubscribe Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
@@ -106,6 +96,13 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             int topicUtf8Length;
             MqttMsgSubscribe msg = new MqttMsgSubscribe();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_SUBSCRIBE_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             buffer = new byte[remainingLength];
@@ -113,12 +110,17 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             // read bytes from socket...
             int received = channel.Receive(buffer);
 
-            // read QoS level from fixed header
-            msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
-            // read DUP flag from fixed header
-            msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
-            // retain flag not used
-            msg.retain = false;
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1)
+            {
+                // only 3.1.0
+
+                // read QoS level from fixed header
+                msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
+                // read DUP flag from fixed header
+                msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
+                // retain flag not used
+                msg.retain = false;
+            }
 
             // message id
             msg.messageId = (ushort)((buffer[index++] << 8) & 0xFF00);
@@ -163,7 +165,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             return msg;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -220,11 +222,16 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index] = (byte)((MQTT_MSG_SUBSCRIBE_TYPE << MSG_TYPE_OFFSET) |
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_SUBSCRIBE_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_SUBSCRIBE_FLAG_BITS; // [v.3.1.1]
+            else
+            {
+                buffer[index] = (byte)((MQTT_MSG_SUBSCRIBE_TYPE << MSG_TYPE_OFFSET) |
                                    (this.qosLevel << QOS_LEVEL_OFFSET));
-            buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
-            index++;
-
+                buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
+                index++;
+            }
+            
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
 

+ 16 - 22
M2Mqtt/Messages/MqttMsgUnsuback.cs

@@ -15,6 +15,7 @@ Contributors:
 */
 
 using System;
+using uPLibrary.Networking.M2Mqtt.Exceptions;
 
 namespace uPLibrary.Networking.M2Mqtt.Messages
 {
@@ -23,23 +24,6 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
     /// </summary>
     public class MqttMsgUnsuback : MqttMsgBase
     {
-        #region Properties...
-
-        /// <summary>
-        /// Message identifier for the unsubscribe message
-        /// that is acknowledged
-        /// </summary>
-        public ushort MessageId
-        {
-            get { return this.messageId; }
-            set { this.messageId = value; }
-        }
-
-        #endregion
-
-        // message identifier
-        private ushort messageId;
-        
         /// <summary>
         /// Constructor
         /// </summary>
@@ -52,14 +36,22 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a UNSUBACK message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>UNSUBACK message instance</returns>
-        public static MqttMsgUnsuback Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgUnsuback Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
             MqttMsgUnsuback msg = new MqttMsgUnsuback();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_UNSUBACK_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             buffer = new byte[remainingLength];
@@ -74,7 +66,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             return msg;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -104,9 +96,11 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index] = (byte)(MQTT_MSG_UNSUBACK_TYPE << MSG_TYPE_OFFSET);
-            index++;
-
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_UNSUBACK_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_UNSUBACK_FLAG_BITS; // [v.3.1.1]
+            else
+                buffer[index++] = (byte)(MQTT_MSG_UNSUBACK_TYPE << MSG_TYPE_OFFSET);
+            
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
 

+ 32 - 25
M2Mqtt/Messages/MqttMsgUnsubscribe.cs

@@ -41,22 +41,11 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             set { this.topics = value; }
         }
 
-        /// <summary>
-        /// Message identifier
-        /// </summary>
-        public ushort MessageId
-        {
-            get { return this.messageId; }
-            set { this.messageId = value; }
-        }
-
         #endregion
 
         // topics to unsubscribe
         string[] topics;
-        // message identifier
-        ushort messageId;
-
+        
         /// <summary>
         /// Constructor
         /// </summary>
@@ -75,7 +64,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
 
             this.topics = topics;
 
-            // UNSUBSCRIBE message uses QoS Level 1
+            // UNSUBSCRIBE message uses QoS Level 1 (not "officially" in 3.1.1)
             this.qosLevel = QOS_LEVEL_AT_LEAST_ONCE;
         }
 
@@ -83,9 +72,10 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
         /// Parse bytes for a UNSUBSCRIBE message
         /// </summary>
         /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
+        /// <param name="protocolVersion">Protocol Version</param>
         /// <param name="channel">Channel connected to the broker</param>
         /// <returns>UNSUBSCRIBE message instance</returns>
-        public static MqttMsgUnsubscribe Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
+        public static MqttMsgUnsubscribe Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
         {
             byte[] buffer;
             int index = 0;
@@ -93,6 +83,13 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             int topicUtf8Length;
             MqttMsgUnsubscribe msg = new MqttMsgUnsubscribe();
 
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+            {
+                // [v3.1.1] check flag bits
+                if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_UNSUBSCRIBE_FLAG_BITS)
+                    throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
+            }
+
             // get remaining length and allocate buffer
             int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
             buffer = new byte[remainingLength];
@@ -100,12 +97,17 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             // read bytes from socket...
             int received = channel.Receive(buffer);
 
-            // read QoS level from fixed header
-            msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
-            // read DUP flag from fixed header
-            msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
-            // retain flag not used
-            msg.retain = false;
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1)
+            {
+                // only 3.1.0
+
+                // read QoS level from fixed header
+                msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
+                // read DUP flag from fixed header
+                msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
+                // retain flag not used
+                msg.retain = false;
+            }
 
             // message id
             msg.messageId = (ushort)((buffer[index++] << 8) & 0xFF00);
@@ -142,7 +144,7 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             return msg;
         }
 
-        public override byte[] GetBytes()
+        public override byte[] GetBytes(byte protocolVersion)
         {
             int fixedHeaderSize = 0;
             int varHeaderSize = 0;
@@ -190,11 +192,16 @@ namespace uPLibrary.Networking.M2Mqtt.Messages
             buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
 
             // first fixed header byte
-            buffer[index] = (byte)((MQTT_MSG_UNSUBSCRIBE_TYPE << MSG_TYPE_OFFSET) |
+            if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
+                buffer[index++] = (MQTT_MSG_UNSUBSCRIBE_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_UNSUBSCRIBE_FLAG_BITS; // [v.3.1.1]
+            else
+            {
+                buffer[index] = (byte)((MQTT_MSG_UNSUBSCRIBE_TYPE << MSG_TYPE_OFFSET) |
                                    (this.qosLevel << QOS_LEVEL_OFFSET));
-            buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
-            index++;
-
+                buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
+                index++;
+            }
+            
             // encode remaining length
             index = this.encodeRemainingLength(remainingLength, buffer, index);
 

File diff suppressed because it is too large
+ 301 - 146
M2Mqtt/MqttClient.cs


+ 10 - 2
M2Mqtt/MqttSettings.cs

@@ -25,14 +25,16 @@ namespace uPLibrary.Networking.M2Mqtt
         public const int MQTT_BROKER_DEFAULT_PORT = 1883;
         public const int MQTT_BROKER_DEFAULT_SSL_PORT = 8883;
         // default timeout on receiving from client
-        public const int MQTT_DEFAULT_TIMEOUT = 5000;
+        public const int MQTT_DEFAULT_TIMEOUT = 30000;
         // max publish, subscribe and unsubscribe retry for QoS Level 1 or 2
         public const int MQTT_ATTEMPTS_RETRY = 3;
         // delay for retry publish, subscribe and unsubscribe for QoS Level 1 or 2
         public const int MQTT_DELAY_RETRY = 10000;
         // broker need to receive the first message (CONNECT)
         // within a reasonable amount of time after TCP/IP connection 
-        public const int MQTT_CONNECT_TIMEOUT = 5000;
+        public const int MQTT_CONNECT_TIMEOUT = 30000;
+        // default inflight queue size
+        public const int MQTT_MAX_INFLIGHT_QUEUE_SIZE = int.MaxValue;
 
         /// <summary>
         /// Listening connection port
@@ -63,6 +65,11 @@ namespace uPLibrary.Networking.M2Mqtt
         /// Delay on retry
         /// </summary>
         public int DelayOnRetry { get; internal set; }
+
+        /// <summary>
+        /// Inflight queue size
+        /// </summary>
+        public int InflightQueueSize { get; set; }
         
         /// <summary>
         /// Singleton instance of settings
@@ -91,6 +98,7 @@ namespace uPLibrary.Networking.M2Mqtt
             this.AttemptsOnRetry = MQTT_ATTEMPTS_RETRY;
             this.DelayOnRetry = MQTT_DELAY_RETRY;
             this.TimeoutOnConnection = MQTT_CONNECT_TIMEOUT;
+            this.InflightQueueSize = MQTT_MAX_INFLIGHT_QUEUE_SIZE;
         }
     }
 }

+ 41 - 6
M2Mqtt/Net/MqttNetworkChannel.cs

@@ -229,6 +229,7 @@ namespace uPLibrary.Networking.M2Mqtt
             if (this.secure)
             {
                 this.sslStream.Write(buffer, 0, buffer.Length);
+                this.sslStream.Flush();
                 return buffer.Length;
             }
             else
@@ -249,34 +250,68 @@ namespace uPLibrary.Networking.M2Mqtt
             if (this.secure)
             {
                 // read all data needed (until fill buffer)
-                int idx = 0;
+                int idx = 0, read = 0;
                 while (idx < buffer.Length)
                 {
-                    idx += this.sslStream.Read(buffer, idx, buffer.Length - idx);
+                    // fixed scenario with socket closed gracefully by peer/broker and
+                    // Read return 0. Avoid infinite loop.
+                    read = this.sslStream.Read(buffer, idx, buffer.Length - idx);
+                    if (read == 0)
+                        return 0;
+                    idx += read;
                 }
                 return buffer.Length;
             }
             else
             {
                 // read all data needed (until fill buffer)
-                int idx = 0;
+                int idx = 0, read = 0;
                 while (idx < buffer.Length)
                 {
-                    idx += this.socket.Receive(buffer, idx, buffer.Length - idx, SocketFlags.None);
+                    // fixed scenario with socket closed gracefully by peer/broker and
+                    // Read return 0. Avoid infinite loop.
+                    read = this.socket.Receive(buffer, idx, buffer.Length - idx, SocketFlags.None);
+                    if (read == 0)
+                        return 0;
+                    idx += read;
                 }
                 return buffer.Length;
             }
 #else
             // read all data needed (until fill buffer)
-            int idx = 0;
+            int idx = 0, read = 0;
             while (idx < buffer.Length)
             {
-                idx += this.socket.Receive(buffer, idx, buffer.Length - idx, SocketFlags.None);
+                // fixed scenario with socket closed gracefully by peer/broker and
+                // Read return 0. Avoid infinite loop.
+                read = this.socket.Receive(buffer, idx, buffer.Length - idx, SocketFlags.None);
+                if (read == 0)
+                    return 0;
+                idx += read;
             }
             return buffer.Length;
 #endif
         }
 
+        /// <summary>
+        /// Receive data from the network channel with a specified timeout
+        /// </summary>
+        /// <param name="buffer">Data buffer for receiving data</param>
+        /// <param name="timeout">Timeout on receiving (in milliseconds)</param>
+        /// <returns>Number of bytes received</returns>
+        public int Receive(byte[] buffer, int timeout)
+        {
+            // check data availability (timeout is in microseconds)
+            if (this.socket.Poll(timeout * 1000, SelectMode.SelectRead))
+            {
+                return this.Receive(buffer);
+            }
+            else
+            {
+                return 0;
+            }
+        }
+
         /// <summary>
         /// Close the network channel
         /// </summary>

+ 2 - 2
M2Mqtt/Properties/AssemblyInfo.cs

@@ -37,8 +37,8 @@ using System.Runtime.InteropServices;
 //      Build Number
 //      Revision
 //
-[assembly: AssemblyVersion("3.6.0.0")]
+[assembly: AssemblyVersion("4.0.0.0")]
 // to avoid compilation error (AssemblyFileVersionAttribute doesn't exist) under .Net CF 3.5
 #if !WindowsCE
-[assembly: AssemblyFileVersion("3.6.0.0")]
+[assembly: AssemblyFileVersion("4.0.0.0")]
 #endif

+ 65 - 0
M2Mqtt/Session/MqttBrokerSession.cs

@@ -0,0 +1,65 @@
+/*
+Copyright (c) 2013, 2014 Paolo Patierno
+
+All rights reserved. This program and the accompanying materials
+are made available under the terms of the Eclipse Public License v1.0
+and Eclipse Distribution License v1.0 which accompany this distribution. 
+
+The Eclipse Public License is available at 
+   http://www.eclipse.org/legal/epl-v10.html
+and the Eclipse Distribution License is available at 
+   http://www.eclipse.org/org/documents/edl-v10.php.
+
+Contributors:
+   Paolo Patierno - initial API and implementation and/or initial documentation
+*/
+
+#if BROKER
+using System.Collections;
+using System.Collections.Generic;
+using uPLibrary.Networking.M2Mqtt.Managers;
+using uPLibrary.Networking.M2Mqtt.Messages;
+
+namespace uPLibrary.Networking.M2Mqtt.Session
+{
+    /// <summary>
+    /// MQTT Broker Session
+    /// </summary>
+    public class MqttBrokerSession : MqttSession
+    {
+        /// <summary>
+        /// Client related to the subscription
+        /// </summary>
+        public MqttClient Client { get; set; }
+
+        /// <summary>
+        /// Subscriptions for the client session
+        /// </summary>
+        public List<MqttSubscription> Subscriptions;
+
+        /// <summary>
+        /// Outgoing messages to publish
+        /// </summary>
+        public Queue<MqttMsgPublish> OutgoingMessages;
+
+        /// <summary>
+        /// Constructor
+        /// </summary>
+        public MqttBrokerSession()
+            : base()
+        {
+            this.Client = null;
+            this.Subscriptions = new List<MqttSubscription>();
+            this.OutgoingMessages = new Queue<MqttMsgPublish>();
+        }
+
+        public override void Clear()
+        {
+            base.Clear();
+            this.Client = null;
+            this.Subscriptions.Clear();
+            this.OutgoingMessages.Clear();
+        }
+    }
+}
+#endif

+ 33 - 0
M2Mqtt/Session/MqttClientSession.cs

@@ -0,0 +1,33 @@
+/*
+Copyright (c) 2013, 2014 Paolo Patierno
+
+All rights reserved. This program and the accompanying materials
+are made available under the terms of the Eclipse Public License v1.0
+and Eclipse Distribution License v1.0 which accompany this distribution. 
+
+The Eclipse Public License is available at 
+   http://www.eclipse.org/legal/epl-v10.html
+and the Eclipse Distribution License is available at 
+   http://www.eclipse.org/org/documents/edl-v10.php.
+
+Contributors:
+   Paolo Patierno - initial API and implementation and/or initial documentation
+*/
+
+namespace uPLibrary.Networking.M2Mqtt.Session
+{
+    /// <summary>
+    /// MQTT Client Session
+    /// </summary>
+    public class MqttClientSession : MqttSession
+    {
+        /// <summary>
+        /// Constructor
+        /// </summary>
+        /// <param name="clientId">Client Id to create session</param>
+        public MqttClientSession(string clientId)
+            : base(clientId)
+        {
+        }
+    }
+}

+ 63 - 0
M2Mqtt/Session/MqttSession.cs

@@ -0,0 +1,63 @@
+/*
+Copyright (c) 2013, 2014 Paolo Patierno
+
+All rights reserved. This program and the accompanying materials
+are made available under the terms of the Eclipse Public License v1.0
+and Eclipse Distribution License v1.0 which accompany this distribution. 
+
+The Eclipse Public License is available at 
+   http://www.eclipse.org/legal/epl-v10.html
+and the Eclipse Distribution License is available at 
+   http://www.eclipse.org/org/documents/edl-v10.php.
+
+Contributors:
+   Paolo Patierno - initial API and implementation and/or initial documentation
+*/
+
+using System.Collections;
+
+namespace uPLibrary.Networking.M2Mqtt.Session
+{
+    /// <summary>
+    /// MQTT Session base class
+    /// </summary>
+    public abstract class MqttSession
+    {
+        /// <summary>
+        /// Client Id
+        /// </summary>
+        public string ClientId { get; set; }
+
+        /// <summary>
+        /// Messages inflight during session
+        /// </summary>
+        public Hashtable InflightMessages { get; set; }
+
+        /// <summary>
+        /// Constructor
+        /// </summary>
+        public MqttSession()
+            : this(null)
+        {
+        }
+
+        /// <summary>
+        /// Constructor
+        /// </summary>
+        /// <param name="clientId">Client Id to create session</param>
+        public MqttSession(string clientId)
+        {
+            this.ClientId = clientId;
+            this.InflightMessages = new Hashtable();
+        }
+
+        /// <summary>
+        /// Clean session
+        /// </summary>
+        public virtual void Clear()
+        {
+            this.ClientId = null;
+            this.InflightMessages.Clear();
+        }
+    }
+}

+ 27 - 0
M2Mqtt/WinRT/Hashtable.cs

@@ -0,0 +1,27 @@
+/*
+Copyright (c) 2013, 2014 Paolo Patierno
+
+All rights reserved. This program and the accompanying materials
+are made available under the terms of the Eclipse Public License v1.0
+and Eclipse Distribution License v1.0 which accompany this distribution. 
+
+The Eclipse Public License is available at 
+   http://www.eclipse.org/legal/epl-v10.html
+and the Eclipse Distribution License is available at 
+   http://www.eclipse.org/org/documents/edl-v10.php.
+
+Contributors:
+   Paolo Patierno - initial API and implementation and/or initial documentation
+*/
+
+using System.Collections.Generic;
+
+namespace uPLibrary.Networking.M2Mqtt
+{
+    /// <summary>
+    /// Wrapper Hashtable class for generic Dictionary<TKey,TValue> (the only available in WinRT)
+    /// </summary>
+    public class Hashtable : Dictionary<object, object>
+    {
+    }
+}

+ 35 - 0
M2Mqtt/WinRT/MqttNetworkChannel.cs

@@ -23,6 +23,7 @@ using Windows.Networking;
 using Windows.Networking.Sockets;
 using System.Runtime.InteropServices.WindowsRuntime;
 using Windows.Storage.Streams;
+using System.Threading;
 
 namespace uPLibrary.Networking.M2Mqtt
 {
@@ -73,13 +74,47 @@ namespace uPLibrary.Networking.M2Mqtt
             int idx = 0;
             while (idx < buffer.Length)
             {
+                // fixed scenario with socket closed gracefully by peer/broker and
+                // Read return 0. Avoid infinite loop.
+
                 // read is executed synchronously
                 result = this.socket.InputStream.ReadAsync(buffer.AsBuffer(), (uint)buffer.Length, InputStreamOptions.None).AsTask().Result;
+                if (result.Length == 0)
+                    return 0;
                 idx += (int)result.Length;
             }
             return buffer.Length;
         }
 
+        public int Receive(byte[] buffer, int timeout)
+        {
+            CancellationTokenSource cts = new CancellationTokenSource(timeout);
+
+            try
+            {
+                IBuffer result;
+
+                // read all data needed (until fill buffer)
+                int idx = 0;
+                while (idx < buffer.Length)
+                {
+                    // fixed scenario with socket closed gracefully by peer/broker and
+                    // Read return 0. Avoid infinite loop.
+
+                    // read is executed synchronously
+                    result = this.socket.InputStream.ReadAsync(buffer.AsBuffer(), (uint)buffer.Length, InputStreamOptions.None).AsTask(cts.Token).Result;
+                    if (result.Length == 0)
+                        return 0;
+                    idx += (int)result.Length;
+                }
+                return buffer.Length;
+            }
+            catch (TaskCanceledException)
+            {
+                return 0;
+            }
+        }
+
         public int Send(byte[] buffer)
         {
             // send is executed synchronously

+ 5 - 2
M2Mqtt/uM2MqttNetMf42.csproj

@@ -59,12 +59,15 @@
     <Compile Include="Messages\MqttMsgUnsubscribedEventArgs.cs" />
     <Compile Include="Messages\MqttMsgUnsubscribeEventArgs.cs" />
     <Compile Include="MqttClient.cs" />
-    <Compile Include="MqttSettings.cs" />
     <Compile Include="Net\Fx.cs" />
     <Compile Include="Net\MqttNetworkChannel.cs" />
-    <Compile Include="Utility\QueueExtension.cs" />
+    <Compile Include="MqttSettings.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Session\MqttBrokerSession.cs" />
+    <Compile Include="Session\MqttClientSession.cs" />
+    <Compile Include="Session\MqttSession.cs" />
     <Compile Include="Utility\Trace.cs" />
+    <Compile Include="Utility\QueueExtension.cs" />
   </ItemGroup>
   <ItemGroup>
     <Reference Include="Microsoft.SPOT.Native">

+ 5 - 2
M2Mqtt/uM2MqttNetMf43.csproj

@@ -59,12 +59,15 @@
     <Compile Include="Messages\MqttMsgUnsubscribedEventArgs.cs" />
     <Compile Include="Messages\MqttMsgUnsubscribeEventArgs.cs" />
     <Compile Include="MqttClient.cs" />
-    <Compile Include="MqttSettings.cs" />
     <Compile Include="Net\Fx.cs" />
     <Compile Include="Net\MqttNetworkChannel.cs" />
+    <Compile Include="MqttSettings.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
-    <Compile Include="Utility\QueueExtension.cs" />
+    <Compile Include="Session\MqttBrokerSession.cs" />
+    <Compile Include="Session\MqttClientSession.cs" />
+    <Compile Include="Session\MqttSession.cs" />
     <Compile Include="Utility\Trace.cs" />
+    <Compile Include="Utility\QueueExtension.cs" />
   </ItemGroup>
   <ItemGroup>
     <Reference Include="Microsoft.SPOT.Native">

Some files were not shown because too many files changed in this diff