MqttMsgSubscribe.cs 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. */
  13. using System;
  14. // if NOT .Net Micro Framework
  15. #if (!MF_FRAMEWORK_VERSION_V4_2 && !MF_FRAMEWORK_VERSION_V4_3)
  16. using System.Collections.Generic;
  17. #endif
  18. using System.Collections;
  19. using System.Text;
  20. using uPLibrary.Networking.M2Mqtt.Exceptions;
  21. namespace uPLibrary.Networking.M2Mqtt.Messages
  22. {
  23. /// <summary>
  24. /// Class for SUBSCRIBE message from client to broker
  25. /// </summary>
  26. public class MqttMsgSubscribe : MqttMsgBase
  27. {
  28. #region Properties...
  29. /// <summary>
  30. /// List of topics to subscribe
  31. /// </summary>
  32. public string[] Topics
  33. {
  34. get { return this.topics; }
  35. set { this.topics = value; }
  36. }
  37. /// <summary>
  38. /// List of QOS Levels related to topics
  39. /// </summary>
  40. public byte[] QoSLevels
  41. {
  42. get { return this.qosLevels; }
  43. set { this.qosLevels = value; }
  44. }
  45. /// <summary>
  46. /// Message identifier
  47. /// </summary>
  48. public ushort MessageId
  49. {
  50. get { return this.messageId; }
  51. set { this.messageId = value; }
  52. }
  53. #endregion
  54. // topics to subscribe
  55. string[] topics;
  56. // QOS levels related to topics
  57. byte[] qosLevels;
  58. // message identifier
  59. ushort messageId;
  60. /// <summary>
  61. /// Constructor
  62. /// </summary>
  63. public MqttMsgSubscribe()
  64. {
  65. this.type = MQTT_MSG_SUBSCRIBE_TYPE;
  66. }
  67. /// <summary>
  68. /// Constructor
  69. /// </summary>
  70. /// <param name="topics">List of topics to subscribe</param>
  71. /// <param name="qosLevels">List of QOS Levels related to topics</param>
  72. public MqttMsgSubscribe(string[] topics, byte[] qosLevels)
  73. {
  74. this.type = MQTT_MSG_SUBSCRIBE_TYPE;
  75. this.topics = topics;
  76. this.qosLevels = qosLevels;
  77. // SUBSCRIBE message uses QoS Level 1
  78. this.qosLevel = QOS_LEVEL_AT_LEAST_ONCE;
  79. }
  80. /// <summary>
  81. /// Parse bytes for a SUBSCRIBE message
  82. /// </summary>
  83. /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
  84. /// <param name="channel">Channel connected to the broker</param>
  85. /// <returns>SUBSCRIBE message instance</returns>
  86. public static MqttMsgSubscribe Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
  87. {
  88. byte[] buffer;
  89. int index = 0;
  90. byte[] topicUtf8;
  91. int topicUtf8Length;
  92. MqttMsgSubscribe msg = new MqttMsgSubscribe();
  93. // get remaining length and allocate buffer
  94. int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
  95. buffer = new byte[remainingLength];
  96. // read bytes from socket...
  97. int received = channel.Receive(buffer);
  98. // read QoS level from fixed header
  99. msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
  100. // read DUP flag from fixed header
  101. msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
  102. // retain flag not used
  103. msg.retain = false;
  104. // message id
  105. msg.messageId = (ushort)((buffer[index++] << 8) & 0xFF00);
  106. msg.messageId |= (buffer[index++]);
  107. // payload contains topics and QoS levels
  108. // NOTE : before, I don't know how many topics will be in the payload (so use List)
  109. // if .Net Micro Framework
  110. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3)
  111. IList tmpTopics = new ArrayList();
  112. IList tmpQosLevels = new ArrayList();
  113. // else other frameworks (.Net, .Net Compact, Mono, Windows Phone)
  114. #else
  115. IList<String> tmpTopics = new List<String>();
  116. IList<byte> tmpQosLevels = new List<byte>();
  117. #endif
  118. do
  119. {
  120. // topic name
  121. topicUtf8Length = ((buffer[index++] << 8) & 0xFF00);
  122. topicUtf8Length |= buffer[index++];
  123. topicUtf8 = new byte[topicUtf8Length];
  124. Array.Copy(buffer, index, topicUtf8, 0, topicUtf8Length);
  125. index += topicUtf8Length;
  126. tmpTopics.Add(new String(Encoding.UTF8.GetChars(topicUtf8)));
  127. // QoS level
  128. tmpQosLevels.Add(buffer[index++]);
  129. } while (index < remainingLength);
  130. // copy from list to array
  131. msg.topics = new string[tmpTopics.Count];
  132. msg.qosLevels = new byte[tmpQosLevels.Count];
  133. for (int i = 0; i < tmpTopics.Count; i++)
  134. {
  135. msg.topics[i] = (string)tmpTopics[i];
  136. msg.qosLevels[i] = (byte)tmpQosLevels[i];
  137. }
  138. return msg;
  139. }
  140. public override byte[] GetBytes()
  141. {
  142. int fixedHeaderSize = 0;
  143. int varHeaderSize = 0;
  144. int payloadSize = 0;
  145. int remainingLength = 0;
  146. byte[] buffer;
  147. int index = 0;
  148. // topics list empty
  149. if ((this.topics == null) || (this.topics.Length == 0))
  150. throw new MqttClientException(MqttClientErrorCode.TopicsEmpty);
  151. // qos levels list empty
  152. if ((this.qosLevels == null) || (this.qosLevels.Length == 0))
  153. throw new MqttClientException(MqttClientErrorCode.QosLevelsEmpty);
  154. // topics and qos levels lists length don't match
  155. if (this.topics.Length != this.qosLevels.Length)
  156. throw new MqttClientException(MqttClientErrorCode.TopicsQosLevelsNotMatch);
  157. // message identifier
  158. varHeaderSize += MESSAGE_ID_SIZE;
  159. int topicIdx = 0;
  160. byte[][] topicsUtf8 = new byte[this.topics.Length][];
  161. for (topicIdx = 0; topicIdx < this.topics.Length; topicIdx++)
  162. {
  163. // check topic length
  164. if ((this.topics[topicIdx].Length < MIN_TOPIC_LENGTH) || (this.topics[topicIdx].Length > MAX_TOPIC_LENGTH))
  165. throw new MqttClientException(MqttClientErrorCode.TopicLength);
  166. topicsUtf8[topicIdx] = Encoding.UTF8.GetBytes(this.topics[topicIdx]);
  167. payloadSize += 2; // topic size (MSB, LSB)
  168. payloadSize += topicsUtf8[topicIdx].Length;
  169. payloadSize++; // byte for QoS
  170. }
  171. remainingLength += (varHeaderSize + payloadSize);
  172. // first byte of fixed header
  173. fixedHeaderSize = 1;
  174. int temp = remainingLength;
  175. // increase fixed header size based on remaining length
  176. // (each remaining length byte can encode until 128)
  177. do
  178. {
  179. fixedHeaderSize++;
  180. temp = temp / 128;
  181. } while (temp > 0);
  182. // allocate buffer for message
  183. buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
  184. // first fixed header byte
  185. buffer[index] = (byte)((MQTT_MSG_SUBSCRIBE_TYPE << MSG_TYPE_OFFSET) |
  186. (this.qosLevel << QOS_LEVEL_OFFSET));
  187. buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
  188. index++;
  189. // encode remaining length
  190. index = this.encodeRemainingLength(remainingLength, buffer, index);
  191. // check message identifier assigned (SUBSCRIBE uses QoS Level 1, so message id is mandatory)
  192. if (this.messageId == 0)
  193. throw new MqttClientException(MqttClientErrorCode.WrongMessageId);
  194. buffer[index++] = (byte)((messageId >> 8) & 0x00FF); // MSB
  195. buffer[index++] = (byte)(messageId & 0x00FF); // LSB
  196. topicIdx = 0;
  197. for (topicIdx = 0; topicIdx < this.topics.Length; topicIdx++)
  198. {
  199. // topic name
  200. buffer[index++] = (byte)((topicsUtf8[topicIdx].Length >> 8) & 0x00FF); // MSB
  201. buffer[index++] = (byte)(topicsUtf8[topicIdx].Length & 0x00FF); // LSB
  202. Array.Copy(topicsUtf8[topicIdx], 0, buffer, index, topicsUtf8[topicIdx].Length);
  203. index += topicsUtf8[topicIdx].Length;
  204. // requested QoS
  205. buffer[index++] = this.qosLevels[topicIdx];
  206. }
  207. return buffer;
  208. }
  209. public override string ToString()
  210. {
  211. #if TRACE
  212. return this.GetTraceString(
  213. "SUBSCRIBE",
  214. new object[] { "messageId", "topics", "qosLevels" },
  215. new object[] { this.messageId, this.topics, this.qosLevels });
  216. #else
  217. return base.ToString();
  218. #endif
  219. }
  220. }
  221. }