MqttMsgPublish.cs 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. */
  13. using System;
  14. using System.Text;
  15. using uPLibrary.Networking.M2Mqtt.Exceptions;
  16. namespace uPLibrary.Networking.M2Mqtt.Messages
  17. {
  /// <summary>
  /// Class for PUBLISH message from client to broker
  /// </summary>
  public class MqttMsgPublish : MqttMsgBase
  {
      #region Properties...

      /// <summary>
      /// Message topic
      /// </summary>
      public string Topic
      {
          get { return this.topic; }
          set { this.topic = value; }
      }

      /// <summary>
      /// Message data (may be null, which is serialized as an empty payload)
      /// </summary>
      public byte[] Message
      {
          get { return this.message; }
          set { this.message = value; }
      }

      #endregion

      // message topic
      private string topic;
      // message data
      private byte[] message;

      /// <summary>
      /// Constructor: creates an empty PUBLISH message (only the message type is set)
      /// </summary>
      public MqttMsgPublish()
      {
          this.type = MQTT_MSG_PUBLISH_TYPE;
      }

      /// <summary>
      /// Constructor: PUBLISH message with DUP and retain flags cleared and
      /// QoS level <c>QOS_LEVEL_AT_MOST_ONCE</c>
      /// </summary>
      /// <param name="topic">Message topic</param>
      /// <param name="message">Message data</param>
      public MqttMsgPublish(string topic, byte[] message) :
          this(topic, message, false, QOS_LEVEL_AT_MOST_ONCE, false)
      {
      }

      /// <summary>
      /// Constructor. The message identifier is initialized to 0 and has to be
      /// assigned before a QoS level 1 or 2 message can be serialized.
      /// </summary>
      /// <param name="topic">Message topic</param>
      /// <param name="message">Message data</param>
      /// <param name="dupFlag">Duplicate flag</param>
      /// <param name="qosLevel">Quality of Service level</param>
      /// <param name="retain">Retain flag</param>
      public MqttMsgPublish(string topic,
          byte[] message,
          bool dupFlag,
          byte qosLevel,
          bool retain) : base()
      {
          this.type = MQTT_MSG_PUBLISH_TYPE;

          this.topic = topic;
          this.message = message;
          this.dupFlag = dupFlag;
          this.qosLevel = qosLevel;
          this.retain = retain;
          this.messageId = 0;
      }

      /// <summary>
      /// Serializes this PUBLISH message: fixed header, topic name,
      /// message identifier (QoS level 1 and 2 only) and payload.
      /// </summary>
      /// <param name="protocolVersion">Protocol version (not consulted by this implementation)</param>
      /// <returns>Bytes of the complete message</returns>
      /// <exception cref="MqttClientException">
      /// Topic is missing, contains a wildcard or has an invalid length;
      /// QoS level is not allowed; or the message identifier is still 0
      /// for a QoS level 1 or 2 message.
      /// </exception>
      public override byte[] GetBytes(byte protocolVersion)
      {
          int fixedHeaderSize = 0;
          int varHeaderSize = 0;
          int payloadSize = 0;
          int remainingLength = 0;
          byte[] buffer;
          int index = 0;

          // a missing topic cannot be serialized: report it as an invalid topic
          // length instead of letting a NullReferenceException escape
          if (this.topic == null)
          {
              throw new MqttClientException(MqttClientErrorCode.TopicLength);
          }

          // topic can't contain wildcards
          if ((this.topic.IndexOf('#') != -1) || (this.topic.IndexOf('+') != -1))
          {
              throw new MqttClientException(MqttClientErrorCode.TopicWildcard);
          }

          // check topic length
          if ((this.topic.Length < MIN_TOPIC_LENGTH) || (this.topic.Length > MAX_TOPIC_LENGTH))
          {
              throw new MqttClientException(MqttClientErrorCode.TopicLength);
          }

          // check wrong QoS level (both bits can't be set 1)
          if (this.qosLevel > QOS_LEVEL_EXACTLY_ONCE)
          {
              throw new MqttClientException(MqttClientErrorCode.QosNotAllowed);
          }

          bool hasMessageId = (this.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) ||
                              (this.qosLevel == QOS_LEVEL_EXACTLY_ONCE);

          byte[] topicUtf8 = Encoding.UTF8.GetBytes(this.topic);

          // the topic is prefixed with its UTF-8 byte count stored in two bytes:
          // a topic with multi-byte characters can pass the character count check
          // above and still not fit, which would silently truncate the prefix
          if (topicUtf8.Length > 0xFFFF)
          {
              throw new MqttClientException(MqttClientErrorCode.TopicLength);
          }

          // check message identifier assigned (before allocating anything)
          if (hasMessageId && (this.messageId == 0))
          {
              throw new MqttClientException(MqttClientErrorCode.WrongMessageId);
          }

          // topic name (2 bytes of length prefix + UTF-8 bytes)
          varHeaderSize += topicUtf8.Length + 2;

          // message id is valid only with QOS level 1 or QOS level 2
          if (hasMessageId)
          {
              varHeaderSize += MESSAGE_ID_SIZE;
          }

          // a null message is sent as a zero length payload
          if (this.message != null)
          {
              payloadSize += this.message.Length;
          }

          remainingLength += (varHeaderSize + payloadSize);

          // first byte of fixed header
          fixedHeaderSize = 1;

          // increase fixed header size based on remaining length
          // (each remaining length byte carries 7 bits of the value)
          int temp = remainingLength;
          do
          {
              fixedHeaderSize++;
              temp = temp / 128;
          } while (temp > 0);

          // allocate buffer for message
          buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];

          // first fixed header byte
          buffer[index] = (byte)((MQTT_MSG_PUBLISH_TYPE << MSG_TYPE_OFFSET) |
                                 (this.qosLevel << QOS_LEVEL_OFFSET));
          buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
          buffer[index] |= this.retain ? (byte)(1 << RETAIN_FLAG_OFFSET) : (byte)0x00;
          index++;

          // encode remaining length
          index = this.encodeRemainingLength(remainingLength, buffer, index);

          // topic name
          buffer[index++] = (byte)((topicUtf8.Length >> 8) & 0x00FF); // MSB
          buffer[index++] = (byte)(topicUtf8.Length & 0x00FF); // LSB
          Array.Copy(topicUtf8, 0, buffer, index, topicUtf8.Length);
          index += topicUtf8.Length;

          // message id is valid only with QOS level 1 or QOS level 2
          if (hasMessageId)
          {
              buffer[index++] = (byte)((this.messageId >> 8) & 0x00FF); // MSB
              buffer[index++] = (byte)(this.messageId & 0x00FF); // LSB
          }

          // message data
          if (this.message != null)
          {
              Array.Copy(this.message, 0, buffer, index, this.message.Length);
              index += this.message.Length;
          }

          return buffer;
      }

      /// <summary>
      /// Parse bytes for a PUBLISH message
      /// </summary>
      /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
      /// <param name="protocolVersion">Protocol Version (not consulted by this implementation)</param>
      /// <param name="channel">Channel connected to the broker</param>
      /// <returns>PUBLISH message instance</returns>
      /// <exception cref="MqttClientException">QoS level in the fixed header is not allowed</exception>
      /// <exception cref="System.IO.IOException">
      /// The channel stopped delivering data before the whole message was read,
      /// or the lengths declared inside the message are inconsistent.
      /// </exception>
      public static MqttMsgPublish Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
      {
          MqttMsgPublish msg = new MqttMsgPublish();
          int index = 0;

          // get remaining length and read exactly that many bytes: a single
          // Receive call may deliver only part of the message
          int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
          byte[] buffer = new byte[remainingLength];
          ReceiveExactly(channel, buffer);

          // topic name (2 bytes of length prefix + UTF-8 bytes); the lengths come
          // from the network, so they are checked before being used as offsets
          if (remainingLength < 2)
          {
              throw new System.IO.IOException("Malformed PUBLISH message: missing topic length");
          }
          int topicUtf8Length = ((buffer[index++] << 8) & 0xFF00);
          topicUtf8Length |= buffer[index++];
          if (topicUtf8Length > (remainingLength - index))
          {
              throw new System.IO.IOException("Malformed PUBLISH message: topic exceeds remaining length");
          }
          byte[] topicUtf8 = new byte[topicUtf8Length];
          Array.Copy(buffer, index, topicUtf8, 0, topicUtf8Length);
          index += topicUtf8Length;
          msg.topic = new String(Encoding.UTF8.GetChars(topicUtf8));

          // read QoS level from fixed header
          msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
          // check wrong QoS level (both bits can't be set 1)
          if (msg.qosLevel > QOS_LEVEL_EXACTLY_ONCE)
          {
              throw new MqttClientException(MqttClientErrorCode.QosNotAllowed);
          }

          // read DUP flag from fixed header
          msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
          // read retain flag from fixed header
          msg.retain = (((fixedHeaderFirstByte & RETAIN_FLAG_MASK) >> RETAIN_FLAG_OFFSET) == 0x01);

          // message id is valid only with QOS level 1 or QOS level 2
          if ((msg.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) ||
              (msg.qosLevel == QOS_LEVEL_EXACTLY_ONCE))
          {
              if ((remainingLength - index) < MESSAGE_ID_SIZE)
              {
                  throw new System.IO.IOException("Malformed PUBLISH message: missing message identifier");
              }
              // message id
              msg.messageId = (ushort)((buffer[index++] << 8) & 0xFF00);
              msg.messageId |= (buffer[index++]);
          }

          // everything after the variable header is the payload
          int messageSize = remainingLength - index;
          msg.message = new byte[messageSize];
          Array.Copy(buffer, index, msg.message, 0, messageSize);

          return msg;
      }

      /// <summary>
      /// Fills the whole destination buffer from the channel, calling Receive
      /// as many times as needed.
      /// </summary>
      /// <param name="channel">Channel to read from</param>
      /// <param name="destination">Buffer to fill completely</param>
      /// <exception cref="System.IO.IOException">Receive reported no data before the buffer was full</exception>
      private static void ReceiveExactly(IMqttNetworkChannel channel, byte[] destination)
      {
          int offset = 0;
          while (offset < destination.Length)
          {
              // the first attempt reads straight into the destination; after a
              // partial read a scratch buffer sized to the missing part is used,
              // so that bytes belonging to the next message are never consumed
              byte[] chunk = (offset == 0) ? destination : new byte[destination.Length - offset];

              int read = channel.Receive(chunk);
              if (read <= 0)
              {
                  // without this check a channel that keeps returning 0 would
                  // make the loop spin forever
                  throw new System.IO.IOException("Channel closed while receiving PUBLISH message");
              }

              if (offset > 0)
              {
                  Array.Copy(chunk, 0, destination, offset, read);
              }
              offset += read;
          }
      }

      /// <summary>
      /// Trace representation (message identifier, topic and message) when the
      /// TRACE symbol is defined, otherwise the base class representation.
      /// </summary>
      public override string ToString()
      {
  #if TRACE
          return this.GetTraceString(
              "PUBLISH",
              new object[] { "messageId", "topic", "message" },
              new object[] { this.messageId, this.topic, this.message });
  #else
          return base.ToString();
  #endif
      }
  }
  234. }