MqttClient.cs 92 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140
  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. */
  13. using System;
  14. using System.Net;
  15. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  16. using System.Net.Sockets;
  17. using System.Security.Cryptography.X509Certificates;
  18. #endif
  19. using System.Threading;
  20. using uPLibrary.Networking.M2Mqtt.Exceptions;
  21. using uPLibrary.Networking.M2Mqtt.Messages;
  22. using uPLibrary.Networking.M2Mqtt.Utility;
  23. // if .Net Micro Framework
  24. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3)
  25. using Microsoft.SPOT;
  26. #if SSL
  27. using Microsoft.SPOT.Net.Security;
  28. #endif
  29. // else other frameworks (.Net, .Net Compact, Mono, Windows Phone)
  30. #else
  31. using System.Collections.Generic;
  32. #if (SSL && !(WINDOWS_APP || WINDOWS_PHONE_APP))
  33. using System.Security.Authentication;
  34. using System.Net.Security;
  35. #endif
  36. #endif
  37. #if (WINDOWS_APP || WINDOWS_PHONE_APP)
  38. using Windows.Networking.Sockets;
  39. #endif
  40. using System.Collections;
  41. // alias needed due to Microsoft.SPOT.Trace in .Net Micro Framework
  42. // (it's ambiguous with uPLibrary.Networking.M2Mqtt.Utility.Trace)
  43. using MqttUtility = uPLibrary.Networking.M2Mqtt.Utility;
  44. namespace uPLibrary.Networking.M2Mqtt
  45. {
  46. /// <summary>
  47. /// MQTT Client
  48. /// </summary>
  49. public class MqttClient
  50. {
  51. #if BROKER
  52. #region Constants ...
  53. // thread names
  54. private const string RECEIVE_THREAD_NAME = "ReceiveThread";
  55. private const string RECEIVE_EVENT_THREAD_NAME = "ReceiveEventThread";
  56. private const string PROCESS_INFLIGHT_THREAD_NAME = "ProcessInflightThread";
  57. private const string KEEP_ALIVE_THREAD = "KeepAliveThread";
  58. #endregion
  59. #endif
  60. /// <summary>
  61. /// Delegate that defines event handler for PUBLISH message received
  62. /// </summary>
  63. public delegate void MqttMsgPublishEventHandler(object sender, MqttMsgPublishEventArgs e);
  64. /// <summary>
  65. /// Delegate that defines event handler for published message
  66. /// </summary>
  67. public delegate void MqttMsgPublishedEventHandler(object sender, MqttMsgPublishedEventArgs e);
  68. /// <summary>
  69. /// Delegate that defines event handler for subscribed topic
  70. /// </summary>
  71. public delegate void MqttMsgSubscribedEventHandler(object sender, MqttMsgSubscribedEventArgs e);
  72. /// <summary>
  73. /// Delegate that defines event handler for unsubscribed topic
  74. /// </summary>
  75. public delegate void MqttMsgUnsubscribedEventHandler(object sender, MqttMsgUnsubscribedEventArgs e);
  76. #if BROKER
  77. /// <summary>
  78. /// Delegate that defines event handler for SUBSCRIBE message received
  79. /// </summary>
  80. public delegate void MqttMsgSubscribeEventHandler(object sender, MqttMsgSubscribeEventArgs e);
  81. /// <summary>
  82. /// Delegate that defines event handler for UNSUBSCRIBE message received
  83. /// </summary>
  84. public delegate void MqttMsgUnsubscribeEventHandler(object sender, MqttMsgUnsubscribeEventArgs e);
  85. /// <summary>
  86. /// Delegate that defines event handler for CONNECT message received
  87. /// </summary>
  88. public delegate void MqttMsgConnectEventHandler(object sender, MqttMsgConnectEventArgs e);
  89. #endif
  90. /// <summary>
  91. /// Delegate that defines event handler for client disconnection (DISCONNECT message or not)
  92. /// </summary>
  93. public delegate void MqttMsgDisconnectEventHandler(object sender, EventArgs e);
  94. // broker hostname (or ip address) and port
  95. private string brokerHostName;
  96. private int brokerPort;
  97. // running status of threads
  98. private bool isRunning;
  99. // event for raising received message event
  100. private AutoResetEvent receiveEventWaitHandle;
  101. // event for starting process inflight queue asynchronously
  102. private AutoResetEvent inflightWaitHandle;
  103. // event for signaling synchronous receive
  104. AutoResetEvent syncEndReceiving;
  105. // message received
  106. MqttMsgBase msgReceived;
  107. // exception thrown during receiving
  108. Exception exReceiving;
  109. // keep alive period (in ms)
  110. private int keepAlivePeriod;
  111. // events for signaling on keep alive thread
  112. private AutoResetEvent keepAliveEvent;
  113. private AutoResetEvent keepAliveEventEnd;
  114. // keep alive timeout expired
  115. private bool isKeepAliveTimeout;
  116. // last communication time in ticks
  117. private long lastCommTime;
  118. // event for PUBLISH message received
  119. public event MqttMsgPublishEventHandler MqttMsgPublishReceived;
  120. // event for published message
  121. public event MqttMsgPublishedEventHandler MqttMsgPublished;
  122. // event for subscribed topic
  123. public event MqttMsgSubscribedEventHandler MqttMsgSubscribed;
  124. // event for unsubscribed topic
  125. public event MqttMsgUnsubscribedEventHandler MqttMsgUnsubscribed;
  126. #if BROKER
  127. // event for SUBSCRIBE message received
  128. public event MqttMsgSubscribeEventHandler MqttMsgSubscribeReceived;
  129. // event for UNSUBSCRIBE message received
  130. public event MqttMsgUnsubscribeEventHandler MqttMsgUnsubscribeReceived;
  131. // event for CONNECT message received
  132. public event MqttMsgConnectEventHandler MqttMsgConnected;
  133. #endif
  134. // event for client disconnection (DISCONNECT message or not)
  135. public event MqttMsgDisconnectEventHandler MqttMsgDisconnected;
  136. // channel to communicate over the network
  137. private IMqttNetworkChannel channel;
  138. // inflight messages queue
  139. private Queue inflightQueue;
  140. // internal queue for received messages about inflight messages
  141. private Queue internalQueue;
  142. // receive queue for received messages
  143. private Queue receiveQueue;
  144. // reference to avoid access to singleton via property
  145. private MqttSettings settings;
  146. // current message identifier generated
  147. private ushort messageIdCounter = 0;
  148. /// <summary>
  149. /// Connection status between client and broker
  150. /// </summary>
  151. public bool IsConnected { get; private set; }
  152. /// <summary>
  153. /// Client identifier
  154. /// </summary>
  155. public string ClientId { get; private set; }
  156. /// <summary>
  157. /// Clean session flag
  158. /// </summary>
  159. public bool CleanSession { get; private set; }
  160. /// <summary>
  161. /// Will flag
  162. /// </summary>
  163. public bool WillFlag { get; private set; }
  164. /// <summary>
  165. /// Will QOS level
  166. /// </summary>
  167. public byte WillQosLevel { get; private set; }
  168. /// <summary>
  169. /// Will topic
  170. /// </summary>
  171. public string WillTopic { get; private set; }
  172. /// <summary>
  173. /// Will message
  174. /// </summary>
  175. public string WillMessage { get; private set; }
  176. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  /// <summary>
  /// Creates a client for the broker at the given IP address, using
  /// MqttSettings.MQTT_BROKER_DEFAULT_PORT and a non-secure connection
  /// (no CA certificate).
  /// </summary>
  /// <param name="brokerIpAddress">Broker IP address</param>
  [Obsolete("Use this ctor MqttClient(string brokerHostName) instead")]
  public MqttClient(IPAddress brokerIpAddress) :
      this(brokerIpAddress, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null)
  {
      // all the work is done by the chained constructor
  }
  /// <summary>
  /// Creates a client for the broker at the given IP address and port.
  /// The address is converted to its string form and handed to Init.
  /// </summary>
  /// <param name="brokerIpAddress">Broker IP address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <exception cref="ArgumentNullException">brokerIpAddress is null</exception>
  [Obsolete("Use this ctor MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert) instead")]
  public MqttClient(IPAddress brokerIpAddress, int brokerPort, bool secure, X509Certificate caCert)
  {
      // fail with a descriptive exception rather than a NullReferenceException from ToString()
      if (brokerIpAddress == null)
          throw new ArgumentNullException("brokerIpAddress");

      string brokerAddress = brokerIpAddress.ToString();

  #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      // this Init overload also takes the two certificate callbacks; none are supplied here
      this.Init(brokerAddress, brokerPort, secure, caCert, null, null);
  #else
      this.Init(brokerAddress, brokerPort, secure, caCert);
  #endif
  }
  202. #endif
  /// <summary>
  /// Creates a client for the given broker, using
  /// MqttSettings.MQTT_BROKER_DEFAULT_PORT and a non-secure connection.
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  public MqttClient(string brokerHostName) :
  #if (WINDOWS_APP || WINDOWS_PHONE_APP)
      this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false)
  #else
      this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null)
  #endif
  {
      // nothing else to do: the chained constructor performs the initialization
  }
  /// <summary>
  /// Creates a client for the given broker host and port. The parameter list
  /// and the Init overload that is called depend on the target platform.
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  #if (WINDOWS_APP || WINDOWS_PHONE_APP)
  public MqttClient(string brokerHostName, int brokerPort, bool secure)
  #else
  /// <param name="caCert">CA certificate for secure connection</param>
  public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert)
  #endif
  {
  #if (WINDOWS_APP || WINDOWS_PHONE_APP)
      // Windows (Phone) app builds: no certificate parameter at all
      this.Init(brokerHostName, brokerPort, secure);
  #elif (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      // Micro / Compact Framework builds: certificate but no callbacks
      this.Init(brokerHostName, brokerPort, secure, caCert);
  #else
      // remaining builds: certificate plus (unset) certificate callbacks
      this.Init(brokerHostName, brokerPort, secure, caCert, null, null);
  #endif
  }
  236. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
  /// <summary>
  /// Creates a client with a CA certificate and a custom validation callback
  /// for the remote certificate; no local certificate selection callback is set.
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  public MqttClient(
      string brokerHostName,
      int brokerPort,
      bool secure,
      X509Certificate caCert,
      RemoteCertificateValidationCallback userCertificateValidationCallback)
      : this(brokerHostName, brokerPort, secure, caCert, userCertificateValidationCallback, null)
  {
      // delegates to the most complete constructor
  }
  /// <summary>
  /// Creates a client with both certificate callbacks but without a CA certificate
  /// (null is passed for it to the chained constructor).
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  public MqttClient(
      string brokerHostName,
      int brokerPort,
      bool secure,
      RemoteCertificateValidationCallback userCertificateValidationCallback,
      LocalCertificateSelectionCallback userCertificateSelectionCallback)
      : this(brokerHostName, brokerPort, secure, null, userCertificateValidationCallback, userCertificateSelectionCallback)
  {
      // delegates to the most complete constructor
  }
  /// <summary>
  /// Creates a client with a CA certificate and both certificate callbacks.
  /// All arguments are forwarded unchanged to Init.
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  public MqttClient(
      string brokerHostName,
      int brokerPort,
      bool secure,
      X509Certificate caCert,
      RemoteCertificateValidationCallback userCertificateValidationCallback,
      LocalCertificateSelectionCallback userCertificateSelectionCallback)
  {
      this.Init(
          brokerHostName,
          brokerPort,
          secure,
          caCert,
          userCertificateValidationCallback,
          userCertificateSelectionCallback);
  }
  279. #endif
  280. #if BROKER
  #if (WINDOWS_APP || WINDOWS_PHONE_APP)
  public MqttClient(StreamSocket socket)
  #else
  /// <summary>
  /// Creates a client around an existing socket (only compiled in BROKER builds).
  /// </summary>
  /// <param name="socket">Raw socket for communication</param>
  public MqttClient(Socket socket)
  #endif
  {
      // wrap the given socket in a network channel
      this.channel = new MqttNetworkChannel(socket);

      // keep a reference to the MQTT settings
      this.settings = MqttSettings.Instance;

      // default values for a client that is not connected yet (no CONNACK exchanged so far)
      this.CleanSession = true;
      this.ClientId = null;
      this.IsConnected = false;

      // wait handles: keep alive, inflight processing, received-message events
      this.keepAliveEvent = new AutoResetEvent(false);
      this.inflightWaitHandle = new AutoResetEvent(false);
      this.receiveEventWaitHandle = new AutoResetEvent(false);

      // queues: inflight messages (publishing and acknowledge), received messages, internal
      this.inflightQueue = new Queue();
      this.receiveQueue = new Queue();
      this.internalQueue = new Queue();
  }
  307. #endif
  /// <summary>
  /// Common initialization used by the constructors: stores the broker endpoint,
  /// creates the wait handles and queues, and builds the network channel.
  /// The parameter list depends on the target platform.
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="caCert">CA certificate for secure connection</param>
  #if (WINDOWS_APP || WINDOWS_PHONE_APP)
  private void Init(string brokerHostName, int brokerPort, bool secure)
  #elif (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert)
  #else
  /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  private void Init(
      string brokerHostName,
      int brokerPort,
      bool secure,
      X509Certificate caCert,
      RemoteCertificateValidationCallback userCertificateValidationCallback,
      LocalCertificateSelectionCallback userCertificateSelectionCallback)
  #endif
  {
  #if !SSL
      // a secure connection cannot be honoured when SSL support is compiled out
      if (secure)
      {
          throw new ArgumentException("Library compiled without SSL support");
      }
  #endif

      // broker endpoint
      this.brokerPort = brokerPort;
      this.brokerHostName = brokerHostName;

      // keep a reference to the MQTT settings
      this.settings = MqttSettings.Instance;

      // wait handles: synchronous receive, keep alive, inflight processing, received-message events
      this.syncEndReceiving = new AutoResetEvent(false);
      this.keepAliveEvent = new AutoResetEvent(false);
      this.inflightWaitHandle = new AutoResetEvent(false);
      this.receiveEventWaitHandle = new AutoResetEvent(false);

      // queues: inflight messages (publishing and acknowledge), received messages, internal
      this.inflightQueue = new Queue();
      this.receiveQueue = new Queue();
      this.internalQueue = new Queue();

      // network channel towards the broker
  #if (WINDOWS_APP || WINDOWS_PHONE_APP)
      this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure);
  #elif (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert);
  #else
      this.channel = new MqttNetworkChannel(
          this.brokerHostName,
          this.brokerPort,
          secure,
          caCert,
          userCertificateValidationCallback,
          userCertificateSelectionCallback);
  #endif
  }
  /// <summary>
  /// Connects to the broker with only a client identifier: no credentials,
  /// no will message, clean session and the default keep alive period.
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  public byte Connect(string clientId)
  {
      byte returnCode = this.Connect(
          clientId,
          null,                                       // username
          null,                                       // password
          false,                                      // will retain
          MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE,      // will QoS level
          false,                                      // will flag
          null,                                       // will topic
          null,                                       // will message
          true,                                       // clean session
          MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);  // keep alive period
      return returnCode;
  }
  /// <summary>
  /// Connects to the broker with credentials: no will message, clean session
  /// and the default keep alive period.
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <param name="username">Username</param>
  /// <param name="password">Password</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  public byte Connect(string clientId, string username, string password)
  {
      byte returnCode = this.Connect(
          clientId,
          username,
          password,
          false,                                      // will retain
          MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE,      // will QoS level
          false,                                      // will flag
          null,                                       // will topic
          null,                                       // will message
          true,                                       // clean session
          MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);  // keep alive period
      return returnCode;
  }
  /// <summary>
  /// Connects to the broker with credentials, an explicit clean session flag
  /// and keep alive period; no will message is set.
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <param name="username">Username</param>
  /// <param name="password">Password</param>
  /// <param name="cleanSession">Clean session flag</param>
  /// <param name="keepAlivePeriod">Keep alive period</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  public byte Connect(string clientId, string username, string password, bool cleanSession, ushort keepAlivePeriod)
  {
      byte returnCode = this.Connect(
          clientId,
          username,
          password,
          false,                                      // will retain
          MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE,      // will QoS level
          false,                                      // will flag
          null,                                       // will topic
          null,                                       // will message
          cleanSession,
          keepAlivePeriod);
      return returnCode;
  }
  /// <summary>
  /// Connects to the broker: opens the network channel, starts the receive thread,
  /// sends the CONNECT message and waits for the CONNACK. When the connection is
  /// accepted the client properties are set and the keep alive, receive-event and
  /// inflight-processing threads are started.
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <param name="username">Username</param>
  /// <param name="password">Password</param>
  /// <param name="willRetain">Will retain flag</param>
  /// <param name="willQosLevel">Will QOS level</param>
  /// <param name="willFlag">Will flag</param>
  /// <param name="willTopic">Will topic</param>
  /// <param name="willMessage">Will message</param>
  /// <param name="cleanSession">Clean session flag</param>
  /// <param name="keepAlivePeriod">Keep alive period</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  /// <exception cref="MqttConnectionException">The network channel could not be connected</exception>
  public byte Connect(string clientId,
      string username,
      string password,
      bool willRetain,
      byte willQosLevel,
      bool willFlag,
      string willTopic,
      string willMessage,
      bool cleanSession,
      ushort keepAlivePeriod)
  {
      // create CONNECT message
      MqttMsgConnect connect = new MqttMsgConnect(clientId,
          username,
          password,
          willRetain,
          willQosLevel,
          willFlag,
          willTopic,
          willMessage,
          cleanSession,
          keepAlivePeriod);

      try
      {
          // connect to the broker
          this.channel.Connect();
      }
      catch (Exception ex)
      {
          throw new MqttConnectionException("Exception connecting to the broker", ex);
      }

      this.lastCommTime = 0;
      this.isRunning = true;
      // start thread for receiving messages from broker
      Fx.StartThread(this.ReceiveThread);

      MqttMsgConnack connack;
      try
      {
          connack = (MqttMsgConnack)this.SendReceive(connect);
      }
      catch (Exception)
      {
          // the CONNECT/CONNACK exchange failed: clear the running flag set above so the
          // client is not left marked as running, then let the caller see the original error
          this.isRunning = false;
          throw;
      }

      // if connection accepted, set client state and start the worker threads
      if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED)
      {
          // set all client properties
          this.ClientId = clientId;
          this.CleanSession = cleanSession;
          this.WillFlag = willFlag;
          this.WillTopic = willTopic;
          this.WillMessage = willMessage;
          this.WillQosLevel = willQosLevel;

          this.keepAlivePeriod = keepAlivePeriod * 1000; // convert in ms

          // start thread for sending keep alive message to the broker
          Fx.StartThread(this.KeepAliveThread);

          // start thread for raising received message event from broker
          Fx.StartThread(this.ReceiveEventThread);

          // start thread for handling inflight messages queue to broker asynchronously (publish and acknowledge)
          Fx.StartThread(this.ProcessInflightThread);

          this.IsConnected = true;
      }

      return connack.ReturnCode;
  }
  /// <summary>
  /// Disconnects from the broker: sends a DISCONNECT message and then closes the client.
  /// </summary>
  public void Disconnect()
  {
      // notify the broker first ...
      this.Send(new MqttMsgDisconnect());

      // ... then shut the client down
      this.Close();
  }
  474. #if BROKER
  /// <summary>
  /// Opens the client communication (BROKER builds): marks the client as running
  /// and starts the receive, receive-event and inflight-processing threads.
  /// </summary>
  public void Open()
  {
      this.isRunning = true;

      // receives messages from the client
      Fx.StartThread(this.ReceiveThread);

      // raises events for the messages received from the client
      Fx.StartThread(this.ReceiveEventThread);

      // handles the inflight messages queue to the client asynchronously (publish and acknowledge)
      Fx.StartThread(this.ProcessInflightThread);
  }
  488. #endif
  /// <summary>
  /// Closes the client: clears the running flag, wakes up the worker threads,
  /// closes the network channel and (unless a keep alive timeout is being handled)
  /// marks the client as disconnected.
  /// </summary>
  #if BROKER
  public void Close()
  #else
  private void Close()
  #endif
  {
      // stop receiving thread
      this.isRunning = false;

      // wake up the receive event thread
      // NOTE : it is not joined because Close() could be called inside ReceiveEventThread
      //        and that would deadlock
      if (this.receiveEventWaitHandle != null)
      {
          this.receiveEventWaitHandle.Set();
      }

      // wake up the process inflight thread
      // NOTE : it is not joined because Close() could be called inside ProcessInflightThread
      //        and that would deadlock
      if (this.inflightWaitHandle != null)
      {
          this.inflightWaitHandle.Set();
      }

      // skipped when the keep alive timeout expired, to avoid a deadlock
      if (this.isKeepAliveTimeout == false)
      {
          // unlock keep alive thread ...
          this.keepAliveEvent.Set();
  #if !BROKER
          // ... and wait on its end event, when that event exists
          if (this.keepAliveEventEnd != null)
          {
              this.keepAliveEventEnd.WaitOne();
          }
  #endif
      }

      // close network channel
      this.channel.Close();

      // on keep alive timeout the keep alive thread will set it gracefully
      if (this.isKeepAliveTimeout == false)
      {
          this.IsConnected = false;
      }
  }
  /// <summary>
  /// Sends a PINGREQ to the broker and waits for the PINGRESP, using the keep alive
  /// period as timeout. On any exception the keep alive timeout flag is set and
  /// the client is closed.
  /// </summary>
  /// <returns>PINGRESP message from broker, or null when the exchange failed</returns>
  private MqttMsgPingResp Ping()
  {
      MqttMsgPingReq request = new MqttMsgPingReq();
      MqttMsgPingResp response = null;

      try
      {
          // broker must send PINGRESP within timeout equal to keep alive period
          response = (MqttMsgPingResp)this.SendReceive(request, this.keepAlivePeriod);
      }
      catch (Exception e)
      {
  #if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  #endif
          // flag the timeout before closing: Close() checks this flag
          this.isKeepAliveTimeout = true;

          // client must close connection
          this.Close();
      }

      return response;
  }
  562. #if BROKER
  /// <summary>
  /// Reply to a CONNECT request with a CONNACK message; on acceptance the
  /// session data is copied from the request and keep alive checking starts,
  /// on refusal the client is closed
  /// </summary>
  /// <param name="returnCode">Return code to put into the CONNACK message</param>
  /// <param name="connect">CONNECT message carrying the client information</param>
  public void Connack(byte returnCode, MqttMsgConnect connect)
  {
      lastCommTime = 0;

      // build the CONNACK and send it to the client
      MqttMsgConnack connack = new MqttMsgConnack();
      connack.ReturnCode = returnCode;
      Send(connack);

      // connection refused: nothing else to do than closing
      if (connack.ReturnCode != MqttMsgConnack.CONN_ACCEPTED)
      {
          Close();
          return;
      }

      // connection accepted: take over the session parameters of the client
      ClientId = connect.ClientId;
      CleanSession = connect.CleanSession;
      WillFlag = connect.WillFlag;
      WillTopic = connect.WillTopic;
      WillMessage = connect.WillMessage;
      WillQosLevel = connect.WillQosLevel;

      // keep alive period is multiplied by 1000 (conversion to ms) and then
      // extended by one half: the broker tolerates 1.5 times the requested period
      keepAlivePeriod = connect.KeepAlivePeriod * 1000;
      keepAlivePeriod += (keepAlivePeriod / 2);

      // start the thread checking the keep alive timeout, then mark as connected
      Fx.StartThread(KeepAliveThread);
      IsConnected = true;
  }
  /// <summary>
  /// Build a SUBACK message and send it to the client
  /// </summary>
  /// <param name="messageId">Message Id of the SUBSCRIBE message being acknowledged</param>
  /// <param name="grantedQosLevels">QoS Levels granted for the subscription</param>
  public void Suback(ushort messageId, byte[] grantedQosLevels)
  {
      MqttMsgSuback ack = new MqttMsgSuback
      {
          MessageId = messageId,
          GrantedQoSLevels = grantedQosLevels
      };
      Send(ack);
  }
  /// <summary>
  /// Build an UNSUBACK message and send it to the client
  /// </summary>
  /// <param name="messageId">Message Id of the UNSUBSCRIBE message being acknowledged</param>
  public void Unsuback(ushort messageId)
  {
      MqttMsgUnsuback ack = new MqttMsgUnsuback
      {
          MessageId = messageId
      };
      Send(ack);
  }
  620. #endif
  /// <summary>
  /// Queue a SUBSCRIBE request for the given topics
  /// </summary>
  /// <param name="topics">List of topics to subscribe</param>
  /// <param name="qosLevels">QOS levels related to topics</param>
  /// <returns>Message Id assigned to the SUBSCRIBE message</returns>
  public ushort Subscribe(string[] topics, byte[] qosLevels)
  {
      MqttMsgSubscribe request = new MqttMsgSubscribe(topics, qosLevels)
      {
          MessageId = GetMessageId()
      };

      // the request is not sent here: it is placed into the inflight queue
      EnqueueInflight(request, MqttMsgFlow.ToPublish);

      return request.MessageId;
  }
  /// <summary>
  /// Queue an UNSUBSCRIBE request for the given topics
  /// </summary>
  /// <param name="topics">List of topics to unsubscribe</param>
  /// <returns>Message Id assigned to the UNSUBSCRIBE message</returns>
  public ushort Unsubscribe(string[] topics)
  {
      MqttMsgUnsubscribe request = new MqttMsgUnsubscribe(topics)
      {
          MessageId = GetMessageId()
      };

      // the request is not sent here: it is placed into the inflight queue
      EnqueueInflight(request, MqttMsgFlow.ToPublish);

      return request.MessageId;
  }
  /// <summary>
  /// Publish a message asynchronously using QoS Level 0 and no retain flag
  /// </summary>
  /// <param name="topic">Message topic</param>
  /// <param name="message">Message data (payload)</param>
  /// <returns>Message Id related to PUBLISH message</returns>
  public ushort Publish(string topic, byte[] message)
  {
      const bool retain = false;
      return Publish(topic, message, MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, retain);
  }
  /// <summary>
  /// Publish a message asynchronously: the PUBLISH message is built and
  /// handed over to the inflight queue
  /// </summary>
  /// <param name="topic">Message topic</param>
  /// <param name="message">Message data (payload)</param>
  /// <param name="qosLevel">QoS Level</param>
  /// <param name="retain">Retain flag</param>
  /// <returns>Message Id assigned to the PUBLISH message</returns>
  public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain)
  {
      MqttMsgPublish outgoing = new MqttMsgPublish(topic, message, false, qosLevel, retain)
      {
          MessageId = GetMessageId()
      };

      // the message is not sent here: it is placed into the inflight queue
      EnqueueInflight(outgoing, MqttMsgFlow.ToPublish);

      return outgoing.MessageId;
  }
  /// <summary>
  /// Hand a received message over to the receive event thread: the message
  /// is appended to the receive queue and the thread's wait handle is signaled
  /// </summary>
  /// <param name="msg">Message received</param>
  private void OnMqttMsgReceived(MqttMsgBase msg)
  {
      // the queue is shared with ReceiveEventThread, which dequeues under the same lock
      lock (receiveQueue)
      {
          receiveQueue.Enqueue(msg);
      }

      // wake up the thread that raises the events
      receiveEventWaitHandle.Set();
  }
  /// <summary>
  /// Wrapper method for raising PUBLISH message received event
  /// </summary>
  /// <param name="publish">PUBLISH message received</param>
  private void OnMqttMsgPublishReceived(MqttMsgPublish publish)
  {
      // take a snapshot of the delegate: checking the event for null and then
      // reading it again to invoke it can fail with a NullReferenceException
      // if the last subscriber is removed by another thread in between
      var handler = this.MqttMsgPublishReceived;
      if (handler != null)
      {
          handler(this,
              new MqttMsgPublishEventArgs(publish.Topic, publish.Message, publish.DupFlag, publish.QosLevel, publish.Retain));
      }
  }
  /// <summary>
  /// Wrapper method for raising published message event
  /// </summary>
  /// <param name="messageId">Message identifier for published message</param>
  private void OnMqttMsgPublished(ushort messageId)
  {
      // take a snapshot of the delegate: checking the event for null and then
      // reading it again to invoke it can fail with a NullReferenceException
      // if the last subscriber is removed by another thread in between
      var handler = this.MqttMsgPublished;
      if (handler != null)
      {
          handler(this, new MqttMsgPublishedEventArgs(messageId));
      }
  }
  /// <summary>
  /// Wrapper method for raising subscribed topic event
  /// </summary>
  /// <param name="suback">SUBACK message received</param>
  private void OnMqttMsgSubscribed(MqttMsgSuback suback)
  {
      // take a snapshot of the delegate: checking the event for null and then
      // reading it again to invoke it can fail with a NullReferenceException
      // if the last subscriber is removed by another thread in between
      var handler = this.MqttMsgSubscribed;
      if (handler != null)
      {
          handler(this,
              new MqttMsgSubscribedEventArgs(suback.MessageId, suback.GrantedQoSLevels));
      }
  }
  /// <summary>
  /// Wrapper method for raising unsubscribed topic event
  /// </summary>
  /// <param name="messageId">Message identifier for unsubscribed topic</param>
  private void OnMqttMsgUnsubscribed(ushort messageId)
  {
      // take a snapshot of the delegate: checking the event for null and then
      // reading it again to invoke it can fail with a NullReferenceException
      // if the last subscriber is removed by another thread in between
      var handler = this.MqttMsgUnsubscribed;
      if (handler != null)
      {
          handler(this, new MqttMsgUnsubscribedEventArgs(messageId));
      }
  }
  737. #if BROKER
  /// <summary>
  /// Wrapper method for raising SUBSCRIBE message event
  /// </summary>
  /// <param name="messageId">Message identifier for subscribe topics request</param>
  /// <param name="topics">Topics requested to subscribe</param>
  /// <param name="qosLevels">List of QOS Levels requested</param>
  private void OnMqttMsgSubscribeReceived(ushort messageId, string[] topics, byte[] qosLevels)
  {
      // take a snapshot of the delegate: checking the event for null and then
      // reading it again to invoke it can fail with a NullReferenceException
      // if the last subscriber is removed by another thread in between
      var handler = this.MqttMsgSubscribeReceived;
      if (handler != null)
      {
          handler(this, new MqttMsgSubscribeEventArgs(messageId, topics, qosLevels));
      }
  }
  /// <summary>
  /// Wrapper method for raising UNSUBSCRIBE message event
  /// </summary>
  /// <param name="messageId">Message identifier for unsubscribe topics request</param>
  /// <param name="topics">Topics requested to unsubscribe</param>
  private void OnMqttMsgUnsubscribeReceived(ushort messageId, string[] topics)
  {
      // take a snapshot of the delegate: checking the event for null and then
      // reading it again to invoke it can fail with a NullReferenceException
      // if the last subscriber is removed by another thread in between
      var handler = this.MqttMsgUnsubscribeReceived;
      if (handler != null)
      {
          handler(this, new MqttMsgUnsubscribeEventArgs(messageId, topics));
      }
  }
  /// <summary>
  /// Wrapper method for client connection event
  /// </summary>
  /// <param name="connect">CONNECT message received from the client</param>
  private void OnMqttMsgConnected(MqttMsgConnect connect)
  {
      // take a snapshot of the delegate: checking the event for null and then
      // reading it again to invoke it can fail with a NullReferenceException
      // if the last subscriber is removed by another thread in between
      var handler = this.MqttMsgConnected;
      if (handler != null)
      {
          handler(this, new MqttMsgConnectEventArgs(connect));
      }
  }
  775. #endif
  /// <summary>
  /// Wrapper method for client disconnection event
  /// </summary>
  private void OnMqttMsgDisconnected()
  {
      // take a snapshot of the delegate: checking the event for null and then
      // reading it again to invoke it can fail with a NullReferenceException
      // if the last subscriber is removed by another thread in between
      var handler = this.MqttMsgDisconnected;
      if (handler != null)
      {
          handler(this, EventArgs.Empty);
      }
  }
  /// <summary>
  /// Write raw message bytes to the network channel; any failure is
  /// reported as an MqttCommunicationException wrapping the original error
  /// </summary>
  /// <param name="msgBytes">Message bytes</param>
  private void Send(byte[] msgBytes)
  {
      try
      {
          channel.Send(msgBytes);
#if !BROKER
          // remember when the last message was sent (used by the keep alive thread)
          lastCommTime = Environment.TickCount;
#endif
      }
      catch (Exception e)
      {
#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
#endif
          throw new MqttCommunicationException(e);
      }
  }
  /// <summary>
  /// Serialize a message and send its bytes
  /// </summary>
  /// <param name="msg">Message</param>
  private void Send(MqttMsgBase msg)
  {
#if TRACE
      MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
#endif
      byte[] frame = msg.GetBytes();
      Send(frame);
  }
  /// <summary>
  /// Send message bytes to the broker and wait for the answer using
  /// the default timeout (MqttSettings.MQTT_DEFAULT_TIMEOUT)
  /// </summary>
  /// <param name="msgBytes">Message bytes</param>
  /// <returns>MQTT message response</returns>
  private MqttMsgBase SendReceive(byte[] msgBytes)
  {
      int timeout = MqttSettings.MQTT_DEFAULT_TIMEOUT;
      return SendReceive(msgBytes, timeout);
  }
  /// <summary>
  /// Send a message to the broker and wait answer
  /// </summary>
  /// <param name="msgBytes">Message bytes</param>
  /// <param name="timeout">Timeout for receiving answer</param>
  /// <returns>MQTT message response</returns>
  /// <exception cref="MqttCommunicationException">Sending failed, or no answer was
  /// signaled within the timeout</exception>
  private MqttMsgBase SendReceive(byte[] msgBytes, int timeout)
  {
      // reset handle before sending
      this.syncEndReceiving.Reset();
      // forget any error left over from a previous receive failure: the receive
      // thread signals the answer before it clears exReceiving, so without this
      // a stale exception could be thrown for a successfully received answer
      this.exReceiving = null;
      try
      {
          // send message
          this.channel.Send(msgBytes);
          // update last message sent ticks
          this.lastCommTime = Environment.TickCount;
      }
      catch (Exception e)
      {
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
          // connection reset by broker ("as" also covers types derived from SocketException)
          SocketException socketException = e as SocketException;
          if ((socketException != null) &&
              (socketException.SocketErrorCode == SocketError.ConnectionReset))
          {
              this.IsConnected = false;
          }
#endif
#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
#endif
          throw new MqttCommunicationException(e);
      }

      // wait for answer from broker
      bool answered;
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      answered = this.syncEndReceiving.WaitOne(timeout, false);
#else
      answered = this.syncEndReceiving.WaitOne(timeout);
#endif
      if (!answered)
      {
          // throw timeout exception
          throw new MqttCommunicationException();
      }

      // read the field only once: the receive thread can set it back to null
      // at any time, and "throw null" would raise a NullReferenceException
      Exception receiveError = this.exReceiving;
      if (receiveError != null)
      {
          // receiving thread caught an exception
          throw receiveError;
      }

      // message received without exception
      return this.msgReceived;
  }
  /// <summary>
  /// Send a message to the broker and wait for the answer using
  /// the default timeout (MqttSettings.MQTT_DEFAULT_TIMEOUT)
  /// </summary>
  /// <param name="msg">Message</param>
  /// <returns>MQTT message response</returns>
  private MqttMsgBase SendReceive(MqttMsgBase msg)
  {
      int timeout = MqttSettings.MQTT_DEFAULT_TIMEOUT;
      return SendReceive(msg, timeout);
  }
  /// <summary>
  /// Serialize a message, send it to the broker and wait for the answer
  /// </summary>
  /// <param name="msg">Message</param>
  /// <param name="timeout">Timeout for receiving answer</param>
  /// <returns>MQTT message response</returns>
  private MqttMsgBase SendReceive(MqttMsgBase msg, int timeout)
  {
#if TRACE
      MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
#endif
      byte[] frame = msg.GetBytes();
      return SendReceive(frame, timeout);
  }
  /// <summary>
  /// Put a message into the inflight queue (or, for a re-sent QoS 2 PUBLISH
  /// that is already queued, reset its context) and wake up the inflight thread
  /// </summary>
  /// <param name="msg">Message to enqueue</param>
  /// <param name="flow">Message flow (publish, acknowledge)</param>
  private void EnqueueInflight(MqttMsgBase msg, MqttMsgFlow flow)
  {
      bool alreadyQueued = false;

      bool isQos2Publish =
          (msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
          (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE);

      if (isQos2Publish)
      {
          lock (inflightQueue)
          {
              // A QoS 2 PUBLISH found in the queue was received before: the publisher
              // re-sent it because it did not get the PUBREC, which must be sent again.
              // NOTE : the lookup uses message id AND flow, since broker and client track
              // their own message ids and the same id can be in use in both directions
              MqttMsgContextFinder finder =
                  new MqttMsgContextFinder(((MqttMsgPublish)msg).MessageId, MqttMsgFlow.ToAcknowledge);
              MqttMsgContext existing = (MqttMsgContext)inflightQueue.Get(finder.Find);

              if (existing != null)
              {
                  // no second entry: put the existing one back to its initial state
                  existing.State = MqttMsgState.QueuedQos2;
                  existing.Flow = MqttMsgFlow.ToAcknowledge;
                  alreadyQueued = true;
              }
          }
      }

      if (!alreadyQueued)
      {
          // initial state depends on the QoS level (QoS 0 state for any other value)
          MqttMsgState initialState;
          switch (msg.QosLevel)
          {
              case MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE:
                  initialState = MqttMsgState.QueuedQos1;
                  break;
              case MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE:
                  initialState = MqttMsgState.QueuedQos2;
                  break;
              default:
                  initialState = MqttMsgState.QueuedQos0;
                  break;
          }

          // context tracking the message while it is inflight
          MqttMsgContext context = new MqttMsgContext();
          context.Message = msg;
          context.State = initialState;
          context.Flow = flow;
          context.Attempt = 0;

          lock (inflightQueue)
          {
              inflightQueue.Enqueue(context);
          }
      }

      // wake up the inflight thread in both cases
      inflightWaitHandle.Set();
  }
  /// <summary>
  /// Put a received message into the internal queue and wake up the inflight
  /// thread; a PUBREL with no matching inflight PUBLISH is answered directly
  /// with a PUBCOMP and not queued
  /// </summary>
  /// <param name="msg">Message to enqueue</param>
  private void EnqueueInternal(MqttMsgBase msg)
  {
      bool answeredDirectly = false;

      if (msg.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE)
      {
          MqttMsgPubrel pubrel = (MqttMsgPubrel)msg;
          lock (inflightQueue)
          {
              // A PUBREL without the corresponding PUBLISH in the inflight queue means the
              // PUBLISH was processed completely and PUBCOMP was sent, but the publisher
              // did not receive it and re-sent PUBREL: only PUBCOMP has to be sent again.
              // NOTE : the lookup uses message id AND flow, since broker and client track
              // their own message ids and the same id can be in use in both directions
              MqttMsgContextFinder finder =
                  new MqttMsgContextFinder(pubrel.MessageId, MqttMsgFlow.ToAcknowledge);
              MqttMsgContext pending = (MqttMsgContext)inflightQueue.Get(finder.Find);

              if (pending == null)
              {
                  MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
                  pubcomp.MessageId = pubrel.MessageId;
                  Send(pubcomp);
                  answeredDirectly = true;
              }
          }
      }

      if (answeredDirectly)
          return;

      lock (internalQueue)
      {
          internalQueue.Enqueue(msg);
          inflightWaitHandle.Set();
      }
  }
  /// <summary>
  /// Thread for receiving messages: while the client is running it polls the
  /// channel, reads the first byte of the fixed header, parses the message of
  /// the detected type and dispatches it (answer signaled to SendReceive,
  /// internal queue, inflight queue or receive event queue depending on type).
  /// Message types that are not expected for the current build (BROKER or
  /// client) raise MqttClientException, which is caught by the loop itself.
  /// </summary>
  private void ReceiveThread()
  {
      int readBytes = 0;
      byte[] fixedHeaderFirstByte = new byte[1];
      byte msgType;
#if BROKER
      long now = 0;
      // receive thread started, broker need to receive the first message
      // (CONNECT) within a reasonable amount of time after TCP/IP connection
      long connectTime = Environment.TickCount;
#endif
      while (this.isRunning)
      {
          try
          {
              if (this.channel.DataAvailable)
                  // read first byte (fixed header)
                  readBytes = this.channel.Receive(fixedHeaderFirstByte);
              else
              {
#if BROKER
                  // client not connected (client didn't send CONNECT yet)
                  if (!this.IsConnected)
                  {
                      now = Environment.TickCount;
                      // if connect timeout exceeded ...
                      // NOTE(review): Environment.TickCount wraps around, so this
                      // difference can become negative after a wrap - confirm
                      if ((now - connectTime) >= this.settings.TimeoutOnConnection)
                      {
                          // client must close connection
                          this.Close();
                          // client raw disconnection
                          this.OnMqttMsgDisconnected();
                      }
                  }
#endif
                  // no bytes available, sleep before retry
                  // (presumably 10 milliseconds - verify against Fx.SleepThread)
                  readBytes = 0;
                  Fx.SleepThread(10);
              }
              if (readBytes > 0)
              {
#if BROKER
                  // update last message received ticks
                  this.lastCommTime = Environment.TickCount;
#endif
                  // extract message type from received byte
                  msgType = (byte)((fixedHeaderFirstByte[0] & MqttMsgBase.MSG_TYPE_MASK) >> MqttMsgBase.MSG_TYPE_OFFSET);
                  switch (msgType)
                  {
                      // CONNECT message received
                      case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
#if BROKER
                          MqttMsgConnect connect = MqttMsgConnect.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          Trace.WriteLine(TraceLevel.Frame, "RECV {0}", connect);
#endif
                          // raise message received event
                          this.OnMqttMsgReceived(connect);
                          break;
#else
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
                      // CONNACK message received
                      case MqttMsgBase.MQTT_MSG_CONNACK_TYPE:
#if BROKER
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#else
                          this.msgReceived = MqttMsgConnack.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
#endif
                          // unblock the SendReceive call waiting for this answer
                          this.syncEndReceiving.Set();
                          break;
#endif
                      // PINGREQ message received
                      case MqttMsgBase.MQTT_MSG_PINGREQ_TYPE:
#if BROKER
                          this.msgReceived = MqttMsgPingReq.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
#endif
                          // answer immediately with PINGRESP
                          MqttMsgPingResp pingresp = new MqttMsgPingResp();
                          this.Send(pingresp);
                          // raise message received event
                          //this.OnMqttMsgReceived(this.msgReceived);
                          break;
#else
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
                      // PINGRESP message received
                      case MqttMsgBase.MQTT_MSG_PINGRESP_TYPE:
#if BROKER
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#else
                          this.msgReceived = MqttMsgPingResp.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
#endif
                          // unblock the SendReceive call waiting for this answer
                          this.syncEndReceiving.Set();
                          break;
#endif
                      // SUBSCRIBE message received
                      case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
#if BROKER
                          MqttMsgSubscribe subscribe = MqttMsgSubscribe.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          Trace.WriteLine(TraceLevel.Frame, "RECV {0}", subscribe);
#endif
                          // raise message received event
                          this.OnMqttMsgReceived(subscribe);
                          break;
#else
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
                      // SUBACK message received
                      case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
#if BROKER
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#else
                          // parse the SUBACK message received
                          MqttMsgSuback suback = MqttMsgSuback.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", suback);
#endif
                          // enqueue SUBACK message into the internal queue
                          this.EnqueueInternal(suback);
                          break;
#endif
                      // PUBLISH message received
                      case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
                          MqttMsgPublish publish = MqttMsgPublish.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", publish);
#endif
                          // enqueue PUBLISH message to acknowledge into the inflight queue
                          this.EnqueueInflight(publish, MqttMsgFlow.ToAcknowledge);
                          break;
                      // PUBACK message received
                      case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
                          // parse the PUBACK message received (for QoS Level 1)
                          MqttMsgPuback puback = MqttMsgPuback.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", puback);
#endif
                          // enqueue PUBACK message into the internal queue
                          this.EnqueueInternal(puback);
                          break;
                      // PUBREC message received
                      case MqttMsgBase.MQTT_MSG_PUBREC_TYPE:
                          // parse the PUBREC message received (for QoS Level 2)
                          MqttMsgPubrec pubrec = MqttMsgPubrec.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrec);
#endif
                          // enqueue PUBREC message into the internal queue
                          this.EnqueueInternal(pubrec);
                          break;
                      // PUBREL message received
                      case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
                          // parse the PUBREL message received (for QoS Level 2)
                          MqttMsgPubrel pubrel = MqttMsgPubrel.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrel);
#endif
                          // enqueue PUBREL message into the internal queue
                          this.EnqueueInternal(pubrel);
                          break;
                      // PUBCOMP message received
                      case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
                          // parse the PUBCOMP message received (for QoS Level 2)
                          MqttMsgPubcomp pubcomp = MqttMsgPubcomp.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubcomp);
#endif
                          // enqueue PUBCOMP message into the internal queue
                          this.EnqueueInternal(pubcomp);
                          break;
                      // UNSUBSCRIBE message received
                      case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
#if BROKER
                          MqttMsgUnsubscribe unsubscribe = MqttMsgUnsubscribe.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsubscribe);
#endif
                          // raise message received event
                          this.OnMqttMsgReceived(unsubscribe);
                          break;
#else
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
                      // UNSUBACK message received
                      case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
#if BROKER
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#else
                          // parse the UNSUBACK message received
                          MqttMsgUnsuback unsuback = MqttMsgUnsuback.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsuback);
#endif
                          // enqueue UNSUBACK message into the internal queue
                          this.EnqueueInternal(unsuback);
                          break;
#endif
                      // DISCONNECT message received
                      case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
#if BROKER
                          MqttMsgDisconnect disconnect = MqttMsgDisconnect.Parse(fixedHeaderFirstByte[0], this.channel);
#if TRACE
                          Trace.WriteLine(TraceLevel.Frame, "RECV {0}", disconnect);
#endif
                          // raise message received event
                          this.OnMqttMsgReceived(disconnect);
                          break;
#else
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
                      // unknown message type
                      default:
                          throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
                  }
                  // message handled without exception: clear the last receive error
                  // NOTE(review): for CONNACK/PINGRESP this runs after syncEndReceiving.Set(),
                  // so the thread waiting in SendReceive may still read an older value
                  // of exReceiving - confirm whether the reset should come first
                  this.exReceiving = null;
              }
          }
          catch (Exception e)
          {
#if TRACE
              MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
#endif
              // keep the error for SendReceive; the loop goes on while isRunning is true
              // NOTE(review): nothing sleeps or closes the client here, so an error that
              // repeats on every iteration makes this loop spin - confirm this is intended
              this.exReceiving = new MqttCommunicationException(e);
          }
      }
  }
  /// <summary>
  /// Thread for handling keep alive message: it waits on keepAliveEvent for the
  /// time left until the keep alive period expires, then either closes the
  /// client (BROKER build) or sends a ping (client build). When the thread
  /// ends because of a keep alive timeout it raises the disconnection event;
  /// in every case it signals keepAliveEventEnd on exit.
  /// </summary>
  private void KeepAliveThread()
  {
      int wait = this.keepAlivePeriod;
      this.isKeepAliveTimeout = false;
      // create event to signal that current thread is end
      this.keepAliveEventEnd = new AutoResetEvent(false);
      while (this.isRunning)
      {
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
          // waiting...
          this.keepAliveEvent.WaitOne(wait, false);
#else
          // waiting...
          this.keepAliveEvent.WaitOne(wait);
#endif
          if (this.isRunning)
          {
              // Environment.TickCount is a 32 bit counter that wraps around; the
              // unchecked 32 bit subtraction still gives the right elapsed time
              // across a wrap, where a 64 bit difference would be hugely negative
              // and make the computed wait time invalid for WaitOne
              int elapsed = unchecked(Environment.TickCount - (int)this.lastCommTime);
              // lastCommTime can be newer than the tick value just read (it is
              // updated by other threads) or still hold its reset value
              if (elapsed < 0)
                  elapsed = 0;
              // if timeout exceeded ...
              if (elapsed >= this.keepAlivePeriod)
              {
#if BROKER
                  this.isKeepAliveTimeout = true;
                  // client must close connection
                  this.Close();
#else
                  // ... send keep alive
                  this.Ping();
                  wait = this.keepAlivePeriod;
#endif
              }
              else
              {
                  // update waiting time (always in the range 1..keepAlivePeriod here)
                  wait = (int)(this.keepAlivePeriod - elapsed);
              }
          }
      }
      if (this.isKeepAliveTimeout)
      {
          this.IsConnected = false;
          // raise disconnection client event
          this.OnMqttMsgDisconnected();
      }
      // signal thread end
      this.keepAliveEventEnd.Set();
  }
  /// <summary>
  /// Thread that takes messages from the receive queue one at a time and
  /// raises the event matching the message type
  /// </summary>
  private void ReceiveEventThread()
  {
      while (isRunning)
      {
          // nothing queued: block until OnMqttMsgReceived (or Close) signals the handle
          if (receiveQueue.Count == 0)
              receiveEventWaitHandle.WaitOne();

          // woken up while the client is closing: go back to the loop condition
          if (!isRunning)
              continue;

          // take the next message (if any) under the queue lock
          MqttMsgBase received = null;
          lock (receiveQueue)
          {
              if (receiveQueue.Count > 0)
                  received = (MqttMsgBase)receiveQueue.Dequeue();
          }

          if (received == null)
              continue;

          switch (received.Type)
          {
              // CONNECT message received
              case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
#if BROKER
                  // raise connected client event (CONNECT message received)
                  OnMqttMsgConnected((MqttMsgConnect)received);
                  break;
#else
                  throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
              // SUBSCRIBE message received
              case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
#if BROKER
                  MqttMsgSubscribe subscribe = (MqttMsgSubscribe)received;
                  // raise subscribe topic event (SUBSCRIBE message received)
                  OnMqttMsgSubscribeReceived(subscribe.MessageId, subscribe.Topics, subscribe.QoSLevels);
                  break;
#else
                  throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
              // SUBACK message received
              case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
                  // raise subscribed topic event (SUBACK message received)
                  OnMqttMsgSubscribed((MqttMsgSuback)received);
                  break;
              // PUBLISH message received
              case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
                  // raise PUBLISH message received event
                  OnMqttMsgPublishReceived((MqttMsgPublish)received);
                  break;
              // PUBACK message received
              case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
                  // raise published message event
                  // (PUBACK received for QoS Level 1)
                  OnMqttMsgPublished(((MqttMsgPuback)received).MessageId);
                  break;
              // PUBREL message received
              case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
                  // raise message received event
                  // (PUBREL received for QoS Level 2)
                  // NOTE(review): the message is cast to MqttMsgPublish although its
                  // type is PUBREL - verify what the producers enqueue for this case
                  OnMqttMsgPublishReceived((MqttMsgPublish)received);
                  break;
              // PUBCOMP message received
              case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
                  // raise published message event
                  // (PUBCOMP received for QoS Level 2)
                  OnMqttMsgPublished(((MqttMsgPubcomp)received).MessageId);
                  break;
              // UNSUBSCRIBE message received from client
              case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
#if BROKER
                  MqttMsgUnsubscribe unsubscribe = (MqttMsgUnsubscribe)received;
                  // raise unsubscribe topic event (UNSUBSCRIBE message received)
                  OnMqttMsgUnsubscribeReceived(unsubscribe.MessageId, unsubscribe.Topics);
                  break;
#else
                  throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
              // UNSUBACK message received
              case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
                  // raise unsubscribed topic event
                  OnMqttMsgUnsubscribed(((MqttMsgUnsuback)received).MessageId);
                  break;
              // DISCONNECT message received from client
              case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
#if BROKER
                  // raise disconnected client event (DISCONNECT message received)
                  OnMqttMsgDisconnected();
                  break;
#else
                  throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
          }
      }
  }
  /// <summary>
  /// Process inflight messages queue
  /// </summary>
  /// <remarks>
  /// Loops while <c>isRunning</c> is set. On every cycle it waits on <c>inflightWaitHandle</c>
  /// (with the timeout calculated in the previous cycle), then analyzes each message context
  /// that was in <c>inflightQueue</c> at the start of the cycle exactly once, driving it through
  /// its <see cref="MqttMsgState"/> : sending the message, matching the acknowledge at the head
  /// of <c>internalQueue</c>, scheduling a retry when <c>settings.DelayOnRetry</c> is elapsed or
  /// re-enqueuing it for the next cycle. A message that reached the maximum number of attempts
  /// is not re-enqueued. A <see cref="MqttCommunicationException"/> ends the loop, closes the
  /// client and raises the disconnection event.
  /// </remarks>
  private void ProcessInflightThread()
  {
      MqttMsgContext msgContext = null;
      MqttMsgBase msgInflight = null;
      MqttMsgBase msgReceived = null;
      bool acknowledge = false;
      int timeout = Timeout.Infinite;

      try
      {
          while (this.isRunning)
          {
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
              // wait on message queued to inflight
              this.inflightWaitHandle.WaitOne(timeout, false);
#else
              // wait on message queued to inflight
              this.inflightWaitHandle.WaitOne(timeout);
#endif

              // it could be unblocked because Close() method is joining
              if (this.isRunning)
              {
                  lock (this.inflightQueue)
                  {
                      // set timeout to MaxValue instead of Infinite (-1) to perform
                      // compare with calculated current msgTimeout
                      timeout = Int32.MaxValue;

                      // a message inflight could be re-enqueued but we have to
                      // analyze it only just one time for cycle
                      int count = this.inflightQueue.Count;
                      // process all inflight queued messages
                      while (count > 0)
                      {
                          count--;
                          acknowledge = false;
                          msgReceived = null;

                          // dequeue message context from queue
                          msgContext = (MqttMsgContext)this.inflightQueue.Dequeue();

                          // get inflight message
                          msgInflight = (MqttMsgBase)msgContext.Message;

                          switch (msgContext.State)
                          {
                              case MqttMsgState.QueuedQos0:

                                  // QoS 0, PUBLISH message to send to broker, no state change, no acknowledge
                                  if (msgContext.Flow == MqttMsgFlow.ToPublish)
                                  {
                                      this.Send(msgInflight);
                                  }
                                  // QoS 0, no need acknowledge
                                  else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
                                  {
                                      // notify published message from broker (no need acknowledged)
                                      this.OnMqttMsgReceived(msgInflight);
                                  }
                                  break;

                              case MqttMsgState.QueuedQos1:

                                  // QoS 1, PUBLISH or SUBSCRIBE/UNSUBSCRIBE message to send to broker, state change to wait PUBACK or SUBACK/UNSUBACK
                                  if (msgContext.Flow == MqttMsgFlow.ToPublish)
                                  {
                                      if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
                                      {
                                          // PUBLISH message to send, wait for PUBACK
                                          msgContext.State = MqttMsgState.WaitForPuback;
                                      }
                                      else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
                                      {
                                          // SUBSCRIBE message to send, wait for SUBACK
                                          msgContext.State = MqttMsgState.WaitForSuback;
                                      }
                                      else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
                                      {
                                          // UNSUBSCRIBE message to send, wait for UNSUBACK
                                          msgContext.State = MqttMsgState.WaitForUnsuback;
                                      }

                                      msgContext.Timestamp = Environment.TickCount;
                                      msgContext.Attempt++;
                                      // retry ? set dup flag
                                      if (msgContext.Attempt > 1)
                                      {
                                          msgInflight.DupFlag = true;
                                      }

                                      this.Send(msgInflight);

                                      // update timeout
                                      timeout = this.GetInflightTimeout(msgContext, timeout);

                                      // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
                                      this.inflightQueue.Enqueue(msgContext);
                                  }
                                  // QoS 1, PUBLISH message received from broker to acknowledge, send PUBACK
                                  else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
                                  {
                                      MqttMsgPuback puback = new MqttMsgPuback();
                                      puback.MessageId = ((MqttMsgPublish)msgInflight).MessageId;

                                      this.Send(puback);

                                      // notify published message from broker and acknowledged
                                      this.OnMqttMsgReceived(msgInflight);
                                  }
                                  break;

                              case MqttMsgState.QueuedQos2:

                                  // QoS 2, PUBLISH message to send to broker, state change to wait PUBREC
                                  if (msgContext.Flow == MqttMsgFlow.ToPublish)
                                  {
                                      msgContext.State = MqttMsgState.WaitForPubrec;
                                      msgContext.Timestamp = Environment.TickCount;
                                      msgContext.Attempt++;
                                      // retry ? set dup flag
                                      if (msgContext.Attempt > 1)
                                      {
                                          msgInflight.DupFlag = true;
                                      }

                                      this.Send(msgInflight);

                                      // update timeout
                                      timeout = this.GetInflightTimeout(msgContext, timeout);

                                      // re-enqueue message (I have to re-analyze for receiving PUBREC)
                                      this.inflightQueue.Enqueue(msgContext);
                                  }
                                  // QoS 2, PUBLISH message received from broker to acknowledge, send PUBREC, state change to wait PUBREL
                                  else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
                                  {
                                      MqttMsgPubrec pubrec = new MqttMsgPubrec();
                                      pubrec.MessageId = ((MqttMsgPublish)msgInflight).MessageId;

                                      msgContext.State = MqttMsgState.WaitForPubrel;

                                      this.Send(pubrec);

                                      // re-enqueue message (I have to re-analyze for receiving PUBREL)
                                      this.inflightQueue.Enqueue(msgContext);
                                  }
                                  break;

                              case MqttMsgState.WaitForPuback:
                              case MqttMsgState.WaitForSuback:
                              case MqttMsgState.WaitForUnsuback:

                                  // QoS 1, waiting for PUBACK of a PUBLISH message sent or
                                  //        waiting for SUBACK of a SUBSCRIBE message sent or
                                  //        waiting for UNSUBACK of a UNSUBSCRIBE message sent
                                  if (msgContext.Flow == MqttMsgFlow.ToPublish)
                                  {
                                      acknowledge = false;
                                      lock (this.internalQueue)
                                      {
                                          if (this.internalQueue.Count > 0)
                                          {
                                              msgReceived = (MqttMsgBase)this.internalQueue.Peek();
                                          }
                                      }

                                      // it is a PUBACK message or a SUBACK/UNSUBACK message
                                      if (msgReceived != null)
                                      {
                                          // PUBACK message or SUBACK/UNSUBACK message for the current message
                                          if (((msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (((MqttMsgPuback)msgReceived).MessageId == ((MqttMsgPublish)msgInflight).MessageId)) ||
                                              ((msgReceived.Type == MqttMsgBase.MQTT_MSG_SUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE) && (((MqttMsgSuback)msgReceived).MessageId == ((MqttMsgSubscribe)msgInflight).MessageId)) ||
                                              ((msgReceived.Type == MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE) && (((MqttMsgUnsuback)msgReceived).MessageId == ((MqttMsgUnsubscribe)msgInflight).MessageId)))
                                          {
                                              lock (this.internalQueue)
                                              {
                                                  // received message processed
                                                  this.internalQueue.Dequeue();
                                                  acknowledge = true;
                                              }

                                              // notify received acknowledge from broker of a published message or subscribe/unsubscribe message
                                              this.OnMqttMsgReceived(msgReceived);
                                          }
                                      }

                                      // current message not acknowledged, no PUBACK or SUBACK/UNSUBACK or not equal messageid
                                      if (!acknowledge)
                                      {
                                          // check timeout for receiving PUBACK since PUBLISH was sent or
                                          // for receiving SUBACK since SUBSCRIBE was sent or
                                          // for receiving UNSUBACK since UNSUBSCRIBE was sent
                                          if ((Environment.TickCount - msgContext.Timestamp) >= this.settings.DelayOnRetry)
                                          {
                                              // max retry not reached, resend
                                              // (otherwise the message is not re-enqueued, so it leaves the inflight queue)
                                              if (msgContext.Attempt <= this.settings.AttemptsOnRetry)
                                              {
                                                  msgContext.State = MqttMsgState.QueuedQos1;

                                                  // re-enqueue message
                                                  this.inflightQueue.Enqueue(msgContext);

                                                  // update timeout (0 -> reanalyze queue immediately)
                                                  timeout = 0;
                                              }
                                          }
                                          else
                                          {
                                              // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
                                              this.inflightQueue.Enqueue(msgContext);

                                              // update timeout
                                              timeout = this.GetInflightTimeout(msgContext, timeout);
                                          }
                                      }
                                  }
                                  break;

                              case MqttMsgState.WaitForPubrec:

                                  // QoS 2, waiting for PUBREC of a PUBLISH message sent
                                  if (msgContext.Flow == MqttMsgFlow.ToPublish)
                                  {
                                      acknowledge = false;
                                      lock (this.internalQueue)
                                      {
                                          if (this.internalQueue.Count > 0)
                                          {
                                              msgReceived = (MqttMsgBase)this.internalQueue.Peek();
                                          }
                                      }

                                      // it is a PUBREC message
                                      if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE))
                                      {
                                          // PUBREC message for the current PUBLISH message, send PUBREL, wait for PUBCOMP
                                          if (((MqttMsgPubrec)msgReceived).MessageId == ((MqttMsgPublish)msgInflight).MessageId)
                                          {
                                              lock (this.internalQueue)
                                              {
                                                  // received message processed
                                                  this.internalQueue.Dequeue();
                                                  acknowledge = true;
                                              }

                                              MqttMsgPubrel pubrel = new MqttMsgPubrel();
                                              pubrel.MessageId = ((MqttMsgPublish)msgInflight).MessageId;

                                              msgContext.State = MqttMsgState.WaitForPubcomp;
                                              msgContext.Timestamp = Environment.TickCount;
                                              // first attempt for the PUBREL message
                                              msgContext.Attempt = 1;

                                              this.Send(pubrel);

                                              // update timeout
                                              timeout = this.GetInflightTimeout(msgContext, timeout);

                                              // re-enqueue message
                                              this.inflightQueue.Enqueue(msgContext);
                                          }
                                      }

                                      // current message not acknowledged
                                      if (!acknowledge)
                                      {
                                          // check timeout for receiving PUBREC since PUBLISH was sent
                                          if ((Environment.TickCount - msgContext.Timestamp) >= this.settings.DelayOnRetry)
                                          {
                                              // max retry not reached, resend
                                              if (msgContext.Attempt <= this.settings.AttemptsOnRetry)
                                              {
                                                  msgContext.State = MqttMsgState.QueuedQos2;

                                                  // re-enqueue message
                                                  this.inflightQueue.Enqueue(msgContext);

                                                  // update timeout (0 -> reanalyze queue immediately)
                                                  timeout = 0;
                                              }
                                          }
                                          else
                                          {
                                              // re-enqueue message
                                              this.inflightQueue.Enqueue(msgContext);

                                              // update timeout
                                              timeout = this.GetInflightTimeout(msgContext, timeout);
                                          }
                                      }
                                  }
                                  break;

                              case MqttMsgState.WaitForPubrel:

                                  // QoS 2, waiting for PUBREL of a PUBREC message sent
                                  if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
                                  {
                                      lock (this.internalQueue)
                                      {
                                          if (this.internalQueue.Count > 0)
                                          {
                                              msgReceived = (MqttMsgBase)this.internalQueue.Peek();
                                          }
                                      }

                                      // it is a PUBREL message
                                      if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE))
                                      {
                                          // PUBREL message for the current message, send PUBCOMP
                                          if (((MqttMsgPubrel)msgReceived).MessageId == ((MqttMsgPublish)msgInflight).MessageId)
                                          {
                                              lock (this.internalQueue)
                                              {
                                                  // received message processed
                                                  this.internalQueue.Dequeue();
                                              }

                                              MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
                                              pubcomp.MessageId = ((MqttMsgPublish)msgInflight).MessageId;

                                              this.Send(pubcomp);

                                              // notify published message from broker and acknowledged
                                              this.OnMqttMsgReceived(msgInflight);
                                          }
                                          else
                                          {
                                              // re-enqueue message
                                              this.inflightQueue.Enqueue(msgContext);
                                          }
                                      }
                                      else
                                      {
                                          // re-enqueue message
                                          this.inflightQueue.Enqueue(msgContext);
                                      }
                                  }
                                  break;

                              case MqttMsgState.WaitForPubcomp:

                                  // QoS 2, waiting for PUBCOMP of a PUBREL message sent
                                  if (msgContext.Flow == MqttMsgFlow.ToPublish)
                                  {
                                      acknowledge = false;
                                      lock (this.internalQueue)
                                      {
                                          if (this.internalQueue.Count > 0)
                                          {
                                              msgReceived = (MqttMsgBase)this.internalQueue.Peek();
                                          }
                                      }

                                      // it is a PUBCOMP message
                                      if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE))
                                      {
                                          // PUBCOMP message for the current message
                                          if (((MqttMsgPubcomp)msgReceived).MessageId == ((MqttMsgPublish)msgInflight).MessageId)
                                          {
                                              lock (this.internalQueue)
                                              {
                                                  // received message processed
                                                  this.internalQueue.Dequeue();
                                                  acknowledge = true;
                                              }

                                              // notify received acknowledge from broker of a published message
                                              this.OnMqttMsgReceived(msgReceived);
                                          }
                                      }

                                      // current message not acknowledged
                                      if (!acknowledge)
                                      {
                                          // check timeout for receiving PUBCOMP since PUBREL was sent
                                          if ((Environment.TickCount - msgContext.Timestamp) >= this.settings.DelayOnRetry)
                                          {
                                              // max retry not reached, resend
                                              if (msgContext.Attempt < this.settings.AttemptsOnRetry)
                                              {
                                                  msgContext.State = MqttMsgState.SendPubrel;

                                                  // re-enqueue message
                                                  this.inflightQueue.Enqueue(msgContext);

                                                  // update timeout (0 -> reanalyze queue immediately)
                                                  timeout = 0;
                                              }
                                          }
                                          else
                                          {
                                              // re-enqueue message
                                              this.inflightQueue.Enqueue(msgContext);

                                              // update timeout
                                              timeout = this.GetInflightTimeout(msgContext, timeout);
                                          }
                                      }
                                  }
                                  break;

                              case MqttMsgState.SendPubrec:

                                  // TODO : impossible ? --> QueuedQos2 ToAcknowledge
                                  break;

                              case MqttMsgState.SendPubrel:

                                  // QoS 2, PUBREL message to send to broker, state change to wait PUBCOMP
                                  if (msgContext.Flow == MqttMsgFlow.ToPublish)
                                  {
                                      MqttMsgPubrel pubrel = new MqttMsgPubrel();
                                      pubrel.MessageId = ((MqttMsgPublish)msgInflight).MessageId;

                                      msgContext.State = MqttMsgState.WaitForPubcomp;
                                      msgContext.Timestamp = Environment.TickCount;
                                      msgContext.Attempt++;
                                      // retry ? set dup flag
                                      if (msgContext.Attempt > 1)
                                      {
                                          pubrel.DupFlag = true;
                                      }

                                      this.Send(pubrel);

                                      // update timeout
                                      timeout = this.GetInflightTimeout(msgContext, timeout);

                                      // re-enqueue message
                                      this.inflightQueue.Enqueue(msgContext);
                                  }
                                  break;

                              case MqttMsgState.SendPubcomp:
                                  // TODO : impossible ?
                                  break;

                              case MqttMsgState.SendPuback:
                                  // TODO : impossible ? --> QueuedQos1 ToAcknowledge
                                  break;

                              default:
                                  break;
                          }
                      }

                      // if calculated timeout is MaxValue, it means that must be Infinite (-1)
                      if (timeout == Int32.MaxValue)
                      {
                          timeout = Timeout.Infinite;
                      }
                  }
              }
          }
      }
      catch (MqttCommunicationException e)
      {
          // possible exception on Send, I need to re-enqueue not sent message
          if (msgContext != null)
          {
              // the exception left the lock taken inside the loop, take it again
              // so the queue is changed under the same lock used there
              lock (this.inflightQueue)
              {
                  // re-enqueue message
                  this.inflightQueue.Enqueue(msgContext);
              }
          }

#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
#endif

          this.Close();

          // raise disconnection client event
          this.OnMqttMsgDisconnected();
      }
  }

  /// <summary>
  /// Merge the time left before the retry deadline of a message into the wait timeout of the next cycle
  /// </summary>
  /// <param name="msgContext">Context of the inflight message, its Timestamp is the tick count of the last send</param>
  /// <param name="timeout">Wait timeout calculated so far for the current cycle (milliseconds)</param>
  /// <returns>The smaller value between the timeout so far and the time left for the message (at least 0)</returns>
  private int GetInflightTimeout(MqttMsgContext msgContext, int timeout)
  {
      int msgTimeout = (this.settings.DelayOnRetry - (Environment.TickCount - msgContext.Timestamp));

      // the time left is measured after Send returned : if sending took more than DelayOnRetry
      // it is negative and WaitOne rejects a negative timeout other than Infinite (-1),
      // so the delay is considered elapsed (0 -> reanalyze queue immediately)
      if (msgTimeout < 0)
      {
          msgTimeout = 0;
      }

      return (msgTimeout < timeout) ? msgTimeout : timeout;
  }
  /// <summary>
  /// Advance the message identifier counter and return its new value
  /// </summary>
  /// <returns>Next message identifier, always different from 0</returns>
  private ushort GetMessageId()
  {
      // 0 and UInt16.MaxValue are the two values the counter restarts from :
      // in both cases the next identifier is 1 (first valid messageId)
      bool restart = (this.messageIdCounter == 0) || (this.messageIdCounter == UInt16.MaxValue);

      if (restart)
      {
          this.messageIdCounter = 1;
      }
      else
      {
          this.messageIdCounter++;
      }

      return this.messageIdCounter;
  }
  /// <summary>
  /// Predicate holder used to look for the context of a PUBLISH message,
  /// identified by message id and flow, among queued message contexts
  /// </summary>
  internal class MqttMsgContextFinder
  {
      // id of the PUBLISH message to look for
      internal ushort MessageId { get; set; }
      // flow the message context must have
      internal MqttMsgFlow Flow { get; set; }

      /// <summary>
      /// Constructor
      /// </summary>
      /// <param name="messageId">Id of the PUBLISH message to look for</param>
      /// <param name="flow">Flow the message context must have</param>
      internal MqttMsgContextFinder(ushort messageId, MqttMsgFlow flow)
      {
          this.Flow = flow;
          this.MessageId = messageId;
      }

      /// <summary>
      /// Check if a queue item is the message context this finder is looking for
      /// </summary>
      /// <param name="item">Queue item, it is cast to MqttMsgContext</param>
      /// <returns>true if the item holds a PUBLISH message with the same message id and flow, false otherwise</returns>
      internal bool Find(object item)
      {
          MqttMsgContext candidate = (MqttMsgContext)item;

          // only PUBLISH messages can match (checked first, the cast below depends on it)
          if (candidate.Message.Type != MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
          {
              return false;
          }

          // same message id ?
          if (((MqttMsgPublish)candidate.Message).MessageId != this.MessageId)
          {
              return false;
          }

          // same flow ?
          return candidate.Flow == this.Flow;
      }
  }
  1823. }
  1824. }