MqttMsgSubscribe.cs 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. */
  13. using System;
  14. // if NOT .Net Micro Framework
  15. #if (!MF_FRAMEWORK_VERSION_V4_2 && !MF_FRAMEWORK_VERSION_V4_3)
  16. using System.Collections.Generic;
  17. #endif
  18. using System.Collections;
  19. using System.Text;
  20. using uPLibrary.Networking.M2Mqtt.Exceptions;
  21. namespace uPLibrary.Networking.M2Mqtt.Messages
  22. {
  /// <summary>
  /// SUBSCRIBE message sent from a client to the broker: a message identifier followed by
  /// one or more (topic, requested QoS level) pairs.
  /// </summary>
  public class MqttMsgSubscribe : MqttMsgBase
  {
      #region Properties...

      /// <summary>
      /// List of topics to subscribe
      /// </summary>
      public string[] Topics
      {
          get { return this.topics; }
          set { this.topics = value; }
      }

      /// <summary>
      /// List of QOS Levels related to topics (entry i is the level requested for topic i)
      /// </summary>
      public byte[] QoSLevels
      {
          get { return this.qosLevels; }
          set { this.qosLevels = value; }
      }

      #endregion

      // topics to subscribe
      string[] topics;
      // QOS levels related to topics
      byte[] qosLevels;

      /// <summary>
      /// Constructor: creates an empty SUBSCRIBE message (only the message type is set)
      /// </summary>
      public MqttMsgSubscribe()
      {
          this.type = MQTT_MSG_SUBSCRIBE_TYPE;
      }

      /// <summary>
      /// Constructor
      /// </summary>
      /// <param name="topics">List of topics to subscribe</param>
      /// <param name="qosLevels">List of QOS Levels related to topics</param>
      public MqttMsgSubscribe(string[] topics, byte[] qosLevels)
      {
          this.type = MQTT_MSG_SUBSCRIBE_TYPE;

          this.topics = topics;
          this.qosLevels = qosLevels;

          // SUBSCRIBE message uses QoS Level 1 (not "officially" in 3.1.1)
          this.qosLevel = QOS_LEVEL_AT_LEAST_ONCE;
      }

      /// <summary>
      /// Parse bytes for a SUBSCRIBE message
      /// </summary>
      /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
      /// <param name="protocolVersion">Protocol Version</param>
      /// <param name="channel">Channel connected to the broker</param>
      /// <returns>SUBSCRIBE message instance</returns>
      /// <exception cref="MqttClientException">
      /// Thrown when the flag bits are invalid (v3.1.1) or when the received bytes are not a
      /// well-formed SUBSCRIBE body: missing message identifier, empty payload, a topic length
      /// that runs past the end of the message, or a topic without its QoS byte.
      /// </exception>
      public static MqttMsgSubscribe Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
      {
          byte[] buffer;
          int index = 0;
          byte[] topicUtf8;
          int topicUtf8Length;
          MqttMsgSubscribe msg = new MqttMsgSubscribe();

          if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
          {
              // [v3.1.1] check flag bits
              if ((fixedHeaderFirstByte & MSG_FLAG_BITS_MASK) != MQTT_MSG_SUBSCRIBE_FLAG_BITS)
                  throw new MqttClientException(MqttClientErrorCode.InvalidFlagBits);
          }

          // get remaining length and allocate buffer
          int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
          buffer = new byte[remainingLength];

          // read bytes from socket...
          channel.Receive(buffer);

          if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1)
          {
              // only 3.1.0

              // read QoS level from fixed header
              msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
              // read DUP flag from fixed header
              msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
              // retain flag not used
              msg.retain = false;
          }

          // message id (two bytes, MSB first); the data comes from the wire, so make sure
          // both bytes are really there before indexing into the buffer
          if (remainingLength < 2)
              throw new MqttClientException(MqttClientErrorCode.WrongMessageId);
          msg.messageId = (ushort)((buffer[index++] << 8) & 0xFF00);
          msg.messageId |= (buffer[index++]);

          // a SUBSCRIBE message without any (topic, QoS) pair is malformed
          if (index >= remainingLength)
              throw new MqttClientException(MqttClientErrorCode.TopicsEmpty);

          // payload contains topics and QoS levels
          // NOTE : the number of topics in the payload isn't known in advance (so use List)

          // if .Net Micro Framework
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3)
          IList tmpTopics = new ArrayList();
          IList tmpQosLevels = new ArrayList();
          // else other frameworks (.Net, .Net Compact, Mono, Windows Phone)
#else
          IList<String> tmpTopics = new List<String>();
          IList<byte> tmpQosLevels = new List<byte>();
#endif
          while (index < remainingLength)
          {
              // topic name length prefix (two bytes, MSB first)
              if ((remainingLength - index) < 2)
                  throw new MqttClientException(MqttClientErrorCode.TopicLength);
              topicUtf8Length = ((buffer[index++] << 8) & 0xFF00);
              topicUtf8Length |= buffer[index++];

              // the declared topic length must not run past the end of the message
              if (topicUtf8Length > (remainingLength - index))
                  throw new MqttClientException(MqttClientErrorCode.TopicLength);

              // topic name
              topicUtf8 = new byte[topicUtf8Length];
              Array.Copy(buffer, index, topicUtf8, 0, topicUtf8Length);
              index += topicUtf8Length;
              tmpTopics.Add(new String(Encoding.UTF8.GetChars(topicUtf8)));

              // every topic must be followed by its requested QoS level
              if (index >= remainingLength)
                  throw new MqttClientException(MqttClientErrorCode.TopicsQosLevelsNotMatch);
              tmpQosLevels.Add(buffer[index++]);
          }

          // copy from list to array
          msg.topics = new string[tmpTopics.Count];
          msg.qosLevels = new byte[tmpQosLevels.Count];
          for (int i = 0; i < tmpTopics.Count; i++)
          {
              msg.topics[i] = (string)tmpTopics[i];
              msg.qosLevels[i] = (byte)tmpQosLevels[i];
          }

          return msg;
      }

      /// <summary>
      /// Serialize this SUBSCRIBE message (fixed header, message identifier and the list of
      /// topic / requested QoS pairs) to bytes
      /// </summary>
      /// <param name="protocolVersion">Protocol Version</param>
      /// <returns>Bytes of the complete message</returns>
      /// <exception cref="MqttClientException">
      /// Thrown when topics or QoS levels are missing or of different lengths, when a topic is
      /// null, too short, too long or does not fit the two-byte length prefix once encoded,
      /// or when no message identifier has been assigned.
      /// </exception>
      public override byte[] GetBytes(byte protocolVersion)
      {
          // largest encoded topic size representable by the two-byte length prefix
          const int MaxEncodedTopicSize = 0xFFFF;

          int fixedHeaderSize = 0;
          int varHeaderSize = 0;
          int payloadSize = 0;
          int remainingLength = 0;
          byte[] buffer;
          int index = 0;

          // topics list empty
          if ((this.topics == null) || (this.topics.Length == 0))
              throw new MqttClientException(MqttClientErrorCode.TopicsEmpty);

          // qos levels list empty
          if ((this.qosLevels == null) || (this.qosLevels.Length == 0))
              throw new MqttClientException(MqttClientErrorCode.QosLevelsEmpty);

          // topics and qos levels lists length don't match
          if (this.topics.Length != this.qosLevels.Length)
              throw new MqttClientException(MqttClientErrorCode.TopicsQosLevelsNotMatch);

          // message identifier
          varHeaderSize += MESSAGE_ID_SIZE;

          int topicIdx;
          byte[][] topicsUtf8 = new byte[this.topics.Length][];

          for (topicIdx = 0; topicIdx < this.topics.Length; topicIdx++)
          {
              string topic = this.topics[topicIdx];

              // check topic length (a null entry is reported the same way as an empty topic)
              if ((topic == null) || (topic.Length < MIN_TOPIC_LENGTH) || (topic.Length > MAX_TOPIC_LENGTH))
                  throw new MqttClientException(MqttClientErrorCode.TopicLength);

              topicsUtf8[topicIdx] = Encoding.UTF8.GetBytes(topic);

              // multi-byte characters can make the encoded form longer than the string;
              // more than 0xFFFF bytes would be silently truncated by the length prefix
              if (topicsUtf8[topicIdx].Length > MaxEncodedTopicSize)
                  throw new MqttClientException(MqttClientErrorCode.TopicLength);

              payloadSize += 2; // topic size (MSB, LSB)
              payloadSize += topicsUtf8[topicIdx].Length;
              payloadSize++; // byte for QoS
          }

          // check message identifier assigned (SUBSCRIBE uses QoS Level 1, so message id is mandatory);
          // done before allocating the buffer so that no work is wasted on an unsendable message
          if (this.messageId == 0)
              throw new MqttClientException(MqttClientErrorCode.WrongMessageId);

          remainingLength += (varHeaderSize + payloadSize);

          // first byte of fixed header
          fixedHeaderSize = 1;

          int temp = remainingLength;
          // increase fixed header size based on remaining length
          // (each remaining length byte can encode until 128)
          do
          {
              fixedHeaderSize++;
              temp = temp / 128;
          } while (temp > 0);

          // allocate buffer for message
          buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];

          // first fixed header byte
          if (protocolVersion == MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
              buffer[index++] = (MQTT_MSG_SUBSCRIBE_TYPE << MSG_TYPE_OFFSET) | MQTT_MSG_SUBSCRIBE_FLAG_BITS; // [v.3.1.1]
          else
          {
              buffer[index] = (byte)((MQTT_MSG_SUBSCRIBE_TYPE << MSG_TYPE_OFFSET) |
                                     (this.qosLevel << QOS_LEVEL_OFFSET));
              buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
              index++;
          }

          // encode remaining length
          index = this.encodeRemainingLength(remainingLength, buffer, index);

          // message identifier
          buffer[index++] = (byte)((messageId >> 8) & 0x00FF); // MSB
          buffer[index++] = (byte)(messageId & 0x00FF); // LSB

          for (topicIdx = 0; topicIdx < this.topics.Length; topicIdx++)
          {
              // topic name
              buffer[index++] = (byte)((topicsUtf8[topicIdx].Length >> 8) & 0x00FF); // MSB
              buffer[index++] = (byte)(topicsUtf8[topicIdx].Length & 0x00FF); // LSB
              Array.Copy(topicsUtf8[topicIdx], 0, buffer, index, topicsUtf8[topicIdx].Length);
              index += topicsUtf8[topicIdx].Length;

              // requested QoS
              buffer[index++] = this.qosLevels[topicIdx];
          }

          return buffer;
      }

      /// <summary>
      /// String representation of the message: a trace line with message id, topics and
      /// QoS levels when TRACE is defined, otherwise the base implementation
      /// </summary>
      /// <returns>String describing the message</returns>
      public override string ToString()
      {
#if TRACE
          return this.GetTraceString(
              "SUBSCRIBE",
              new object[] { "messageId", "topics", "qosLevels" },
              new object[] { this.messageId, this.topics, this.qosLevels });
#else
          return base.ToString();
#endif
      }
  }
  227. }