MqttMsgPublish.cs 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. */
  13. using System;
  14. using System.Text;
  15. using uPLibrary.Networking.M2Mqtt.Exceptions;
  16. namespace uPLibrary.Networking.M2Mqtt.Messages
  17. {
  /// <summary>
  /// Class for PUBLISH message from client to broker
  /// </summary>
  public class MqttMsgPublish : MqttMsgBase
  {
      #region Properties...

      /// <summary>
      /// Message topic
      /// </summary>
      public string Topic
      {
          get { return this.topic; }
          set { this.topic = value; }
      }

      /// <summary>
      /// Message data
      /// </summary>
      public byte[] Message
      {
          get { return this.message; }
          set { this.message = value; }
      }

      /// <summary>
      /// Message identifier
      /// </summary>
      public ushort MessageId
      {
          get { return this.messageId; }
          set { this.messageId = value; }
      }

      #endregion

      // message topic
      private string topic;
      // message data
      private byte[] message;
      // message identifier
      private ushort messageId;

      /// <summary>
      /// Constructor
      /// </summary>
      public MqttMsgPublish()
      {
          this.type = MQTT_MSG_PUBLISH_TYPE;
      }

      /// <summary>
      /// Constructor (DUP and retain flags cleared, QoS level "at most once")
      /// </summary>
      /// <param name="topic">Message topic</param>
      /// <param name="message">Message data</param>
      public MqttMsgPublish(string topic, byte[] message) :
          this(topic, message, false, QOS_LEVEL_AT_MOST_ONCE, false)
      {
      }

      /// <summary>
      /// Constructor
      /// </summary>
      /// <param name="topic">Message topic</param>
      /// <param name="message">Message data</param>
      /// <param name="dupFlag">Duplicate flag</param>
      /// <param name="qosLevel">Quality of Service level</param>
      /// <param name="retain">Retain flag</param>
      public MqttMsgPublish(string topic,
          byte[] message,
          bool dupFlag,
          byte qosLevel,
          bool retain) : base()
      {
          this.type = MQTT_MSG_PUBLISH_TYPE;
          this.topic = topic;
          this.message = message;
          this.dupFlag = dupFlag;
          this.qosLevel = qosLevel;
          this.retain = retain;
          this.messageId = 0;
      }

      /// <summary>
      /// Serialize the message: first fixed header byte (type, QoS, DUP, retain),
      /// encoded remaining length, length-prefixed UTF-8 topic, message identifier
      /// (QoS level 1 or 2 only) and finally the message data
      /// </summary>
      /// <returns>Bytes of the PUBLISH message</returns>
      /// <exception cref="MqttClientException">
      /// Topic is missing, contains a wildcard ('#' or '+') or has an invalid length,
      /// or QoS level is 1 or 2 and no message identifier has been assigned
      /// </exception>
      public override byte[] GetBytes()
      {
          // the parameterless constructor and the Topic setter both allow a null topic
          if (this.topic == null)
          {
              throw new MqttClientException(MqttClientErrorCode.TopicLength);
          }

          // topic can't contain wildcards
          if ((this.topic.IndexOf('#') != -1) || (this.topic.IndexOf('+') != -1))
          {
              throw new MqttClientException(MqttClientErrorCode.TopicWildcard);
          }

          // check topic length
          if ((this.topic.Length < MIN_TOPIC_LENGTH) || (this.topic.Length > MAX_TOPIC_LENGTH))
          {
              throw new MqttClientException(MqttClientErrorCode.TopicLength);
          }

          byte[] topicUtf8 = Encoding.UTF8.GetBytes(this.topic);

          // the topic is written with a two byte length prefix holding the number of
          // encoded bytes (not characters): a topic with multi-byte characters can pass
          // the check above and still not fit, which would silently truncate the prefix
          if (topicUtf8.Length > ushort.MaxValue)
          {
              throw new MqttClientException(MqttClientErrorCode.TopicLength);
          }

          // message id is valid only with QOS level 1 or QOS level 2
          bool hasMessageId = (this.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) ||
                              (this.qosLevel == QOS_LEVEL_EXACTLY_ONCE);

          // check message identifier assigned (before allocating anything)
          if (hasMessageId && (this.messageId == 0))
          {
              throw new MqttClientException(MqttClientErrorCode.WrongMessageId);
          }

          // topic name (with its two byte length prefix)
          int varHeaderSize = topicUtf8.Length + 2;
          if (hasMessageId)
          {
              varHeaderSize += MESSAGE_ID_SIZE;
          }

          // a null message is sent as a zero length payload
          int payloadSize = (this.message != null) ? this.message.Length : 0;

          int remainingLength = varHeaderSize + payloadSize;

          // first byte of fixed header
          int fixedHeaderSize = 1;

          // increase fixed header size based on remaining length
          // (each remaining length byte can encode until 128)
          int temp = remainingLength;
          do
          {
              fixedHeaderSize++;
              temp = temp / 128;
          } while (temp > 0);

          // allocate buffer for message
          byte[] buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
          int index = 0;

          // first fixed header byte
          buffer[index] = (byte)((MQTT_MSG_PUBLISH_TYPE << MSG_TYPE_OFFSET) |
                                 (this.qosLevel << QOS_LEVEL_OFFSET));
          buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
          buffer[index] |= this.retain ? (byte)(1 << RETAIN_FLAG_OFFSET) : (byte)0x00;
          index++;

          // encode remaining length
          index = this.encodeRemainingLength(remainingLength, buffer, index);

          // topic name
          buffer[index++] = (byte)((topicUtf8.Length >> 8) & 0x00FF); // MSB
          buffer[index++] = (byte)(topicUtf8.Length & 0x00FF); // LSB
          Array.Copy(topicUtf8, 0, buffer, index, topicUtf8.Length);
          index += topicUtf8.Length;

          if (hasMessageId)
          {
              buffer[index++] = (byte)((this.messageId >> 8) & 0x00FF); // MSB
              buffer[index++] = (byte)(this.messageId & 0x00FF); // LSB
          }

          // message data (nothing to copy for a zero length message)
          if (this.message != null)
          {
              Array.Copy(this.message, 0, buffer, index, this.message.Length);
          }

          return buffer;
      }

      /// <summary>
      /// Parse bytes for a PUBLISH message
      /// </summary>
      /// <param name="fixedHeaderFirstByte">First fixed header byte</param>
      /// <param name="channel">Channel connected to the broker</param>
      /// <returns>PUBLISH message instance</returns>
      /// <exception cref="InvalidOperationException">
      /// The channel stops delivering data before the whole message has been read,
      /// or the declared topic / message identifier doesn't fit in the remaining length
      /// </exception>
      public static MqttMsgPublish Parse(byte fixedHeaderFirstByte, IMqttNetworkChannel channel)
      {
          MqttMsgPublish msg = new MqttMsgPublish();

          // get remaining length and allocate buffer
          int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
          byte[] buffer = new byte[remainingLength];

          // read the whole variable header and payload before parsing anything :
          // a single receive can return less than requested, so header fields
          // can't be trusted until all the bytes are available
          ReceiveAll(channel, buffer);

          int index = 0;

          // topic name (two byte length prefix followed by UTF-8 bytes)
          if (remainingLength < 2)
          {
              throw new InvalidOperationException("Malformed PUBLISH message: missing topic length");
          }
          int topicUtf8Length = ((buffer[index++] << 8) & 0xFF00);
          topicUtf8Length |= buffer[index++];
          if (topicUtf8Length > (remainingLength - index))
          {
              throw new InvalidOperationException("Malformed PUBLISH message: topic exceeds remaining length");
          }
          byte[] topicUtf8 = new byte[topicUtf8Length];
          Array.Copy(buffer, index, topicUtf8, 0, topicUtf8Length);
          index += topicUtf8Length;
          msg.topic = new String(Encoding.UTF8.GetChars(topicUtf8));

          // read QoS level from fixed header
          msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
          // read DUP flag from fixed header
          msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
          // read retain flag from fixed header
          msg.retain = (((fixedHeaderFirstByte & RETAIN_FLAG_MASK) >> RETAIN_FLAG_OFFSET) == 0x01);

          // message id is valid only with QOS level 1 or QOS level 2
          if ((msg.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) ||
              (msg.qosLevel == QOS_LEVEL_EXACTLY_ONCE))
          {
              // two bytes are read below for the message id
              if ((remainingLength - index) < 2)
              {
                  throw new InvalidOperationException("Malformed PUBLISH message: missing message identifier");
              }
              msg.messageId = (ushort)((buffer[index++] << 8) & 0xFF00);
              msg.messageId |= (buffer[index++]);
          }

          // everything left after the variable header is the message data
          int messageSize = remainingLength - index;
          msg.message = new byte[messageSize];
          Array.Copy(buffer, index, msg.message, 0, messageSize);

          return msg;
      }

      /// <summary>
      /// Fill the whole buffer with data received from the channel
      /// </summary>
      /// <param name="channel">Channel to read from</param>
      /// <param name="buffer">Buffer to fill completely</param>
      private static void ReceiveAll(IMqttNetworkChannel channel, byte[] buffer)
      {
          int filled = 0;
          while (filled < buffer.Length)
          {
              int read;
              if (filled == 0)
              {
                  // nothing received yet : read straight into the destination
                  read = channel.Receive(buffer);
              }
              else
              {
                  // only a whole array can be passed to the channel, so ask for exactly
                  // the missing bytes : a bigger buffer could swallow the beginning
                  // of the next message
                  byte[] chunk = new byte[buffer.Length - filled];
                  read = channel.Receive(chunk);
                  if (read > 0)
                  {
                      Array.Copy(chunk, 0, buffer, filled, read);
                  }
              }

              // no progress means no more data will come : fail instead of looping forever
              if (read <= 0)
              {
                  throw new InvalidOperationException("Channel closed before the whole PUBLISH message was received");
              }

              filled += read;
          }
      }

      /// <summary>
      /// String representation of the message : trace string with message id, topic
      /// and message data when TRACE is defined, base class representation otherwise
      /// </summary>
      /// <returns>String describing the message</returns>
      public override string ToString()
      {
  #if TRACE
          return this.GetTraceString(
              "PUBLISH",
              new object[] { "messageId", "topic", "message" },
              new object[] { this.messageId, this.topic, this.message });
  #else
          return base.ToString();
  #endif
      }
  }
  237. }