MqttClient.cs 120 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625
  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. */
  13. using System;
  14. using System.Net;
  15. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  16. using System.Net.Sockets;
  17. using System.Security.Cryptography.X509Certificates;
  18. #endif
  19. using System.Threading;
  20. using uPLibrary.Networking.M2Mqtt.Exceptions;
  21. using uPLibrary.Networking.M2Mqtt.Messages;
  22. using uPLibrary.Networking.M2Mqtt.Session;
  23. using uPLibrary.Networking.M2Mqtt.Utility;
  24. using uPLibrary.Networking.M2Mqtt.Internal;
  25. // if .Net Micro Framework
  26. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3)
  27. using Microsoft.SPOT;
  28. #if SSL
  29. using Microsoft.SPOT.Net.Security;
  30. #endif
  31. // else other frameworks (.Net, .Net Compact, Mono, Windows Phone)
  32. #else
  33. using System.Collections.Generic;
  34. #if (SSL && !(WINDOWS_APP || WINDOWS_PHONE_APP))
  35. using System.Security.Authentication;
  36. using System.Net.Security;
  37. #endif
  38. #endif
  39. #if (WINDOWS_APP || WINDOWS_PHONE_APP)
  40. using Windows.Networking.Sockets;
  41. #endif
  42. using System.Collections;
  43. // alias needed due to Microsoft.SPOT.Trace in .Net Micro Framework
  44. // (it's ambiguous with uPLibrary.Networking.M2Mqtt.Utility.Trace)
  45. using MqttUtility = uPLibrary.Networking.M2Mqtt.Utility;
  46. namespace uPLibrary.Networking.M2Mqtt
  47. {
  48. /// <summary>
  49. /// MQTT Client
  50. /// </summary>
  51. public class MqttClient
  52. {
  53. #if BROKER
  54. #region Constants ...
  55. // thread names
  56. private const string RECEIVE_THREAD_NAME = "ReceiveThread";
  57. private const string RECEIVE_EVENT_THREAD_NAME = "DispatchEventThread";
  58. private const string PROCESS_INFLIGHT_THREAD_NAME = "ProcessInflightThread";
  59. private const string KEEP_ALIVE_THREAD = "KeepAliveThread";
  60. #endregion
  61. #endif
  62. /// <summary>
  63. /// Delegate that defines event handler for PUBLISH message received
  64. /// </summary>
  65. public delegate void MqttMsgPublishEventHandler(object sender, MqttMsgPublishEventArgs e);
  66. /// <summary>
  67. /// Delegate that defines event handler for published message
  68. /// </summary>
  69. public delegate void MqttMsgPublishedEventHandler(object sender, MqttMsgPublishedEventArgs e);
  70. /// <summary>
  71. /// Delegate that defines event handler for subscribed topic
  72. /// </summary>
  73. public delegate void MqttMsgSubscribedEventHandler(object sender, MqttMsgSubscribedEventArgs e);
  74. /// <summary>
  75. /// Delegate that defines event handler for unsubscribed topic
  76. /// </summary>
  77. public delegate void MqttMsgUnsubscribedEventHandler(object sender, MqttMsgUnsubscribedEventArgs e);
  78. #if BROKER
  79. /// <summary>
  80. /// Delegate that defines event handler for SUBSCRIBE message received
  81. /// </summary>
  82. public delegate void MqttMsgSubscribeEventHandler(object sender, MqttMsgSubscribeEventArgs e);
  83. /// <summary>
  84. /// Delegate that defines event handler for UNSUBSCRIBE message received
  85. /// </summary>
  86. public delegate void MqttMsgUnsubscribeEventHandler(object sender, MqttMsgUnsubscribeEventArgs e);
  87. /// <summary>
  88. /// Delegate that defines event handler for CONNECT message received
  89. /// </summary>
  90. public delegate void MqttMsgConnectEventHandler(object sender, MqttMsgConnectEventArgs e);
  91. /// <summary>
  92. /// Delegate that defines event handler for client disconnection (DISCONNECT message or not)
  93. /// </summary>
  94. public delegate void MqttMsgDisconnectEventHandler(object sender, EventArgs e);
  95. #endif
  96. /// <summary>
  97. /// Delegate that defines event handler for client/peer disconnection
  98. /// </summary>
  99. public delegate void ConnectionClosedEventHandler(object sender, EventArgs e);
  100. // broker hostname (or ip address) and port
  101. private string brokerHostName;
  102. private int brokerPort;
  103. // running status of threads
  104. private bool isRunning;
  105. // event for raising received message event
  106. private AutoResetEvent receiveEventWaitHandle;
  107. // event for starting process inflight queue asynchronously
  108. private AutoResetEvent inflightWaitHandle;
  109. // event for signaling synchronous receive
  110. AutoResetEvent syncEndReceiving;
  111. // message received
  112. MqttMsgBase msgReceived;
  113. // exception thrown during receiving
  114. Exception exReceiving;
  115. // keep alive period (in ms)
  116. private int keepAlivePeriod;
  117. // events for signaling on keep alive thread
  118. private AutoResetEvent keepAliveEvent;
  119. private AutoResetEvent keepAliveEventEnd;
  120. // last communication time in ticks
  121. private int lastCommTime;
  122. // event for PUBLISH message received
  123. public event MqttMsgPublishEventHandler MqttMsgPublishReceived;
  124. // event for published message
  125. public event MqttMsgPublishedEventHandler MqttMsgPublished;
  126. // event for subscribed topic
  127. public event MqttMsgSubscribedEventHandler MqttMsgSubscribed;
  128. // event for unsubscribed topic
  129. public event MqttMsgUnsubscribedEventHandler MqttMsgUnsubscribed;
  130. #if BROKER
  131. // event for SUBSCRIBE message received
  132. public event MqttMsgSubscribeEventHandler MqttMsgSubscribeReceived;
  133. // event for UNSUBSCRIBE message received
  134. public event MqttMsgUnsubscribeEventHandler MqttMsgUnsubscribeReceived;
  135. // event for CONNECT message received
  136. public event MqttMsgConnectEventHandler MqttMsgConnected;
  137. // event for DISCONNECT message received
  138. public event MqttMsgDisconnectEventHandler MqttMsgDisconnected;
  139. #endif
  140. // event for peer/client disconnection
  141. public event ConnectionClosedEventHandler ConnectionClosed;
  142. // channel to communicate over the network
  143. private IMqttNetworkChannel channel;
  144. // inflight messages queue
  145. private Queue inflightQueue;
  146. // internal queue for received messages about inflight messages
  147. private Queue internalQueue;
  148. // internal queue for dispatching events
  149. private Queue eventQueue;
  150. // session
  151. private MqttClientSession session;
  152. // reference to avoid access to singleton via property
  153. private MqttSettings settings;
  154. // current message identifier generated
  155. private ushort messageIdCounter = 0;
  156. // connection is closing due to peer
  157. private bool isConnectionClosing;
  158. /// <summary>
  159. /// Connection status between client and broker
  160. /// </summary>
  161. public bool IsConnected { get; private set; }
  162. /// <summary>
  163. /// Client identifier
  164. /// </summary>
  165. public string ClientId { get; private set; }
  166. /// <summary>
  167. /// Clean session flag
  168. /// </summary>
  169. public bool CleanSession { get; private set; }
  170. /// <summary>
  171. /// Will flag
  172. /// </summary>
  173. public bool WillFlag { get; private set; }
  174. /// <summary>
  175. /// Will QOS level
  176. /// </summary>
  177. public byte WillQosLevel { get; private set; }
  178. /// <summary>
  179. /// Will topic
  180. /// </summary>
  181. public string WillTopic { get; private set; }
  182. /// <summary>
  183. /// Will message
  184. /// </summary>
  185. public string WillMessage { get; private set; }
  186. /// <summary>
  187. /// MQTT protocol version
  188. /// </summary>
  189. public MqttProtocolVersion ProtocolVersion { get; set; }
  190. #if BROKER
  191. /// <summary>
  192. /// MQTT Client Session
  193. /// </summary>
  194. public MqttClientSession Session
  195. {
  196. get { return this.session; }
  197. set { this.session = value; }
  198. }
  199. #endif
  200. /// <summary>
  201. /// MQTT client settings
  202. /// </summary>
  203. public MqttSettings Settings
  204. {
  205. get { return this.settings; }
  206. }
  207. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  208. /// <summary>
  209. /// Creates a client for a broker identified by IP address, using the default MQTT broker port, a non secure connection, no CA certificate and no SSL/TLS protocol
  210. /// </summary>
  211. /// <param name="brokerIpAddress">Broker IP address</param>
  212. [Obsolete("Use this ctor MqttClient(string brokerHostName) insted")]
  213. public MqttClient(IPAddress brokerIpAddress) :
  214. this(brokerIpAddress, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, MqttSslProtocols.None)
  215. {
  216. }
  /// <summary>
  /// Creates a client for a broker identified by IP address; the address is converted
  /// to its string form and handed to the common initialization routine
  /// </summary>
  /// <param name="brokerIpAddress">Broker IP address (must not be null)</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
  /// <exception cref="ArgumentNullException"><paramref name="brokerIpAddress"/> is null</exception>
  [Obsolete("Use this ctor MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, MqttSslProtocols sslProtocol) instead")]
  public MqttClient(IPAddress brokerIpAddress, int brokerPort, bool secure, X509Certificate caCert, MqttSslProtocols sslProtocol)
  {
      // fail with a descriptive exception instead of a NullReferenceException raised by ToString()
      if (brokerIpAddress == null)
          throw new ArgumentNullException("brokerIpAddress");

      string brokerAddress = brokerIpAddress.ToString();

#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      // this Init overload also takes the two certificate callbacks: none are supplied here
      this.Init(brokerAddress, brokerPort, secure, caCert, sslProtocol, null, null);
#else
      this.Init(brokerAddress, brokerPort, secure, caCert, sslProtocol);
#endif
  }
  234. #endif
  235. /// <summary>
  236. /// Creates a client for the given broker using the default MQTT broker port, a non secure connection and no SSL/TLS protocol (and no CA certificate on platforms whose constructor takes one)
  237. /// </summary>
  238. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  239. public MqttClient(string brokerHostName) :
  240. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  241. this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, MqttSslProtocols.None)
  242. #else
  243. this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, MqttSslProtocols.None)
  244. #endif
  245. {
  246. }
  247. /// <summary>
  248. /// Creates a client for the given broker; both the constructor signature and the Init overload it calls are selected by the platform compilation symbols
  249. /// </summary>
  250. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  251. /// <param name="brokerPort">Broker port</param>
  252. /// <param name="secure">Using secure connection</param>
  253. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  254. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  255. /// <param name="caCert">CA certificate for secure connection (parameter not present on WINDOWS_APP / WINDOWS_PHONE_APP builds)</param>
  256. public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, MqttSslProtocols sslProtocol)
  257. #else
  258. public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol)
  259. #endif
  260. {
  261. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
  262. this.Init(brokerHostName, brokerPort, secure, caCert, sslProtocol, null, null); // no certificate validation/selection callbacks
  263. #elif (WINDOWS_APP || WINDOWS_PHONE_APP)
  264. this.Init(brokerHostName, brokerPort, secure, sslProtocol); // this Init overload has no CA certificate parameter
  265. #else
  266. this.Init(brokerHostName, brokerPort, secure, caCert, sslProtocol); // this Init overload has no callback parameters
  267. #endif
  268. }
  269. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
  270. /// <summary>
  271. /// Creates a client with a caller supplied remote certificate validation callback and no local certificate selection callback (null is forwarded for it)
  272. /// </summary>
  273. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  274. /// <param name="brokerPort">Broker port</param>
  275. /// <param name="secure">Using secure connection</param>
  276. /// <param name="caCert">CA certificate for secure connection</param>
  277. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  278. /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  279. public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, MqttSslProtocols sslProtocol,
  280. RemoteCertificateValidationCallback userCertificateValidationCallback)
  281. : this(brokerHostName, brokerPort, secure, caCert, sslProtocol, userCertificateValidationCallback, null)
  282. {
  283. }
  284. /// <summary>
  285. /// Creates a client without a CA certificate (null is forwarded for it), with both certificate callbacks supplied by the caller
  286. /// </summary>
  287. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  288. /// <param name="brokerPort">Broker port</param>
  289. /// <param name="secure">Using secure connection</param>
  290. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  291. /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  292. /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  293. public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol,
  294. RemoteCertificateValidationCallback userCertificateValidationCallback,
  295. LocalCertificateSelectionCallback userCertificateSelectionCallback)
  296. : this(brokerHostName, brokerPort, secure, null, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback)
  297. {
  298. }
  299. /// <summary>
  300. /// Creates a client with every connection option supplied by the caller; all arguments are forwarded unchanged to Init
  301. /// </summary>
  302. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  303. /// <param name="brokerPort">Broker port</param>
  304. /// <param name="secure">Using secure connection</param>
  305. /// <param name="caCert">CA certificate for secure connection</param>
  306. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  307. /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  308. /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  309. public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, MqttSslProtocols sslProtocol,
  310. RemoteCertificateValidationCallback userCertificateValidationCallback,
  311. LocalCertificateSelectionCallback userCertificateSelectionCallback)
  312. {
  313. this.Init(brokerHostName, brokerPort, secure, caCert, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback);
  314. }
  315. #endif
  316. #if BROKER
  /// <summary>
  /// Builds a client on top of an already existing network channel (BROKER builds only)
  /// </summary>
  /// <param name="channel">Network channel for communication</param>
  public MqttClient(IMqttNetworkChannel channel)
  {
      // communication channel and reference to the MQTT settings
      this.channel = channel;
      this.settings = MqttSettings.Instance;

      // MQTT protocol version defaults to 3.1.1
      this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1;

      // initial state: not connected yet, no client identifier, clean session, no session
      this.IsConnected = false;
      this.ClientId = null;
      this.CleanSession = true;
      this.session = null;

      // queues: inflight messages (publishing and acknowledge), events to dispatch,
      // and received messages about inflight messages
      this.inflightQueue = new Queue();
      this.eventQueue = new Queue();
      this.internalQueue = new Queue();

      // wait handles used for keep alive, the inflight queue and received messages
      this.keepAliveEvent = new AutoResetEvent(false);
      this.inflightWaitHandle = new AutoResetEvent(false);
      this.receiveEventWaitHandle = new AutoResetEvent(false);
  }
  343. #endif
  /// <summary>
  /// Common initialization: stores the broker endpoint, records the port in the MQTT settings,
  /// creates the wait handles and queues and builds the network channel
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
  /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, MqttSslProtocols sslProtocol,
      RemoteCertificateValidationCallback userCertificateValidationCallback,
      LocalCertificateSelectionCallback userCertificateSelectionCallback)
#elif (WINDOWS_APP || WINDOWS_PHONE_APP)
  private void Init(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol)
#else
  private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, MqttSslProtocols sslProtocol)
#endif
  {
      // MQTT protocol version defaults to 3.1.1
      this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1;

#if !SSL
      // a secure connection cannot be honoured by a build without SSL support
      if (secure)
          throw new ArgumentException("Library compiled without SSL support");
#endif

      // broker endpoint
      this.brokerPort = brokerPort;
      this.brokerHostName = brokerHostName;

      // reference to MQTT settings; the port is recorded as SSL or plain depending on the connection type
      this.settings = MqttSettings.Instance;
      if (secure)
          this.settings.SslPort = this.brokerPort;
      else
          this.settings.Port = this.brokerPort;

      // no session yet
      this.session = null;

      // queues: inflight messages (publishing and acknowledge), events to dispatch,
      // and received messages about inflight messages
      this.inflightQueue = new Queue();
      this.eventQueue = new Queue();
      this.internalQueue = new Queue();

      // wait handles: synchronous receive, keep alive, inflight queue and received messages
      this.syncEndReceiving = new AutoResetEvent(false);
      this.keepAliveEvent = new AutoResetEvent(false);
      this.inflightWaitHandle = new AutoResetEvent(false);
      this.receiveEventWaitHandle = new AutoResetEvent(false);

      // network channel, built with the arguments available on the current platform
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
      this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback);
#elif (WINDOWS_APP || WINDOWS_PHONE_APP)
      this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, sslProtocol);
#else
      this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, sslProtocol);
#endif
  }
  /// <summary>
  /// Connect to broker using only a client identifier: no credentials, no will message,
  /// clean session and the default keep alive period
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  public byte Connect(string clientId)
  {
      byte returnCode = this.Connect(
          clientId,
          null,                                       // username
          null,                                       // password
          false,                                      // willRetain
          MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE,      // willQosLevel
          false,                                      // willFlag
          null,                                       // willTopic
          null,                                       // willMessage
          true,                                       // cleanSession
          MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);  // keepAlivePeriod
      return returnCode;
  }
  /// <summary>
  /// Connect to broker with credentials: no will message, clean session and the default keep alive period
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <param name="username">Username</param>
  /// <param name="password">Password</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  public byte Connect(string clientId, string username, string password)
  {
      byte returnCode = this.Connect(
          clientId,
          username,
          password,
          false,                                      // willRetain
          MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE,      // willQosLevel
          false,                                      // willFlag
          null,                                       // willTopic
          null,                                       // willMessage
          true,                                       // cleanSession
          MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);  // keepAlivePeriod
      return returnCode;
  }
  /// <summary>
  /// Connect to broker with credentials and caller chosen clean session flag and keep alive period; no will message
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <param name="username">Username</param>
  /// <param name="password">Password</param>
  /// <param name="cleanSession">Clean session flag</param>
  /// <param name="keepAlivePeriod">Keep alive period</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  public byte Connect(string clientId, string username, string password, bool cleanSession, ushort keepAlivePeriod)
  {
      byte returnCode = this.Connect(
          clientId,
          username,
          password,
          false,                                   // willRetain
          MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE,   // willQosLevel
          false,                                   // willFlag
          null,                                    // willTopic
          null,                                    // willMessage
          cleanSession,
          keepAlivePeriod);
      return returnCode;
  }
  /// <summary>
  /// Connect to broker: opens the network channel, starts the receive thread, sends the CONNECT
  /// message and, if the broker accepts the connection, stores the client properties and starts
  /// the worker threads
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <param name="username">Username</param>
  /// <param name="password">Password</param>
  /// <param name="willRetain">Will retain flag</param>
  /// <param name="willQosLevel">Will QOS level</param>
  /// <param name="willFlag">Will flag</param>
  /// <param name="willTopic">Will topic</param>
  /// <param name="willMessage">Will message</param>
  /// <param name="cleanSession">Clean session flag</param>
  /// <param name="keepAlivePeriod">Keep alive period (multiplied by 1000 and stored in ms; zero turns the keep alive mechanism off)</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  /// <exception cref="MqttConnectionException">The network channel could not connect to the broker</exception>
  public byte Connect(string clientId,
      string username,
      string password,
      bool willRetain,
      byte willQosLevel,
      bool willFlag,
      string willTopic,
      string willMessage,
      bool cleanSession,
      ushort keepAlivePeriod)
  {
      // create CONNECT message
      MqttMsgConnect connect = new MqttMsgConnect(clientId,
          username,
          password,
          willRetain,
          willQosLevel,
          willFlag,
          willTopic,
          willMessage,
          cleanSession,
          keepAlivePeriod,
          (byte)this.ProtocolVersion);

      try
      {
          // connect to the broker
          this.channel.Connect();
      }
      catch (Exception ex)
      {
          throw new MqttConnectionException("Exception connecting to the broker", ex);
      }

      this.lastCommTime = 0;
      this.isRunning = true;
      this.isConnectionClosing = false;
      // start thread for receiving messages from broker
      Fx.StartThread(this.ReceiveThread);

      MqttMsgConnack connack;
      try
      {
          connack = (MqttMsgConnack)this.SendReceive(connect);
      }
      catch (Exception)
      {
          // the CONNECT/CONNACK exchange failed after the receive thread was started:
          // clear the running flag so the client is not left marked as running, then
          // let the caller see the original exception
          // NOTE(review): presumably ReceiveThread polls isRunning and exits - confirm
          this.isRunning = false;
          throw;
      }

      // if connection accepted, store the client state and start the worker threads
      if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED)
      {
          // set all client properties
          this.ClientId = clientId;
          this.CleanSession = cleanSession;
          this.WillFlag = willFlag;
          this.WillTopic = willTopic;
          this.WillMessage = willMessage;
          this.WillQosLevel = willQosLevel;

          this.keepAlivePeriod = keepAlivePeriod * 1000; // convert in ms

          // restore previous session
          this.RestoreSession();

          // keep alive period equals zero means turning off keep alive mechanism
          if (this.keepAlivePeriod != 0)
          {
              // start thread for sending keep alive message to the broker
              Fx.StartThread(this.KeepAliveThread);
          }

          // start thread for raising received message event from broker
          Fx.StartThread(this.DispatchEventThread);

          // start thread for handling inflight messages queue to broker asynchronously (publish and acknowledge)
          Fx.StartThread(this.ProcessInflightThread);

          this.IsConnected = true;
      }

      return connack.ReturnCode;
  }
  /// <summary>
  /// Disconnect from broker: sends a DISCONNECT message and then
  /// marks the connection as closing
  /// </summary>
  public void Disconnect()
  {
      // tell the broker we are leaving ...
      this.Send(new MqttMsgDisconnect());

      // ... and begin closing the client
      this.OnConnectionClosing();
  }
  528. #if BROKER
  /// <summary>
  /// Open client communication: sets the running flag and starts the
  /// receive, event dispatch and inflight processing threads
  /// </summary>
  public void Open()
  {
      this.isRunning = true;

      // messages coming from the client
      Fx.StartThread(this.ReceiveThread);

      // events raised for received messages
      Fx.StartThread(this.DispatchEventThread);

      // inflight queue towards the client (publish and acknowledge), handled asynchronously
      Fx.StartThread(this.ProcessInflightThread);
  }
  542. #endif
  /// <summary>
  /// Close client: clears the running flag, signals the worker wait handles,
  /// empties all queues and closes the network channel
  /// </summary>
#if BROKER
  public void Close()
#else
  private void Close()
#endif
  {
      // stop receiving thread
      this.isRunning = false;

      // signal the receive event handle (if it exists)
      if (this.receiveEventWaitHandle != null)
      {
          this.receiveEventWaitHandle.Set();
      }

      // signal the inflight handle (if it exists)
      if (this.inflightWaitHandle != null)
      {
          this.inflightWaitHandle.Set();
      }

#if BROKER
      // unlock keep alive thread
      this.keepAliveEvent.Set();
#else
      // Connect() starts the keep alive thread only when the keep alive period is
      // not zero. Waiting on keepAliveEventEnd without a timeout when that thread
      // was never started could block Close() indefinitely, so only unlock and wait
      // when the keep alive mechanism is in use.
      // NOTE(review): keepAliveEventEnd is presumably signalled by KeepAliveThread when it ends - confirm
      if (this.keepAlivePeriod != 0)
      {
          // unlock keep alive thread and wait
          this.keepAliveEvent.Set();

          if (this.keepAliveEventEnd != null)
          {
              this.keepAliveEventEnd.WaitOne();
          }
      }
#endif

      // clear all queues
      this.inflightQueue.Clear();
      this.internalQueue.Clear();
      this.eventQueue.Clear();

      // close network channel
      this.channel.Close();

      this.IsConnected = false;
  }
  /// <summary>
  /// Execute ping to broker for keep alive
  /// </summary>
  /// <returns>PINGRESP message from broker, or null when the exchange failed
  /// (in which case the connection is flagged as closing)</returns>
  private MqttMsgPingResp Ping()
  {
      MqttMsgPingReq request = new MqttMsgPingReq();
      MqttMsgPingResp response = null;

      try
      {
          // the keep alive period doubles as the timeout for the PINGRESP answer
          response = (MqttMsgPingResp)this.SendReceive(request, this.keepAlivePeriod);
      }
      catch (Exception e)
      {
#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
#endif
          // no valid answer: client must close connection
          this.OnConnectionClosing();
      }

      return response;
  }
  599. #if BROKER
  /// <summary>
  /// Send CONNACK message to the client (connection accepted or not).
  /// On acceptance the client properties are taken from the CONNECT message and the
  /// keep alive thread is started; on refusal the client is closed.
  /// </summary>
  /// <param name="connect">CONNECT message with all client information</param>
  /// <param name="returnCode">Return code for CONNACK message</param>
  /// <param name="clientId">If not null, client id assigned by broker</param>
  /// <param name="sessionPresent">Session present on the broker</param>
  public void Connack(MqttMsgConnect connect, byte returnCode, string clientId, bool sessionPresent)
  {
      this.lastCommTime = 0;

      // build the CONNACK answer
      MqttMsgConnack connack = new MqttMsgConnack();
      connack.ReturnCode = returnCode;

      // [v3.1.1] session present flag
      if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1_1)
      {
          connack.SessionPresent = sessionPresent;
      }

      // deliver it to the client
      this.Send(connack);

      // connection refused, close TCP/IP channel
      if (connack.ReturnCode != MqttMsgConnack.CONN_ACCEPTED)
      {
          this.Close();
          return;
      }

      // connection accepted
      // [v3.1.1] a non-null clientId means the CONNECT message carried a zero length
      // client id and the broker assigned a unique identifier to the client
      if (clientId == null)
      {
          this.ClientId = connect.ClientId;
      }
      else
      {
          this.ClientId = clientId;
      }

      this.CleanSession = connect.CleanSession;
      this.WillFlag = connect.WillFlag;
      this.WillTopic = connect.WillTopic;
      this.WillMessage = connect.WillMessage;
      this.WillQosLevel = connect.WillQosLevel;

      // seconds to milliseconds
      this.keepAlivePeriod = connect.KeepAlivePeriod * 1000;
      // broker tolerates 1.5 times the keep alive period requested by the client
      this.keepAlivePeriod += (this.keepAlivePeriod / 2);

      // start thread for checking keep alive period timeout
      Fx.StartThread(this.KeepAliveThread);

      this.isConnectionClosing = false;
      this.IsConnected = true;
  }
  /// <summary>
  /// Send SUBACK message to the client
  /// </summary>
  /// <param name="messageId">Message Id for the SUBSCRIBE message that is being acknowledged</param>
  /// <param name="grantedQosLevels">Granted QoS Levels</param>
  public void Suback(ushort messageId, byte[] grantedQosLevels)
  {
      MqttMsgSuback ack = new MqttMsgSuback
      {
          MessageId = messageId,
          GrantedQoSLevels = grantedQosLevels
      };

      this.Send(ack);
  }
  /// <summary>
  /// Send UNSUBACK message to the client
  /// </summary>
  /// <param name="messageId">Message Id for the UNSUBSCRIBE message that is being acknowledged</param>
  public void Unsuback(ushort messageId)
  {
      MqttMsgUnsuback ack = new MqttMsgUnsuback
      {
          MessageId = messageId
      };

      this.Send(ack);
  }
  665. #endif
  /// <summary>
  /// Subscribe for message topics (the SUBSCRIBE request is queued and sent asynchronously)
  /// </summary>
  /// <param name="topics">List of topics to subscribe</param>
  /// <param name="qosLevels">QOS levels related to topics</param>
  /// <returns>Message Id related to SUBSCRIBE message</returns>
  /// <exception cref="MqttClientException">The inflight queue is full and the request was not enqueued</exception>
  public ushort Subscribe(string[] topics, byte[] qosLevels)
  {
      MqttMsgSubscribe subscribe =
          new MqttMsgSubscribe(topics, qosLevels);
      subscribe.MessageId = this.GetMessageId();

      // enqueue subscribe request into the inflight queue
      bool enqueue = this.EnqueueInflight(subscribe, MqttMsgFlow.ToPublish);

      // EnqueueInflight returns false when the message was not put into the queue;
      // report it the same way Publish does rather than returning the id of a
      // request that was dropped
      if (!enqueue)
      {
          throw new MqttClientException(MqttClientErrorCode.InflightQueueFull);
      }

      return subscribe.MessageId;
  }
  /// <summary>
  /// Unsubscribe for message topics (the UNSUBSCRIBE request is queued and sent asynchronously)
  /// </summary>
  /// <param name="topics">List of topics to unsubscribe</param>
  /// <returns>Message Id related to UNSUBSCRIBE message</returns>
  /// <exception cref="MqttClientException">The inflight queue is full and the request was not enqueued</exception>
  public ushort Unsubscribe(string[] topics)
  {
      MqttMsgUnsubscribe unsubscribe =
          new MqttMsgUnsubscribe(topics);
      unsubscribe.MessageId = this.GetMessageId();

      // enqueue unsubscribe request into the inflight queue
      bool enqueue = this.EnqueueInflight(unsubscribe, MqttMsgFlow.ToPublish);

      // EnqueueInflight returns false when the message was not put into the queue;
      // report it the same way Publish does rather than returning the id of a
      // request that was dropped
      if (!enqueue)
      {
          throw new MqttClientException(MqttClientErrorCode.InflightQueueFull);
      }

      return unsubscribe.MessageId;
  }
  /// <summary>
  /// Publish a message asynchronously (QoS Level 0 and not retained)
  /// </summary>
  /// <param name="topic">Message topic</param>
  /// <param name="message">Message data (payload)</param>
  /// <returns>Message Id related to PUBLISH message</returns>
  public ushort Publish(string topic, byte[] message)
  {
      // defaults of this overload: "at most once" delivery, retain flag off
      ushort messageId = this.Publish(topic, message, MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);
      return messageId;
  }
  /// <summary>
  /// Publish a message asynchronously: the PUBLISH message gets a message id
  /// and is placed into the inflight queue
  /// </summary>
  /// <param name="topic">Message topic</param>
  /// <param name="message">Message data (payload)</param>
  /// <param name="qosLevel">QoS Level</param>
  /// <param name="retain">Retain flag</param>
  /// <returns>Message Id related to PUBLISH message</returns>
  public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain)
  {
      // DUP flag is false for a first transmission
      MqttMsgPublish outgoing = new MqttMsgPublish(topic, message, false, qosLevel, retain);
      outgoing.MessageId = this.GetMessageId();

      // hand the message over to the inflight queue
      if (!this.EnqueueInflight(outgoing, MqttMsgFlow.ToPublish))
      {
          // inflight queue full, message not enqueued
          throw new MqttClientException(MqttClientErrorCode.InflightQueueFull);
      }

      // message enqueued
      return outgoing.MessageId;
  }
  /// <summary>
  /// Wrapper method for raising events: the internal event is appended to the
  /// event queue and the receive event handle is signalled
  /// </summary>
  /// <param name="internalEvent">Internal event</param>
  private void OnInternalEvent(InternalEvent internalEvent)
  {
      var pendingEvents = this.eventQueue;

      // the queue object itself is the lock guarding access to it
      lock (pendingEvents)
      {
          pendingEvents.Enqueue(internalEvent);
      }

      // signal that there is something in the event queue
      this.receiveEventWaitHandle.Set();
  }
  /// <summary>
  /// Wrapper method for raising closing connection event: sets the closing flag
  /// and signals the receive event handle. Does nothing when the flag is already set.
  /// </summary>
  private void OnConnectionClosing()
  {
      // closing already in progress
      if (this.isConnectionClosing)
      {
          return;
      }

      this.isConnectionClosing = true;
      this.receiveEventWaitHandle.Set();
  }
  /// <summary>
  /// Wrapper method for raising PUBLISH message received event
  /// </summary>
  /// <param name="publish">PUBLISH message received</param>
  private void OnMqttMsgPublishReceived(MqttMsgPublish publish)
  {
      // Read the event once into a local: if the last subscriber is removed on another
      // thread between the null check and the invocation, invoking the field directly
      // would throw a NullReferenceException.
      var handler = this.MqttMsgPublishReceived;
      if (handler != null)
      {
          handler(this,
              new MqttMsgPublishEventArgs(publish.Topic, publish.Message, publish.DupFlag, publish.QosLevel, publish.Retain));
      }
  }
  /// <summary>
  /// Wrapper method for raising published message event
  /// </summary>
  /// <param name="messageId">Message identifier for published message</param>
  /// <param name="isPublished">Publish flag</param>
  private void OnMqttMsgPublished(ushort messageId, bool isPublished)
  {
      // Read the event once into a local: if the last subscriber is removed on another
      // thread between the null check and the invocation, invoking the field directly
      // would throw a NullReferenceException.
      var handler = this.MqttMsgPublished;
      if (handler != null)
      {
          handler(this,
              new MqttMsgPublishedEventArgs(messageId, isPublished));
      }
  }
  /// <summary>
  /// Wrapper method for raising subscribed topic event
  /// </summary>
  /// <param name="suback">SUBACK message received</param>
  private void OnMqttMsgSubscribed(MqttMsgSuback suback)
  {
      // Read the event once into a local: if the last subscriber is removed on another
      // thread between the null check and the invocation, invoking the field directly
      // would throw a NullReferenceException.
      var handler = this.MqttMsgSubscribed;
      if (handler != null)
      {
          handler(this,
              new MqttMsgSubscribedEventArgs(suback.MessageId, suback.GrantedQoSLevels));
      }
  }
  /// <summary>
  /// Wrapper method for raising unsubscribed topic event
  /// </summary>
  /// <param name="messageId">Message identifier for unsubscribed topic</param>
  private void OnMqttMsgUnsubscribed(ushort messageId)
  {
      // Read the event once into a local: if the last subscriber is removed on another
      // thread between the null check and the invocation, invoking the field directly
      // would throw a NullReferenceException.
      var handler = this.MqttMsgUnsubscribed;
      if (handler != null)
      {
          handler(this,
              new MqttMsgUnsubscribedEventArgs(messageId));
      }
  }
  799. #if BROKER
  /// <summary>
  /// Wrapper method for raising SUBSCRIBE message event
  /// </summary>
  /// <param name="messageId">Message identifier for subscribe topics request</param>
  /// <param name="topics">Topics requested to subscribe</param>
  /// <param name="qosLevels">List of QOS Levels requested</param>
  private void OnMqttMsgSubscribeReceived(ushort messageId, string[] topics, byte[] qosLevels)
  {
      // Read the event once into a local: if the last subscriber is removed on another
      // thread between the null check and the invocation, invoking the field directly
      // would throw a NullReferenceException.
      var handler = this.MqttMsgSubscribeReceived;
      if (handler != null)
      {
          handler(this,
              new MqttMsgSubscribeEventArgs(messageId, topics, qosLevels));
      }
  }
  /// <summary>
  /// Wrapper method for raising UNSUBSCRIBE message event
  /// </summary>
  /// <param name="messageId">Message identifier for unsubscribe topics request</param>
  /// <param name="topics">Topics requested to unsubscribe</param>
  private void OnMqttMsgUnsubscribeReceived(ushort messageId, string[] topics)
  {
      // Read the event once into a local: if the last subscriber is removed on another
      // thread between the null check and the invocation, invoking the field directly
      // would throw a NullReferenceException.
      var handler = this.MqttMsgUnsubscribeReceived;
      if (handler != null)
      {
          handler(this,
              new MqttMsgUnsubscribeEventArgs(messageId, topics));
      }
  }
  /// <summary>
  /// Wrapper method for raising CONNECT message event. The protocol version
  /// announced in the CONNECT message is stored before the event is raised.
  /// </summary>
  /// <param name="connect">CONNECT message received</param>
  private void OnMqttMsgConnected(MqttMsgConnect connect)
  {
      // Record the protocol version whether or not anybody listens to the event:
      // Connack() reads ProtocolVersion to decide on the session present flag, so it
      // must not depend on a handler being subscribed.
      this.ProtocolVersion = (MqttProtocolVersion)connect.ProtocolVersion;

      // Read the event once into a local: if the last subscriber is removed on another
      // thread between the null check and the invocation, invoking the field directly
      // would throw a NullReferenceException.
      var handler = this.MqttMsgConnected;
      if (handler != null)
      {
          handler(this, new MqttMsgConnectEventArgs(connect));
      }
  }
  /// <summary>
  /// Wrapper method for raising DISCONNECT message event
  /// </summary>
  private void OnMqttMsgDisconnected()
  {
      // Read the event once into a local: if the last subscriber is removed on another
      // thread between the null check and the invocation, invoking the field directly
      // would throw a NullReferenceException.
      var handler = this.MqttMsgDisconnected;
      if (handler != null)
      {
          handler(this, EventArgs.Empty);
      }
  }
  848. #endif
  /// <summary>
  /// Wrapper method for peer/client disconnection
  /// </summary>
  private void OnConnectionClosed()
  {
      // Read the event once into a local: if the last subscriber is removed on another
      // thread between the null check and the invocation, invoking the field directly
      // would throw a NullReferenceException.
      var handler = this.ConnectionClosed;
      if (handler != null)
      {
          handler(this, EventArgs.Empty);
      }
  }
  /// <summary>
  /// Send a message through the network channel. In the client build the time of
  /// the last communication is updated after a successful send.
  /// </summary>
  /// <param name="msgBytes">Message bytes</param>
  /// <exception cref="MqttCommunicationException">Wraps any exception raised while sending</exception>
  private void Send(byte[] msgBytes)
  {
      try
      {
          // write the raw bytes to the channel
          this.channel.Send(msgBytes);

#if !BROKER
          // remember when the last message was sent (ticks)
          this.lastCommTime = Environment.TickCount;
#endif
      }
      catch (Exception sendFailure)
      {
#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", sendFailure.ToString());
#endif

          throw new MqttCommunicationException(sendFailure);
      }
  }
  /// <summary>
  /// Send a message: it is serialized for the current protocol version and
  /// its bytes are sent
  /// </summary>
  /// <param name="msg">Message</param>
  private void Send(MqttMsgBase msg)
  {
#if TRACE
      MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
#endif
      byte[] frame = msg.GetBytes((byte)this.ProtocolVersion);
      this.Send(frame);
  }
  /// <summary>
  /// Send a message to the broker and wait answer, using the default timeout
  /// </summary>
  /// <param name="msgBytes">Message bytes</param>
  /// <returns>MQTT message response</returns>
  private MqttMsgBase SendReceive(byte[] msgBytes)
  {
      MqttMsgBase answer = this.SendReceive(msgBytes, MqttSettings.MQTT_DEFAULT_TIMEOUT);
      return answer;
  }
  /// <summary>
  /// Send a message to the broker and wait answer
  /// </summary>
  /// <param name="msgBytes">Message bytes</param>
  /// <param name="timeout">Timeout for receiving answer</param>
  /// <returns>MQTT message response</returns>
  /// <exception cref="MqttCommunicationException">Sending failed, or no answer was signalled within the timeout</exception>
  private MqttMsgBase SendReceive(byte[] msgBytes, int timeout)
  {
      // reset handle before sending
      this.syncEndReceiving.Reset();
      // Forget any exception recorded for an earlier exchange, for the same reason the
      // handle is reset: otherwise a stale exReceiving would be thrown below even
      // though a valid answer to THIS request has been received.
      // NOTE(review): exReceiving is assumed to be written by the receiving thread - confirm nothing relies on it persisting
      this.exReceiving = null;

      try
      {
          // send message
          this.channel.Send(msgBytes);

          // update last message sent ticks
          this.lastCommTime = Environment.TickCount;
      }
      catch (Exception e)
      {
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
          // connection reset by broker
          SocketException socketError = e as SocketException;
          if ((socketError != null) && (socketError.SocketErrorCode == SocketError.ConnectionReset))
          {
              this.IsConnected = false;
          }
#endif
#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
#endif

          throw new MqttCommunicationException(e);
      }

      // wait for answer from broker
      bool answered;
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      answered = this.syncEndReceiving.WaitOne(timeout, false);
#else
      answered = this.syncEndReceiving.WaitOne(timeout);
#endif

      if (!answered)
      {
          // throw timeout exception
          throw new MqttCommunicationException();
      }

      // the handle was signalled but an exception was recorded for the receive
      if (this.exReceiving != null)
      {
          throw this.exReceiving;
      }

      // message received without exception
      return this.msgReceived;
  }
  /// <summary>
  /// Send a message to the broker and wait answer, using the default timeout
  /// </summary>
  /// <param name="msg">Message</param>
  /// <returns>MQTT message response</returns>
  private MqttMsgBase SendReceive(MqttMsgBase msg)
  {
      MqttMsgBase answer = this.SendReceive(msg, MqttSettings.MQTT_DEFAULT_TIMEOUT);
      return answer;
  }
  /// <summary>
  /// Send a message to the broker and wait answer: the message is serialized
  /// for the current protocol version before being sent
  /// </summary>
  /// <param name="msg">Message</param>
  /// <param name="timeout">Timeout for receiving answer</param>
  /// <returns>MQTT message response</returns>
  private MqttMsgBase SendReceive(MqttMsgBase msg, int timeout)
  {
#if TRACE
      MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
#endif
      byte[] frame = msg.GetBytes((byte)this.ProtocolVersion);
      return this.SendReceive(frame, timeout);
  }
  /// <summary>
  /// Enqueue a message into the inflight queue
  /// </summary>
  /// <param name="msg">Message to enqueue</param>
  /// <param name="flow">Message flow (publish, acknowledge)</param>
  /// <returns>Message enqueued or not (false when the inflight queue is full, or when a
  /// received QoS 2 PUBLISH was already in the queue and only its state was reset)</returns>
  private bool EnqueueInflight(MqttMsgBase msg, MqttMsgFlow flow)
  {
      // enqueue is needed (or not)
      bool enqueue = true;

      // if it is a received PUBLISH message with QoS Level 2.
      // The duplicate lookup applies only to messages to acknowledge: an outgoing
      // PUBLISH can legitimately carry the same message id as a received one (ids are
      // tracked independently by each side), and it must not be dropped because of that.
      if ((msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
          (flow == MqttMsgFlow.ToAcknowledge) &&
          (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE))
      {
          lock (this.inflightQueue)
          {
              // if it is a PUBLISH message already received (it is in the inflight queue), the publisher
              // re-sent it because it didn't receive the PUBREC. In this case, we have to re-send PUBREC

              // NOTE : the lookup uses message id and flow because broker and client could publish/receive
              //        with the same message id (one tracked by broker and the other by client)
              MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToAcknowledge);
              MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);

              // the PUBLISH message is already in the inflight queue, we don't need to re-enqueue but we need
              // to change state to re-send PUBREC
              if (msgCtx != null)
              {
                  msgCtx.State = MqttMsgState.QueuedQos2;
                  msgCtx.Flow = MqttMsgFlow.ToAcknowledge;
                  enqueue = false;
              }
          }
      }

      if (enqueue)
      {
          // set a default state
          MqttMsgState state = MqttMsgState.QueuedQos0;

          // based on QoS level, the messages flow between broker and client changes
          switch (msg.QosLevel)
          {
              // QoS Level 0
              case MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE:

                  state = MqttMsgState.QueuedQos0;
                  break;

              // QoS Level 1
              case MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE:

                  state = MqttMsgState.QueuedQos1;
                  break;

              // QoS Level 2
              case MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE:

                  state = MqttMsgState.QueuedQos2;
                  break;
          }

          // [v3.1.1] SUBSCRIBE and UNSUBSCRIBE aren't "officially" QOS = 1
          //          so QueuedQos1 state isn't valid for them
          if (msg.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
          {
              state = MqttMsgState.SendSubscribe;
          }
          else if (msg.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
          {
              state = MqttMsgState.SendUnsubscribe;
          }

          // queue message context
          MqttMsgContext msgContext = new MqttMsgContext()
          {
              Message = msg,
              State = state,
              Flow = flow,
              Attempt = 0
          };

          lock (this.inflightQueue)
          {
              // check number of messages inside inflight queue
              enqueue = (this.inflightQueue.Count < this.settings.InflightQueueSize);

              if (enqueue)
              {
                  // enqueue message and unlock send thread
                  this.inflightQueue.Enqueue(msgContext);

#if TRACE
                  MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "enqueued {0}", msg);
#endif

                  // A PUBLISH message is also recorded in the session (when there is one) if it is
                  //  - to publish, with QoS level 1 or 2, or
                  //  - to acknowledge, with QoS level 2
                  if (msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
                  {
                      bool isQos1 = (msg.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE);
                      bool isQos2 = (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE);

                      bool trackInSession =
                          ((msgContext.Flow == MqttMsgFlow.ToPublish) && (isQos1 || isQos2)) ||
                          ((msgContext.Flow == MqttMsgFlow.ToAcknowledge) && isQos2);

                      if (trackInSession && (this.session != null))
                      {
                          this.session.InflightMessages.Add(msgContext.Key, msgContext);
                      }
                  }
              }
          }
      }

      // signalled in every case, also when nothing new was enqueued (e.g. a state was reset above)
      this.inflightWaitHandle.Set();

      return enqueue;
  }
  /// <summary>
  /// Enqueue a message into the internal queue. PUBREL, PUBCOMP and PUBREC messages
  /// whose related PUBLISH is no longer in the inflight queue are not enqueued
  /// (for PUBREL a PUBCOMP is sent again instead).
  /// </summary>
  /// <param name="msg">Message to enqueue</param>
  private void EnqueueInternal(MqttMsgBase msg)
  {
      // set when the message must not go into the internal queue
      bool discard = false;

      // NOTE : every lookup below uses message id and flow because broker and client could
      //        publish/receive with the same message id (one tracked by broker and the other by client)
      switch (msg.Type)
      {
          // PUBREL message (for QoS Level 2)
          case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:

              lock (this.inflightQueue)
              {
                  // a PUBREL without the corresponding PUBLISH in the inflight queue means that we
                  // processed PUBLISH, received PUBREL and sent PUBCOMP, but the publisher didn't
                  // receive PUBCOMP so it re-sent PUBREL. We need only to re-send PUBCOMP.
                  MqttMsgContextFinder receivedFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToAcknowledge);
                  MqttMsgContext receivedCtx = (MqttMsgContext)this.inflightQueue.Get(receivedFinder.Find);

                  if (receivedCtx == null)
                  {
                      MqttMsgPubcomp pubcomp = new MqttMsgPubcomp
                      {
                          MessageId = msg.MessageId
                      };

                      this.Send(pubcomp);

                      discard = true;
                  }
              }
              break;

          // PUBCOMP or PUBREC message (for QoS Level 2)
          case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
          case MqttMsgBase.MQTT_MSG_PUBREC_TYPE:

              lock (this.inflightQueue)
              {
                  // PUBCOMP without the corresponding PUBLISH in the inflight queue : we sent PUBLISH, sent
                  // PUBREL (after receiving PUBREC) and already received PUBCOMP; this one is a repetition.
                  // PUBREC without the corresponding PUBLISH in the inflight queue : we sent PUBLISH more
                  // times (retries) but PUBREC didn't arrive in time, so the publish already failed.
                  // In both cases the message has to be ignored.
                  MqttMsgContextFinder publishedFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToPublish);
                  MqttMsgContext publishedCtx = (MqttMsgContext)this.inflightQueue.Get(publishedFinder.Find);

                  if (publishedCtx == null)
                  {
                      discard = true;
                  }
              }
              break;
      }

      if (discard)
      {
          return;
      }

      lock (this.internalQueue)
      {
          this.internalQueue.Enqueue(msg);
#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "enqueued {0}", msg);
#endif
          this.inflightWaitHandle.Set();
      }
  }
  1160. /// <summary>
  1161. /// Thread for receiving messages
  1162. /// </summary>
  1163. private void ReceiveThread()
  1164. {
  1165. int readBytes = 0;
  1166. byte[] fixedHeaderFirstByte = new byte[1];
  1167. byte msgType;
  1168. while (this.isRunning)
  1169. {
  1170. try
  1171. {
  1172. // read first byte (fixed header)
  1173. readBytes = this.channel.Receive(fixedHeaderFirstByte);
  1174. if (readBytes > 0)
  1175. {
  1176. #if BROKER
  1177. // update last message received ticks
  1178. this.lastCommTime = Environment.TickCount;
  1179. #endif
  1180. // extract message type from received byte
  1181. msgType = (byte)((fixedHeaderFirstByte[0] & MqttMsgBase.MSG_TYPE_MASK) >> MqttMsgBase.MSG_TYPE_OFFSET);
  1182. switch (msgType)
  1183. {
  1184. // CONNECT message received
  1185. case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
  1186. #if BROKER
  1187. MqttMsgConnect connect = MqttMsgConnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1188. #if TRACE
  1189. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", connect);
  1190. #endif
  1191. // raise message received event
  1192. this.OnInternalEvent(new MsgInternalEvent(connect));
  1193. break;
  1194. #else
  1195. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1196. #endif
  1197. // CONNACK message received
  1198. case MqttMsgBase.MQTT_MSG_CONNACK_TYPE:
  1199. #if BROKER
  1200. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1201. #else
  1202. this.msgReceived = MqttMsgConnack.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1203. #if TRACE
  1204. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1205. #endif
  1206. this.syncEndReceiving.Set();
  1207. break;
  1208. #endif
  1209. // PINGREQ message received
  1210. case MqttMsgBase.MQTT_MSG_PINGREQ_TYPE:
  1211. #if BROKER
  1212. this.msgReceived = MqttMsgPingReq.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1213. #if TRACE
  1214. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1215. #endif
  1216. MqttMsgPingResp pingresp = new MqttMsgPingResp();
  1217. this.Send(pingresp);
  1218. break;
  1219. #else
  1220. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1221. #endif
  1222. // PINGRESP message received
  1223. case MqttMsgBase.MQTT_MSG_PINGRESP_TYPE:
  1224. #if BROKER
  1225. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1226. #else
  1227. this.msgReceived = MqttMsgPingResp.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1228. #if TRACE
  1229. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1230. #endif
  1231. this.syncEndReceiving.Set();
  1232. break;
  1233. #endif
  1234. // SUBSCRIBE message received
  1235. case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
  1236. #if BROKER
  1237. MqttMsgSubscribe subscribe = MqttMsgSubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1238. #if TRACE
  1239. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", subscribe);
  1240. #endif
  1241. // raise message received event
  1242. this.OnInternalEvent(new MsgInternalEvent(subscribe));
  1243. break;
  1244. #else
  1245. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1246. #endif
  1247. // SUBACK message received
  1248. case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
  1249. #if BROKER
  1250. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1251. #else
  1252. // enqueue SUBACK message received (for QoS Level 1) into the internal queue
  1253. MqttMsgSuback suback = MqttMsgSuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1254. #if TRACE
  1255. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", suback);
  1256. #endif
  1257. // enqueue SUBACK message into the internal queue
  1258. this.EnqueueInternal(suback);
  1259. break;
  1260. #endif
  1261. // PUBLISH message received
  1262. case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
  1263. MqttMsgPublish publish = MqttMsgPublish.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1264. #if TRACE
  1265. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", publish);
  1266. #endif
  1267. // enqueue PUBLISH message to acknowledge into the inflight queue
  1268. this.EnqueueInflight(publish, MqttMsgFlow.ToAcknowledge);
  1269. break;
  1270. // PUBACK message received
  1271. case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
  1272. // enqueue PUBACK message received (for QoS Level 1) into the internal queue
  1273. MqttMsgPuback puback = MqttMsgPuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1274. #if TRACE
  1275. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", puback);
  1276. #endif
  1277. // enqueue PUBACK message into the internal queue
  1278. this.EnqueueInternal(puback);
  1279. break;
  1280. // PUBREC message received
  1281. case MqttMsgBase.MQTT_MSG_PUBREC_TYPE:
  1282. // enqueue PUBREC message received (for QoS Level 2) into the internal queue
  1283. MqttMsgPubrec pubrec = MqttMsgPubrec.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1284. #if TRACE
  1285. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrec);
  1286. #endif
  1287. // enqueue PUBREC message into the internal queue
  1288. this.EnqueueInternal(pubrec);
  1289. break;
  1290. // PUBREL message received
  1291. case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
  1292. // enqueue PUBREL message received (for QoS Level 2) into the internal queue
  1293. MqttMsgPubrel pubrel = MqttMsgPubrel.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1294. #if TRACE
  1295. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrel);
  1296. #endif
  1297. // enqueue PUBREL message into the internal queue
  1298. this.EnqueueInternal(pubrel);
  1299. break;
  1300. // PUBCOMP message received
  1301. case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
  1302. // enqueue PUBCOMP message received (for QoS Level 2) into the internal queue
  1303. MqttMsgPubcomp pubcomp = MqttMsgPubcomp.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1304. #if TRACE
  1305. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubcomp);
  1306. #endif
  1307. // enqueue PUBCOMP message into the internal queue
  1308. this.EnqueueInternal(pubcomp);
  1309. break;
  1310. // UNSUBSCRIBE message received
  1311. case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
  1312. #if BROKER
  1313. MqttMsgUnsubscribe unsubscribe = MqttMsgUnsubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1314. #if TRACE
  1315. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsubscribe);
  1316. #endif
  1317. // raise message received event
  1318. this.OnInternalEvent(new MsgInternalEvent(unsubscribe));
  1319. break;
  1320. #else
  1321. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1322. #endif
  1323. // UNSUBACK message received
  1324. case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
  1325. #if BROKER
  1326. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1327. #else
  1328. // enqueue UNSUBACK message received (for QoS Level 1) into the internal queue
  1329. MqttMsgUnsuback unsuback = MqttMsgUnsuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1330. #if TRACE
  1331. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsuback);
  1332. #endif
  1333. // enqueue UNSUBACK message into the internal queue
  1334. this.EnqueueInternal(unsuback);
  1335. break;
  1336. #endif
  1337. // DISCONNECT message received
  1338. case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
  1339. #if BROKER
  1340. MqttMsgDisconnect disconnect = MqttMsgDisconnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1341. #if TRACE
  1342. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", disconnect);
  1343. #endif
  1344. // raise message received event
  1345. this.OnInternalEvent(new MsgInternalEvent(disconnect));
  1346. break;
  1347. #else
  1348. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1349. #endif
  1350. default:
  1351. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1352. }
  1353. this.exReceiving = null;
  1354. }
  1355. // zero bytes read, peer gracefully closed socket
  1356. else
  1357. {
  1358. // wake up thread that will notify connection is closing
  1359. this.OnConnectionClosing();
  1360. }
  1361. }
  1362. catch (Exception e)
  1363. {
  1364. #if TRACE
  1365. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  1366. #endif
  1367. this.exReceiving = new MqttCommunicationException(e);
  1368. bool close = false;
  1369. if (e.GetType() == typeof(MqttClientException))
  1370. {
  1371. // [v3.1.1] scenarios the receiver MUST close the network connection
  1372. MqttClientException ex = e as MqttClientException;
  1373. close = ((ex.ErrorCode == MqttClientErrorCode.InvalidFlagBits) ||
  1374. (ex.ErrorCode == MqttClientErrorCode.InvalidProtocolName) ||
  1375. (ex.ErrorCode == MqttClientErrorCode.InvalidConnectFlags));
  1376. }
  1377. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  1378. else if ((e.GetType() == typeof(SocketException)) ||
  1379. ((e.InnerException != null) && (e.InnerException.GetType() == typeof(SocketException)))) // added for SSL/TLS incoming connection that use SslStream that wraps SocketException
  1380. {
  1381. close = true;
  1382. }
  1383. #endif
  1384. if (close)
  1385. {
  1386. // wake up thread that will notify connection is closing
  1387. this.OnConnectionClosing();
  1388. }
  1389. }
  1390. }
  1391. }
  /// <summary>
  /// Keep alive worker loop. While <c>isRunning</c> is set it waits on
  /// <c>keepAliveEvent</c> and then compares the ticks elapsed since
  /// <c>lastCommTime</c> with <c>keepAlivePeriod</c>: when the period is exceeded
  /// a client build calls <c>Ping()</c>, a BROKER build calls
  /// <c>OnConnectionClosing()</c>. <c>keepAliveEventEnd</c> is created on entry
  /// and set right before the method returns.
  /// </summary>
  private void KeepAliveThread()
  {
      // the first wait lasts a whole keep alive period
      int waitTime = this.keepAlivePeriod;

      // event used to signal that this thread has reached its end
      this.keepAliveEventEnd = new AutoResetEvent(false);

      while (this.isRunning)
      {
          // block until the wait time expires or the keep alive event is set
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
          this.keepAliveEvent.WaitOne(waitTime, false);
#else
          this.keepAliveEvent.WaitOne(waitTime);
#endif

          // the running flag may have changed during the wait; let the loop condition decide
          if (!this.isRunning)
          {
              continue;
          }

          // ticks elapsed since the last communication time stamp
          int elapsed = Environment.TickCount - this.lastCommTime;

          if (elapsed < this.keepAlivePeriod)
          {
              // still inside the period: wait only for what is left of it
              waitTime = this.keepAlivePeriod - elapsed;
          }
          else
          {
#if BROKER
              // period exceeded on broker side: start closing the connection
              this.OnConnectionClosing();
#else
              // period exceeded on client side: send keep alive and restart a full period
              this.Ping();
              waitTime = this.keepAlivePeriod;
#endif
          }
      }

      // tell whoever is waiting that the keep alive thread is done
      this.keepAliveEventEnd.Set();
  }
  /// <summary>
  /// Event dispatching worker loop. While <c>isRunning</c> is set it waits on
  /// <c>receiveEventWaitHandle</c> when the event queue is empty and no close is
  /// pending, dequeues one internal event per iteration and raises the handler
  /// that matches the type of the message carried by the event. Once the queue is
  /// empty and a connection close is pending, it calls <c>Close()</c> followed by
  /// <c>OnConnectionClosed()</c>.
  /// </summary>
  /// <remarks>
  /// In a non BROKER build, CONNECT, SUBSCRIBE, UNSUBSCRIBE and DISCONNECT messages
  /// make this method throw <c>MqttClientException</c> with
  /// <c>MqttClientErrorCode.WrongBrokerMessage</c>.
  /// </remarks>
  private void DispatchEventThread()
  {
      while (this.isRunning)
      {
          // nothing to dispatch and no close pending: wait for the receive handle to be signaled
          if ((this.eventQueue.Count == 0) && !this.isConnectionClosing)
          {
#if BROKER
              if (this.IsConnected)
              {
                  // already connected: wait on receiving message from client without a deadline
                  this.receiveEventWaitHandle.WaitOne();
              }
              else if (!this.receiveEventWaitHandle.WaitOne(this.settings.TimeoutOnConnection))
              {
                  // broker needs the first message (CONNECT) within the connection timeout
                  // after TCP/IP connection; it did not arrive, so close the connection
                  this.Close();
                  // client raw disconnection
                  this.OnConnectionClosed();
              }
#else
              // wait on receiving message from client
              this.receiveEventWaitHandle.WaitOne();
#endif
          }

          // the wait may have been released because the client is being closed
          if (!this.isRunning)
          {
              continue;
          }

          // take at most one event out of the queue
          InternalEvent queuedEvent = null;
          lock (this.eventQueue)
          {
              if (this.eventQueue.Count > 0)
              {
                  queuedEvent = (InternalEvent)this.eventQueue.Dequeue();
              }
          }

          // message carried by the event (null when no event was dequeued)
          MqttMsgBase message = (queuedEvent != null) ? ((MsgInternalEvent)queuedEvent).Message : null;

          if (message != null)
          {
              switch (message.Type)
              {
                  // CONNECT message received
                  case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
#if BROKER
                      // raise connected client event
                      this.OnMqttMsgConnected((MqttMsgConnect)message);
                      break;
#else
                      throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif

                  // SUBSCRIBE message received
                  case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
#if BROKER
                      MqttMsgSubscribe subscribeMsg = (MqttMsgSubscribe)message;
                      // raise subscribe topic event
                      this.OnMqttMsgSubscribeReceived(subscribeMsg.MessageId, subscribeMsg.Topics, subscribeMsg.QoSLevels);
                      break;
#else
                      throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif

                  // SUBACK message received
                  case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
                      // raise subscribed topic event
                      this.OnMqttMsgSubscribed((MqttMsgSuback)message);
                      break;

                  // PUBLISH message received
                  case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
                      if (queuedEvent.GetType() != typeof(MsgPublishedInternalEvent))
                      {
                          // plain event: raise PUBLISH message received event
                          this.OnMqttMsgPublishReceived((MqttMsgPublish)message);
                      }
                      else
                      {
                          // PUBLISH inside a published internal event: the publish did not succeed
                          this.OnMqttMsgPublished(message.MessageId, false);
                      }
                      break;

                  // PUBACK (QoS Level 1) or PUBCOMP (QoS Level 2) message received
                  case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
                  case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
                      // raise published message event
                      this.OnMqttMsgPublished(message.MessageId, true);
                      break;

                  // PUBREL message received (QoS Level 2)
                  case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
                      // raise message received event
                      // NOTE(review): a message reporting the PUBREL type is cast to MqttMsgPublish here;
                      // confirm which runtime type can actually reach this case
                      this.OnMqttMsgPublishReceived((MqttMsgPublish)message);
                      break;

                  // UNSUBSCRIBE message received from client
                  case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
#if BROKER
                      MqttMsgUnsubscribe unsubscribeMsg = (MqttMsgUnsubscribe)message;
                      // raise unsubscribe topic event
                      this.OnMqttMsgUnsubscribeReceived(unsubscribeMsg.MessageId, unsubscribeMsg.Topics);
                      break;
#else
                      throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif

                  // UNSUBACK message received
                  case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
                      // raise unsubscribed topic event
                      this.OnMqttMsgUnsubscribed(message.MessageId);
                      break;

                  // DISCONNECT message received from client
                  case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
#if BROKER
                      // raise disconnected client event
                      this.OnMqttMsgDisconnected();
                      break;
#else
                      throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
              }
          }

          // every queued event has been dispatched: honor a pending connection close
          if ((this.eventQueue.Count == 0) && this.isConnectionClosing)
          {
              // client must close connection
              this.Close();
              // client raw disconnection
              this.OnConnectionClosed();
          }
      }
  }
  1576. /// <summary>
  1577. /// Process inflight messages queue
  1578. /// </summary>
  1579. private void ProcessInflightThread()
  1580. {
  1581. MqttMsgContext msgContext = null;
  1582. MqttMsgBase msgInflight = null;
  1583. MqttMsgBase msgReceived = null;
  1584. InternalEvent internalEvent = null;
  1585. bool acknowledge = false;
  1586. int timeout = Timeout.Infinite;
  1587. int delta;
  1588. bool msgReceivedProcessed = false;
  1589. try
  1590. {
  1591. while (this.isRunning)
  1592. {
  1593. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1594. // wait on message queued to inflight
  1595. this.inflightWaitHandle.WaitOne(timeout, false);
  1596. #else
  1597. // wait on message queued to inflight
  1598. this.inflightWaitHandle.WaitOne(timeout);
  1599. #endif
  1600. // it could be unblocked because Close() method is joining
  1601. if (this.isRunning)
  1602. {
  1603. lock (this.inflightQueue)
  1604. {
  1605. // message received and peeked from internal queue is processed
  1606. // NOTE : it has the corresponding message in inflight queue based on messageId
  1607. // (ex. a PUBREC for a PUBLISH, a SUBACK for a SUBSCRIBE, ...)
  1608. // if it's orphan we need to remove from internal queue
  1609. msgReceivedProcessed = false;
  1610. acknowledge = false;
  1611. msgReceived = null;
  1612. // set timeout to MaxValue instead of Infinite (-1) so that it can be
  1613. // compared with the calculated current msgTimeout
  1614. timeout = Int32.MaxValue;
  1615. // a message inflight could be re-enqueued but we have to
  1616. // analyze it only just one time for cycle
  1617. int count = this.inflightQueue.Count;
  1618. // process all inflight queued messages
  1619. while (count > 0)
  1620. {
  1621. count--;
  1622. acknowledge = false;
  1623. msgReceived = null;
  1624. // check to be sure that client isn't closing and all queues are now empty !
  1625. if (!this.isRunning)
  1626. break;
  1627. // dequeue message context from queue
  1628. msgContext = (MqttMsgContext)this.inflightQueue.Dequeue();
  1629. // get inflight message
  1630. msgInflight = (MqttMsgBase)msgContext.Message;
  1631. switch (msgContext.State)
  1632. {
  1633. case MqttMsgState.QueuedQos0:
  1634. // QoS 0, PUBLISH message to send to broker, no state change, no acknowledge
  1635. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1636. {
  1637. this.Send(msgInflight);
  1638. }
  1639. // QoS 0, no need acknowledge
  1640. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1641. {
  1642. internalEvent = new MsgInternalEvent(msgInflight);
  1643. // notify published message from broker (no need acknowledged)
  1644. this.OnInternalEvent(internalEvent);
  1645. }
  1646. #if TRACE
  1647. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1648. #endif
  1649. break;
  1650. case MqttMsgState.QueuedQos1:
  1651. // [v3.1.1] SUBSCRIBE and UNSUBSCRIBE aren't "officially" QOS = 1
  1652. case MqttMsgState.SendSubscribe:
  1653. case MqttMsgState.SendUnsubscribe:
  1654. // QoS 1, PUBLISH or SUBSCRIBE/UNSUBSCRIBE message to send to broker, state change to wait PUBACK or SUBACK/UNSUBACK
  1655. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1656. {
  1657. msgContext.Timestamp = Environment.TickCount;
  1658. msgContext.Attempt++;
  1659. if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1660. {
  1661. // PUBLISH message to send, wait for PUBACK
  1662. msgContext.State = MqttMsgState.WaitForPuback;
  1663. // retry ? set dup flag [v3.1.1] only for PUBLISH message
  1664. if (msgContext.Attempt > 1)
  1665. msgInflight.DupFlag = true;
  1666. }
  1667. else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
  1668. // SUBSCRIBE message to send, wait for SUBACK
  1669. msgContext.State = MqttMsgState.WaitForSuback;
  1670. else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
  1671. // UNSUBSCRIBE message to send, wait for UNSUBACK
  1672. msgContext.State = MqttMsgState.WaitForUnsuback;
  1673. this.Send(msgInflight);
  1674. // update timeout : minimum between delay (based on current message sent) or current timeout
  1675. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1676. // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
  1677. this.inflightQueue.Enqueue(msgContext);
  1678. }
  1679. // QoS 1, PUBLISH message received from broker to acknowledge, send PUBACK
  1680. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1681. {
  1682. MqttMsgPuback puback = new MqttMsgPuback();
  1683. puback.MessageId = msgInflight.MessageId;
  1684. this.Send(puback);
  1685. internalEvent = new MsgInternalEvent(msgInflight);
  1686. // notify published message from broker and acknowledged
  1687. this.OnInternalEvent(internalEvent);
  1688. #if TRACE
  1689. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1690. #endif
  1691. }
  1692. break;
  1693. case MqttMsgState.QueuedQos2:
  1694. // QoS 2, PUBLISH message to send to broker, state change to wait PUBREC
  1695. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1696. {
  1697. msgContext.Timestamp = Environment.TickCount;
  1698. msgContext.Attempt++;
  1699. msgContext.State = MqttMsgState.WaitForPubrec;
  1700. // retry ? set dup flag
  1701. if (msgContext.Attempt > 1)
  1702. msgInflight.DupFlag = true;
  1703. this.Send(msgInflight);
  1704. // update timeout : minimum between delay (based on current message sent) or current timeout
  1705. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1706. // re-enqueue message (I have to re-analyze for receiving PUBREC)
  1707. this.inflightQueue.Enqueue(msgContext);
  1708. }
  1709. // QoS 2, PUBLISH message received from broker to acknowledge, send PUBREC, state change to wait PUBREL
  1710. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1711. {
  1712. MqttMsgPubrec pubrec = new MqttMsgPubrec();
  1713. pubrec.MessageId = msgInflight.MessageId;
  1714. msgContext.State = MqttMsgState.WaitForPubrel;
  1715. this.Send(pubrec);
  1716. // re-enqueue message (I have to re-analyze for receiving PUBREL)
  1717. this.inflightQueue.Enqueue(msgContext);
  1718. }
  1719. break;
  1720. case MqttMsgState.WaitForPuback:
  1721. case MqttMsgState.WaitForSuback:
  1722. case MqttMsgState.WaitForUnsuback:
  1723. // QoS 1, waiting for PUBACK of a PUBLISH message sent or
  1724. // waiting for SUBACK of a SUBSCRIBE message sent or
  1725. // waiting for UNSUBACK of a UNSUBSCRIBE message sent or
  1726. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1727. {
  1728. acknowledge = false;
  1729. lock (this.internalQueue)
  1730. {
  1731. if (this.internalQueue.Count > 0)
  1732. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1733. }
  1734. // it is a PUBACK message or a SUBACK/UNSUBACK message
  1735. if (msgReceived != null)
  1736. {
  1737. // PUBACK message or SUBACK/UNSUBACK message for the current message
  1738. if (((msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) ||
  1739. ((msgReceived.Type == MqttMsgBase.MQTT_MSG_SUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) ||
  1740. ((msgReceived.Type == MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)))
  1741. {
  1742. lock (this.internalQueue)
  1743. {
  1744. // received message processed
  1745. this.internalQueue.Dequeue();
  1746. acknowledge = true;
  1747. msgReceivedProcessed = true;
  1748. #if TRACE
  1749. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1750. #endif
  1751. }
  1752. // if PUBACK received, confirm published with flag
  1753. if (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE)
  1754. internalEvent = new MsgPublishedInternalEvent(msgReceived, true);
  1755. else
  1756. internalEvent = new MsgInternalEvent(msgReceived);
  1757. // notify received acknowledge from broker of a published message or subscribe/unsubscribe message
  1758. this.OnInternalEvent(internalEvent);
  1759. // PUBACK received for PUBLISH message with QoS Level 1, remove from session state
  1760. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1761. (this.session != null) &&
  1762. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1763. (this.session.InflightMessages.Contains(msgContext.Key)))
  1764. #else
  1765. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1766. #endif
  1767. {
  1768. this.session.InflightMessages.Remove(msgContext.Key);
  1769. }
  1770. #if TRACE
  1771. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1772. #endif
  1773. }
  1774. }
  1775. // current message not acknowledged, no PUBACK or SUBACK/UNSUBACK or not equal messageid
  1776. if (!acknowledge)
  1777. {
  1778. delta = Environment.TickCount - msgContext.Timestamp;
  1779. // check timeout for receiving PUBACK since PUBLISH was sent or
  1780. // for receiving SUBACK since SUBSCRIBE was sent or
  1781. // for receiving UNSUBACK since UNSUBSCRIBE was sent
  1782. if (delta >= this.settings.DelayOnRetry)
  1783. {
  1784. // max retry not reached, resend
  1785. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  1786. {
  1787. msgContext.State = MqttMsgState.QueuedQos1;
  1788. // re-enqueue message
  1789. this.inflightQueue.Enqueue(msgContext);
  1790. // update timeout (0 -> reanalyze queue immediately)
  1791. timeout = 0;
  1792. }
  1793. else
  1794. {
  1795. // if PUBACK for a PUBLISH message not received after retries, raise event for not published
  1796. if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1797. {
  1798. // PUBACK not received in time, PUBLISH retries failed, need to remove from session inflight messages too
  1799. if ((this.session != null) &&
  1800. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1801. (this.session.InflightMessages.Contains(msgContext.Key)))
  1802. #else
  1803. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1804. #endif
  1805. {
  1806. this.session.InflightMessages.Remove(msgContext.Key);
  1807. }
  1808. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  1809. // notify not received acknowledge from broker and message not published
  1810. this.OnInternalEvent(internalEvent);
  1811. }
  1812. // NOTE : not raise events for SUBACK or UNSUBACK not received
  1813. // for the user no event raised means subscribe/unsubscribe failed
  1814. }
  1815. }
  1816. else
  1817. {
  1818. // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
  1819. this.inflightQueue.Enqueue(msgContext);
  1820. // update timeout
  1821. int msgTimeout = (this.settings.DelayOnRetry - delta);
  1822. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  1823. }
  1824. }
  1825. }
  1826. break;
  1827. case MqttMsgState.WaitForPubrec:
  1828. // QoS 2, waiting for PUBREC of a PUBLISH message sent
  1829. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1830. {
  1831. acknowledge = false;
  1832. lock (this.internalQueue)
  1833. {
  1834. if (this.internalQueue.Count > 0)
  1835. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1836. }
  1837. // it is a PUBREC message
  1838. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE))
  1839. {
  1840. // PUBREC message for the current PUBLISH message, send PUBREL, wait for PUBCOMP
  1841. if (msgReceived.MessageId == msgInflight.MessageId)
  1842. {
  1843. lock (this.internalQueue)
  1844. {
  1845. // received message processed
  1846. this.internalQueue.Dequeue();
  1847. acknowledge = true;
  1848. msgReceivedProcessed = true;
  1849. #if TRACE
  1850. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1851. #endif
  1852. }
  1853. MqttMsgPubrel pubrel = new MqttMsgPubrel();
  1854. pubrel.MessageId = msgInflight.MessageId;
  1855. msgContext.State = MqttMsgState.WaitForPubcomp;
  1856. msgContext.Timestamp = Environment.TickCount;
  1857. msgContext.Attempt = 1;
  1858. this.Send(pubrel);
  1859. // update timeout : minimum between delay (based on current message sent) or current timeout
  1860. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1861. // re-enqueue message
  1862. this.inflightQueue.Enqueue(msgContext);
  1863. }
  1864. }
  1865. // current message not acknowledged
  1866. if (!acknowledge)
  1867. {
  1868. delta = Environment.TickCount - msgContext.Timestamp;
  1869. // check timeout for receiving PUBREC since PUBLISH was sent
  1870. if (delta >= this.settings.DelayOnRetry)
  1871. {
  1872. // max retry not reached, resend
  1873. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  1874. {
  1875. msgContext.State = MqttMsgState.QueuedQos2;
  1876. // re-enqueue message
  1877. this.inflightQueue.Enqueue(msgContext);
  1878. // update timeout (0 -> reanalyze queue immediately)
  1879. timeout = 0;
  1880. }
  1881. else
  1882. {
  1883. // PUBREC not received in time, PUBLISH retries failed, need to remove from session inflight messages too
  1884. if ((this.session != null) &&
  1885. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1886. (this.session.InflightMessages.Contains(msgContext.Key)))
  1887. #else
  1888. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1889. #endif
  1890. {
  1891. this.session.InflightMessages.Remove(msgContext.Key);
  1892. }
  1893. // if PUBREC for a PUBLISH message not received after retries, raise event for not published
  1894. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  1895. // notify not received acknowledge from broker and message not published
  1896. this.OnInternalEvent(internalEvent);
  1897. }
  1898. }
  1899. else
  1900. {
  1901. // re-enqueue message
  1902. this.inflightQueue.Enqueue(msgContext);
  1903. // update timeout
  1904. int msgTimeout = (this.settings.DelayOnRetry - delta);
  1905. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  1906. }
  1907. }
  1908. }
  1909. break;
  1910. case MqttMsgState.WaitForPubrel:
  1911. // QoS 2, waiting for PUBREL of a PUBREC message sent
  1912. if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1913. {
  1914. lock (this.internalQueue)
  1915. {
  1916. if (this.internalQueue.Count > 0)
  1917. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1918. }
  1919. // it is a PUBREL message
  1920. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE))
  1921. {
  1922. // PUBREL message for the current message, send PUBCOMP
  1923. if (msgReceived.MessageId == msgInflight.MessageId)
  1924. {
  1925. lock (this.internalQueue)
  1926. {
  1927. // received message processed
  1928. this.internalQueue.Dequeue();
  1929. msgReceivedProcessed = true;
  1930. #if TRACE
  1931. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1932. #endif
  1933. }
  1934. MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
  1935. pubcomp.MessageId = msgInflight.MessageId;
  1936. this.Send(pubcomp);
  1937. internalEvent = new MsgInternalEvent(msgInflight);
  1938. // notify published message from broker and acknowledged
  1939. this.OnInternalEvent(internalEvent);
  1940. // PUBREL received (and PUBCOMP sent) for PUBLISH message with QoS Level 2, remove from session state
  1941. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1942. (this.session != null) &&
  1943. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1944. (this.session.InflightMessages.Contains(msgContext.Key)))
  1945. #else
  1946. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1947. #endif
  1948. {
  1949. this.session.InflightMessages.Remove(msgContext.Key);
  1950. }
  1951. #if TRACE
  1952. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1953. #endif
  1954. }
  1955. else
  1956. {
  1957. // re-enqueue message
  1958. this.inflightQueue.Enqueue(msgContext);
  1959. }
  1960. }
  1961. else
  1962. {
  1963. // re-enqueue message
  1964. this.inflightQueue.Enqueue(msgContext);
  1965. }
  1966. }
  1967. break;
  1968. case MqttMsgState.WaitForPubcomp:
  1969. // QoS 2, waiting for PUBCOMP of a PUBREL message sent
  1970. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1971. {
  1972. acknowledge = false;
  1973. lock (this.internalQueue)
  1974. {
  1975. if (this.internalQueue.Count > 0)
  1976. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1977. }
  1978. // it is a PUBCOMP message
  1979. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE))
  1980. {
  1981. // PUBCOMP message for the current message
  1982. if (msgReceived.MessageId == msgInflight.MessageId)
  1983. {
  1984. lock (this.internalQueue)
  1985. {
  1986. // received message processed
  1987. this.internalQueue.Dequeue();
  1988. acknowledge = true;
  1989. msgReceivedProcessed = true;
  1990. #if TRACE
  1991. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1992. #endif
  1993. }
  1994. internalEvent = new MsgPublishedInternalEvent(msgReceived, true);
  1995. // notify received acknowledge from broker of a published message
  1996. this.OnInternalEvent(internalEvent);
  1997. // PUBCOMP received for PUBLISH message with QoS Level 2, remove from session state
  1998. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1999. (this.session != null) &&
  2000. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  2001. (this.session.InflightMessages.Contains(msgContext.Key)))
  2002. #else
  2003. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  2004. #endif
  2005. {
  2006. this.session.InflightMessages.Remove(msgContext.Key);
  2007. }
  2008. #if TRACE
  2009. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  2010. #endif
  2011. }
  2012. }
  2013. // it is a PUBREC message
  2014. else if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE))
  2015. {
  2016. // another PUBREC message for the current message due to a retransmitted PUBLISH
  2017. // I'm in waiting for PUBCOMP, so I can discard this PUBREC
  2018. if (msgReceived.MessageId == msgInflight.MessageId)
  2019. {
  2020. lock (this.internalQueue)
  2021. {
  2022. // received message processed
  2023. this.internalQueue.Dequeue();
  2024. acknowledge = true;
  2025. msgReceivedProcessed = true;
  2026. #if TRACE
  2027. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  2028. #endif
  2029. // re-enqueue message
  2030. this.inflightQueue.Enqueue(msgContext);
  2031. }
  2032. }
  2033. }
  2034. // current message not acknowledged
  2035. if (!acknowledge)
  2036. {
  2037. delta = Environment.TickCount - msgContext.Timestamp;
  2038. // check timeout for receiving PUBCOMP since PUBREL was sent
  2039. if (delta >= this.settings.DelayOnRetry)
  2040. {
  2041. // max retry not reached, resend
  2042. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  2043. {
  2044. msgContext.State = MqttMsgState.SendPubrel;
  2045. // re-enqueue message
  2046. this.inflightQueue.Enqueue(msgContext);
  2047. // update timeout (0 -> reanalyze queue immediately)
  2048. timeout = 0;
  2049. }
  2050. else
  2051. {
  2052. // PUBCOMP not received, PUBREL retries failed, need to remove from session inflight messages too
  2053. if ((this.session != null) &&
  2054. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  2055. (this.session.InflightMessages.Contains(msgContext.Key)))
  2056. #else
  2057. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  2058. #endif
  2059. {
  2060. this.session.InflightMessages.Remove(msgContext.Key);
  2061. }
  2062. // if PUBCOMP for a PUBLISH message not received after retries, raise event for not published
  2063. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  2064. // notify not received acknowledge from broker and message not published
  2065. this.OnInternalEvent(internalEvent);
  2066. }
  2067. }
  2068. else
  2069. {
  2070. // re-enqueue message
  2071. this.inflightQueue.Enqueue(msgContext);
  2072. // update timeout
  2073. int msgTimeout = (this.settings.DelayOnRetry - delta);
  2074. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  2075. }
  2076. }
  2077. }
  2078. break;
  2079. case MqttMsgState.SendPubrec:
  2080. // TODO : impossible ? --> QueuedQos2 ToAcknowledge
  2081. break;
  2082. case MqttMsgState.SendPubrel:
  2083. // QoS 2, PUBREL message to send to broker, state change to wait PUBCOMP
  2084. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  2085. {
  2086. MqttMsgPubrel pubrel = new MqttMsgPubrel();
  2087. pubrel.MessageId = msgInflight.MessageId;
  2088. msgContext.State = MqttMsgState.WaitForPubcomp;
  2089. msgContext.Timestamp = Environment.TickCount;
  2090. msgContext.Attempt++;
  2091. // retry ? set dup flag [v3.1.1] no needed
  2092. if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1)
  2093. {
  2094. if (msgContext.Attempt > 1)
  2095. pubrel.DupFlag = true;
  2096. }
  2097. this.Send(pubrel);
  2098. // update timeout : minimum between delay (based on current message sent) or current timeout
  2099. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  2100. // re-enqueue message
  2101. this.inflightQueue.Enqueue(msgContext);
  2102. }
  2103. break;
  2104. case MqttMsgState.SendPubcomp:
  2105. // TODO : impossible ?
  2106. break;
  2107. case MqttMsgState.SendPuback:
  2108. // TODO : impossible ? --> QueuedQos1 ToAcknowledge
  2109. break;
  2110. default:
  2111. break;
  2112. }
  2113. }
  2114. // if calculated timeout is MaxValue, it means that must be Infinite (-1)
  2115. if (timeout == Int32.MaxValue)
  2116. timeout = Timeout.Infinite;
  2117. // if message received is orphan, no corresponding message in inflight queue
  2118. // based on messageId, we need to remove from the queue
  2119. if ((msgReceived != null) && !msgReceivedProcessed)
  2120. {
  2121. this.internalQueue.Dequeue();
  2122. #if TRACE
  2123. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0} orphan", msgReceived);
  2124. #endif
  2125. }
  2126. }
  2127. }
  2128. }
  2129. }
  2130. catch (MqttCommunicationException e)
  2131. {
  2132. // possible exception on Send, I need to re-enqueue not sent message
  2133. if (msgContext != null)
  2134. // re-enqueue message
  2135. this.inflightQueue.Enqueue(msgContext);
  2136. #if TRACE
  2137. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  2138. #endif
  2139. // raise disconnection client event
  2140. this.OnConnectionClosing();
  2141. }
  2142. }
  /// <summary>
  /// Bring the client back in line with its stored session after a connection.
  /// </summary>
  /// <remarks>
  /// With a clean session requested, any existing session is simply cleared.
  /// Otherwise, when no session exists yet a new one is created for the current
  /// client id; when one exists, each of its inflight message contexts is put
  /// back on the inflight queue and outgoing PUBLISH contexts that were still
  /// waiting for an acknowledge are rewound so that they get sent again.
  /// </remarks>
  private void RestoreSession()
  {
      // clean session requested : drop whatever was stored before
      if (this.CleanSession)
      {
          if (this.session != null)
          {
              this.session.Clear();
          }
          return;
      }

      // nothing stored yet : start with a brand new session
      if (this.session == null)
      {
          this.session = new MqttClientSession(this.ClientId);
          return;
      }

      lock (this.inflightQueue)
      {
          foreach (MqttMsgContext restored in this.session.InflightMessages.Values)
          {
              // every stored context goes back to the inflight queue
              this.inflightQueue.Enqueue(restored);

              // only PUBLISH messages we are publishing need their state rewound
              if (restored.Message.Type != MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
              {
                  continue;
              }
              if (restored.Flow != MqttMsgFlow.ToPublish)
              {
                  continue;
              }

              switch (restored.State)
              {
                  case MqttMsgState.WaitForPuback:
                      // QoS 1, PUBACK never arrived : PUBLISH has to be sent again
                      if (restored.Message.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE)
                      {
                          restored.State = MqttMsgState.QueuedQos1;
                      }
                      break;

                  case MqttMsgState.WaitForPubrec:
                      // QoS 2, PUBREC never arrived : PUBLISH has to be sent again
                      if (restored.Message.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)
                      {
                          restored.State = MqttMsgState.QueuedQos2;
                      }
                      break;

                  case MqttMsgState.WaitForPubcomp:
                      // QoS 2, PUBCOMP never arrived : PUBREL has to be sent again
                      if (restored.Message.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)
                      {
                          restored.State = MqttMsgState.SendPubrel;
                      }
                      break;

                  default:
                      // any other state is left as it was stored
                      break;
              }
          }
      }

      // signal the wait handle so the inflight queue gets processed
      this.inflightWaitHandle.Set();
  }
  #if BROKER
  /// <summary>
  /// Attach an existing session to this client and restore it.
  /// </summary>
  /// <remarks>
  /// The call has no effect when the client asked for a clean session.
  /// </remarks>
  /// <param name="session">MQTT client session to attach</param>
  public void LoadSession(MqttClientSession session)
  {
      // a clean session never takes over a stored one
      if (this.CleanSession)
      {
          return;
      }

      // adopt the given session, then restore it
      this.session = session;
      this.RestoreSession();
  }
  #endif
  /// <summary>
  /// Advance the message identifier counter and hand out the new value.
  /// </summary>
  /// <remarks>
  /// Zero is never returned: from 0 or from the highest 16 bit value the
  /// counter restarts at 1, in every other case it grows by one.
  /// </remarks>
  /// <returns>Message identifier</returns>
  private ushort GetMessageId()
  {
      bool restart = (this.messageIdCounter == 0) ||
                     (this.messageIdCounter == UInt16.MaxValue);

      if (restart)
      {
          // first valid messageId
          this.messageIdCounter = 1;
      }
      else
      {
          this.messageIdCounter++;
      }

      return this.messageIdCounter;
  }
  /// <summary>
  /// Predicate holder used to look up the context of a PUBLISH message
  /// by message id and flow
  /// </summary>
  internal class MqttMsgContextFinder
  {
      /// <summary>
      /// Message id of the PUBLISH message to look for
      /// </summary>
      internal ushort MessageId { get; set; }

      /// <summary>
      /// Flow the searched message context must have
      /// </summary>
      internal MqttMsgFlow Flow { get; set; }

      /// <summary>
      /// Constructor
      /// </summary>
      /// <param name="messageId">Message id to match</param>
      /// <param name="flow">Message flow to match</param>
      internal MqttMsgContextFinder(ushort messageId, MqttMsgFlow flow)
      {
          this.Flow = flow;
          this.MessageId = messageId;
      }

      /// <summary>
      /// Tell whether an item is the message context being searched
      /// </summary>
      /// <param name="item">Item to test, cast to a message context</param>
      /// <returns>
      /// True when the item holds a PUBLISH message with the searched
      /// message id and the searched flow
      /// </returns>
      internal bool Find(object item)
      {
          MqttMsgContext candidate = (MqttMsgContext)item;

          // not a PUBLISH message
          if (candidate.Message.Type != MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
          {
              return false;
          }

          // PUBLISH message with another message id
          if (candidate.Message.MessageId != this.MessageId)
          {
              return false;
          }

          // right message, the flow decides
          return candidate.Flow == this.Flow;
      }
  }
  2257. }
  /// <summary>
  /// MQTT protocol versions, valued with the protocol version
  /// constants declared by MqttMsgConnect
  /// </summary>
  public enum MqttProtocolVersion
  {
      /// <summary>
      /// Protocol version 3.1 (MqttMsgConnect.PROTOCOL_VERSION_V3_1)
      /// </summary>
      Version_3_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1,

      /// <summary>
      /// Protocol version 3.1.1 (MqttMsgConnect.PROTOCOL_VERSION_V3_1_1)
      /// </summary>
      Version_3_1_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1_1
  }
  2266. }