MqttClient.cs 121 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643
  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. */
  13. using System;
  14. using System.Net;
  15. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  16. using System.Net.Sockets;
  17. using System.Security.Cryptography.X509Certificates;
  18. #endif
  19. using System.Threading;
  20. using uPLibrary.Networking.M2Mqtt.Exceptions;
  21. using uPLibrary.Networking.M2Mqtt.Messages;
  22. using uPLibrary.Networking.M2Mqtt.Session;
  23. using uPLibrary.Networking.M2Mqtt.Utility;
  24. using uPLibrary.Networking.M2Mqtt.Internal;
  25. // if .Net Micro Framework
  26. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3)
  27. using Microsoft.SPOT;
  28. #if SSL
  29. using Microsoft.SPOT.Net.Security;
  30. #endif
  31. // else other frameworks (.Net, .Net Compact, Mono, Windows Phone)
  32. #else
  33. using System.Collections.Generic;
  34. #if (SSL && !(WINDOWS_APP || WINDOWS_PHONE_APP))
  35. using System.Security.Authentication;
  36. using System.Net.Security;
  37. #endif
  38. #endif
  39. #if (WINDOWS_APP || WINDOWS_PHONE_APP)
  40. using Windows.Networking.Sockets;
  41. #endif
  42. using System.Collections;
  43. // alias needed due to Microsoft.SPOT.Trace in .Net Micro Framework
  44. // (it's ambiguos with uPLibrary.Networking.M2Mqtt.Utility.Trace)
  45. using MqttUtility = uPLibrary.Networking.M2Mqtt.Utility;
  46. using System.IO;
  47. namespace uPLibrary.Networking.M2Mqtt
  48. {
  49. /// <summary>
  50. /// MQTT Client
  51. /// </summary>
  52. public class MqttClient
  53. {
  54. #if BROKER
  55. #region Constants ...
  56. // thread names
  57. private const string RECEIVE_THREAD_NAME = "ReceiveThread";
  58. private const string RECEIVE_EVENT_THREAD_NAME = "DispatchEventThread";
  59. private const string PROCESS_INFLIGHT_THREAD_NAME = "ProcessInflightThread";
  60. private const string KEEP_ALIVE_THREAD = "KeepAliveThread";
  61. #endregion
  62. #endif
  63. /// <summary>
  64. /// Delagate that defines event handler for PUBLISH message received
  65. /// </summary>
  66. public delegate void MqttMsgPublishEventHandler(object sender, MqttMsgPublishEventArgs e);
  67. /// <summary>
  68. /// Delegate that defines event handler for published message
  69. /// </summary>
  70. public delegate void MqttMsgPublishedEventHandler(object sender, MqttMsgPublishedEventArgs e);
  71. /// <summary>
  72. /// Delagate that defines event handler for subscribed topic
  73. /// </summary>
  74. public delegate void MqttMsgSubscribedEventHandler(object sender, MqttMsgSubscribedEventArgs e);
  75. /// <summary>
  76. /// Delagate that defines event handler for unsubscribed topic
  77. /// </summary>
  78. public delegate void MqttMsgUnsubscribedEventHandler(object sender, MqttMsgUnsubscribedEventArgs e);
  79. #if BROKER
  80. /// <summary>
  81. /// Delagate that defines event handler for SUBSCRIBE message received
  82. /// </summary>
  83. public delegate void MqttMsgSubscribeEventHandler(object sender, MqttMsgSubscribeEventArgs e);
  84. /// <summary>
  85. /// Delagate that defines event handler for UNSUBSCRIBE message received
  86. /// </summary>
  87. public delegate void MqttMsgUnsubscribeEventHandler(object sender, MqttMsgUnsubscribeEventArgs e);
  88. /// <summary>
  89. /// Delagate that defines event handler for CONNECT message received
  90. /// </summary>
  91. public delegate void MqttMsgConnectEventHandler(object sender, MqttMsgConnectEventArgs e);
  92. /// <summary>
  93. /// Delegate that defines event handler for client disconnection (DISCONNECT message or not)
  94. /// </summary>
  95. public delegate void MqttMsgDisconnectEventHandler(object sender, EventArgs e);
  96. #endif
  97. /// <summary>
  98. /// Delegate that defines event handler for cliet/peer disconnection
  99. /// </summary>
  100. public delegate void ConnectionClosedEventHandler(object sender, EventArgs e);
  101. // broker hostname (or ip address) and port
  102. private string brokerHostName;
  103. private int brokerPort;
  104. // running status of threads
  105. private bool isRunning;
  106. // event for raising received message event
  107. private AutoResetEvent receiveEventWaitHandle;
  108. // event for starting process inflight queue asynchronously
  109. private AutoResetEvent inflightWaitHandle;
  110. // event for signaling synchronous receive
  111. AutoResetEvent syncEndReceiving;
  112. // message received
  113. MqttMsgBase msgReceived;
  114. // exeption thrown during receiving
  115. Exception exReceiving;
  116. // keep alive period (in ms)
  117. private int keepAlivePeriod;
  118. // events for signaling on keep alive thread
  119. private AutoResetEvent keepAliveEvent;
  120. private AutoResetEvent keepAliveEventEnd;
  121. // last communication time in ticks
  122. private int lastCommTime;
  123. // event for PUBLISH message received
  124. public event MqttMsgPublishEventHandler MqttMsgPublishReceived;
  125. // event for published message
  126. public event MqttMsgPublishedEventHandler MqttMsgPublished;
  127. // event for subscribed topic
  128. public event MqttMsgSubscribedEventHandler MqttMsgSubscribed;
  129. // event for unsubscribed topic
  130. public event MqttMsgUnsubscribedEventHandler MqttMsgUnsubscribed;
  131. #if BROKER
  132. // event for SUBSCRIBE message received
  133. public event MqttMsgSubscribeEventHandler MqttMsgSubscribeReceived;
  134. // event for USUBSCRIBE message received
  135. public event MqttMsgUnsubscribeEventHandler MqttMsgUnsubscribeReceived;
  136. // event for CONNECT message received
  137. public event MqttMsgConnectEventHandler MqttMsgConnected;
  138. // event for DISCONNECT message received
  139. public event MqttMsgDisconnectEventHandler MqttMsgDisconnected;
  140. #endif
  141. // event for peer/client disconnection
  142. public event ConnectionClosedEventHandler ConnectionClosed;
  143. // channel to communicate over the network
  144. private IMqttNetworkChannel channel;
  145. // inflight messages queue
  146. private Queue inflightQueue;
  147. // internal queue for received messages about inflight messages
  148. private Queue internalQueue;
  149. // internal queue for dispatching events
  150. private Queue eventQueue;
  151. // session
  152. private MqttClientSession session;
  153. // reference to avoid access to singleton via property
  154. private MqttSettings settings;
  155. // current message identifier generated
  156. private ushort messageIdCounter = 0;
  157. // connection is closing due to peer
  158. private bool isConnectionClosing;
  159. /// <summary>
  160. /// Connection status between client and broker
  161. /// </summary>
  162. public bool IsConnected { get; private set; }
  163. /// <summary>
  164. /// Client identifier
  165. /// </summary>
  166. public string ClientId { get; private set; }
  167. /// <summary>
  168. /// Clean session flag
  169. /// </summary>
  170. public bool CleanSession { get; private set; }
  171. /// <summary>
  172. /// Will flag
  173. /// </summary>
  174. public bool WillFlag { get; private set; }
  175. /// <summary>
  176. /// Will QOS level
  177. /// </summary>
  178. public byte WillQosLevel { get; private set; }
  179. /// <summary>
  180. /// Will topic
  181. /// </summary>
  182. public string WillTopic { get; private set; }
  183. /// <summary>
  184. /// Will message
  185. /// </summary>
  186. public string WillMessage { get; private set; }
  187. /// <summary>
  188. /// MQTT protocol version
  189. /// </summary>
  190. public MqttProtocolVersion ProtocolVersion { get; set; }
  191. #if BROKER
  192. /// <summary>
  193. /// MQTT Client Session
  194. /// </summary>
  195. public MqttClientSession Session
  196. {
  197. get { return this.session; }
  198. set { this.session = value; }
  199. }
  200. #endif
  201. /// <summary>
  202. /// MQTT client settings
  203. /// </summary>
  204. public MqttSettings Settings
  205. {
  206. get { return this.settings; }
  207. }
  208. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  209. /// <summary>
  210. /// Constructor
  211. /// </summary>
  212. /// <param name="brokerIpAddress">Broker IP address</param>
  213. [Obsolete("Use this ctor MqttClient(string brokerHostName) instead")]
  214. public MqttClient(IPAddress brokerIpAddress) :
  215. this(brokerIpAddress, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None)
  216. {
  217. }
  218. /// <summary>
  219. /// Constructor
  220. /// </summary>
  221. /// <param name="brokerIpAddress">Broker IP address</param>
  222. /// <param name="brokerPort">Broker port</param>
  223. /// <param name="secure">Using secure connection</param>
  224. /// <param name="caCert">CA certificate for secure connection</param>
  225. /// <param name="clientCert">Client certificate</param>
  226. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  227. [Obsolete("Use this ctor MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert) instead")]
  228. public MqttClient(IPAddress brokerIpAddress, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
  229. {
  230. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  231. this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol, null, null);
  232. #else
  233. this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol);
  234. #endif
  235. }
  236. #endif
  237. /// <summary>
  238. /// Constructor
  239. /// </summary>
  240. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  241. public MqttClient(string brokerHostName) :
  242. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  243. this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None)
  244. #else
  245. this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, MqttSslProtocols.None)
  246. #endif
  247. {
  248. }
  249. /// <summary>
  250. /// Constructor
  251. /// </summary>
  252. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  253. /// <param name="brokerPort">Broker port</param>
  254. /// <param name="secure">Using secure connection</param>
  255. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  256. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  257. /// <param name="caCert">CA certificate for secure connection</param>
  258. /// <param name="clientCert">Client certificate</param>
  259. public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
  260. #else
  261. public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol)
  262. #endif
  263. {
  264. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
  265. this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, null, null);
  266. #elif (WINDOWS_APP || WINDOWS_PHONE_APP)
  267. this.Init(brokerHostName, brokerPort, secure, sslProtocol);
  268. #else
  269. this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol);
  270. #endif
  271. }
  272. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
  273. /// <summary>
  274. /// Constructor
  275. /// </summary>
  276. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  277. /// <param name="brokerPort">Broker port</param>
  278. /// <param name="secure">Using secure connection</param>
  279. /// <param name="caCert">CA certificate for secure connection</param>
  280. /// <param name="clientCert">Client certificate</param>
  281. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  282. /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  283. public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
  284. RemoteCertificateValidationCallback userCertificateValidationCallback)
  285. : this(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, null)
  286. {
  287. }
  288. /// <summary>
  289. /// Constructor
  290. /// </summary>
  291. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  292. /// <param name="brokerPort">Broker port</param>
  293. /// <param name="secure">Using secure connection</param>
  294. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  295. /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  296. /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  297. public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol,
  298. RemoteCertificateValidationCallback userCertificateValidationCallback,
  299. LocalCertificateSelectionCallback userCertificateSelectionCallback)
  300. : this(brokerHostName, brokerPort, secure, null, null, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback)
  301. {
  302. }
  303. /// <summary>
  304. /// Constructor
  305. /// </summary>
  306. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  307. /// <param name="brokerPort">Broker port</param>
  308. /// <param name="secure">Using secure connection</param>
  309. /// <param name="caCert">CA certificate for secure connection</param>
  310. /// <param name="clientCert">Client certificate</param>
  311. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  312. /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  313. /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  314. public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
  315. RemoteCertificateValidationCallback userCertificateValidationCallback,
  316. LocalCertificateSelectionCallback userCertificateSelectionCallback)
  317. {
  318. this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback);
  319. }
  320. #endif
  321. #if BROKER
  322. /// <summary>
  323. /// Constructor
  324. /// </summary>
  325. /// <param name="channel">Network channel for communication</param>
  326. public MqttClient(IMqttNetworkChannel channel)
  327. {
  328. // set default MQTT protocol version (default is 3.1.1)
  329. this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1;
  330. this.channel = channel;
  331. // reference to MQTT settings
  332. this.settings = MqttSettings.Instance;
  333. // client not connected yet (CONNACK not send from client), some default values
  334. this.IsConnected = false;
  335. this.ClientId = null;
  336. this.CleanSession = true;
  337. this.keepAliveEvent = new AutoResetEvent(false);
  338. // queue for handling inflight messages (publishing and acknowledge)
  339. this.inflightWaitHandle = new AutoResetEvent(false);
  340. this.inflightQueue = new Queue();
  341. // queue for received message
  342. this.receiveEventWaitHandle = new AutoResetEvent(false);
  343. this.eventQueue = new Queue();
  344. this.internalQueue = new Queue();
  345. // session
  346. this.session = null;
  347. }
  348. #endif
  349. /// <summary>
  350. /// MqttClient initialization
  351. /// </summary>
  352. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  353. /// <param name="brokerPort">Broker port</param>
  354. /// <param name="secure">>Using secure connection</param>
  355. /// <param name="caCert">CA certificate for secure connection</param>
  356. /// <param name="clientCert">Client certificate</param>
  357. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  358. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
  359. /// <param name="userCertificateSelectionCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  360. /// <param name="userCertificateValidationCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  361. private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
  362. RemoteCertificateValidationCallback userCertificateValidationCallback,
  363. LocalCertificateSelectionCallback userCertificateSelectionCallback)
  364. #elif (WINDOWS_APP || WINDOWS_PHONE_APP)
  365. private void Init(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol)
  366. #else
  367. private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
  368. #endif
  369. {
  370. // set default MQTT protocol version (default is 3.1.1)
  371. this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1;
  372. #if !SSL
  373. // check security parameters
  374. if (secure)
  375. throw new ArgumentException("Library compiled without SSL support");
  376. #endif
  377. this.brokerHostName = brokerHostName;
  378. this.brokerPort = brokerPort;
  379. // reference to MQTT settings
  380. this.settings = MqttSettings.Instance;
  381. // set settings port based on secure connection or not
  382. if (!secure)
  383. this.settings.Port = this.brokerPort;
  384. else
  385. this.settings.SslPort = this.brokerPort;
  386. this.syncEndReceiving = new AutoResetEvent(false);
  387. this.keepAliveEvent = new AutoResetEvent(false);
  388. // queue for handling inflight messages (publishing and acknowledge)
  389. this.inflightWaitHandle = new AutoResetEvent(false);
  390. this.inflightQueue = new Queue();
  391. // queue for received message
  392. this.receiveEventWaitHandle = new AutoResetEvent(false);
  393. this.eventQueue = new Queue();
  394. this.internalQueue = new Queue();
  395. // session
  396. this.session = null;
  397. // create network channel
  398. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
  399. this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback);
  400. #elif (WINDOWS_APP || WINDOWS_PHONE_APP)
  401. this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, sslProtocol);
  402. #else
  403. this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, clientCert, sslProtocol);
  404. #endif
  405. }
  406. /// <summary>
  407. /// Connect to broker
  408. /// </summary>
  409. /// <param name="clientId">Client identifier</param>
  410. /// <returns>Return code of CONNACK message from broker</returns>
  411. public byte Connect(string clientId)
  412. {
  413. return this.Connect(clientId, null, null, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, true, MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);
  414. }
  415. /// <summary>
  416. /// Connect to broker
  417. /// </summary>
  418. /// <param name="clientId">Client identifier</param>
  419. /// <param name="cleanSession">Clean sessione flag</param>
  420. /// <returns>Return code of CONNACK message from broker</returns>
  421. public byte Connect(string clientId,
  422. bool cleanSession)
  423. {
  424. return this.Connect(clientId, null, null, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, cleanSession, MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);
  425. }
  426. /// <summary>
  427. /// Connect to broker
  428. /// </summary>
  429. /// <param name="clientId">Client identifier</param>
  430. /// <param name="username">Username</param>
  431. /// <param name="password">Password</param>
  432. /// <returns>Return code of CONNACK message from broker</returns>
  433. public byte Connect(string clientId,
  434. string username,
  435. string password)
  436. {
  437. return this.Connect(clientId, username, password, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, true, MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);
  438. }
  439. /// <summary>
  440. /// Connect to broker
  441. /// </summary>
  442. /// <param name="clientId">Client identifier</param>
  443. /// <param name="username">Username</param>
  444. /// <param name="password">Password</param>
  445. /// <param name="cleanSession">Clean sessione flag</param>
  446. /// <param name="keepAlivePeriod">Keep alive period</param>
  447. /// <returns>Return code of CONNACK message from broker</returns>
  448. public byte Connect(string clientId,
  449. string username,
  450. string password,
  451. bool cleanSession,
  452. ushort keepAlivePeriod)
  453. {
  454. return this.Connect(clientId, username, password, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, cleanSession, keepAlivePeriod);
  455. }
  456. /// <summary>
  457. /// Connect to broker
  458. /// </summary>
  459. /// <param name="clientId">Client identifier</param>
  460. /// <param name="username">Username</param>
  461. /// <param name="password">Password</param>
  462. /// <param name="willRetain">Will retain flag</param>
  463. /// <param name="willQosLevel">Will QOS level</param>
  464. /// <param name="willFlag">Will flag</param>
  465. /// <param name="willTopic">Will topic</param>
  466. /// <param name="willMessage">Will message</param>
  467. /// <param name="cleanSession">Clean sessione flag</param>
  468. /// <param name="keepAlivePeriod">Keep alive period</param>
  469. /// <returns>Return code of CONNACK message from broker</returns>
  470. public byte Connect(string clientId,
  471. string username,
  472. string password,
  473. bool willRetain,
  474. byte willQosLevel,
  475. bool willFlag,
  476. string willTopic,
  477. string willMessage,
  478. bool cleanSession,
  479. ushort keepAlivePeriod)
  480. {
  481. // create CONNECT message
  482. MqttMsgConnect connect = new MqttMsgConnect(clientId,
  483. username,
  484. password,
  485. willRetain,
  486. willQosLevel,
  487. willFlag,
  488. willTopic,
  489. willMessage,
  490. cleanSession,
  491. keepAlivePeriod,
  492. (byte)this.ProtocolVersion);
  493. try
  494. {
  495. // connect to the broker
  496. this.channel.Connect();
  497. }
  498. catch (Exception ex)
  499. {
  500. throw new MqttConnectionException("Exception connecting to the broker", ex);
  501. }
  502. this.lastCommTime = 0;
  503. this.isRunning = true;
  504. this.isConnectionClosing = false;
  505. // start thread for receiving messages from broker
  506. Fx.StartThread(this.ReceiveThread);
  507. MqttMsgConnack connack = (MqttMsgConnack)this.SendReceive(connect);
  508. // if connection accepted, start keep alive timer and
  509. if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED)
  510. {
  511. // set all client properties
  512. this.ClientId = clientId;
  513. this.CleanSession = cleanSession;
  514. this.WillFlag = willFlag;
  515. this.WillTopic = willTopic;
  516. this.WillMessage = willMessage;
  517. this.WillQosLevel = willQosLevel;
  518. this.keepAlivePeriod = keepAlivePeriod * 1000; // convert in ms
  519. // restore previous session
  520. this.RestoreSession();
  521. // keep alive period equals zero means turning off keep alive mechanism
  522. if (this.keepAlivePeriod != 0)
  523. {
  524. // start thread for sending keep alive message to the broker
  525. Fx.StartThread(this.KeepAliveThread);
  526. }
  527. // start thread for raising received message event from broker
  528. Fx.StartThread(this.DispatchEventThread);
  529. // start thread for handling inflight messages queue to broker asynchronously (publish and acknowledge)
  530. Fx.StartThread(this.ProcessInflightThread);
  531. this.IsConnected = true;
  532. }
  533. return connack.ReturnCode;
  534. }
  535. /// <summary>
  536. /// Disconnect from broker
  537. /// </summary>
  538. public void Disconnect()
  539. {
  540. MqttMsgDisconnect disconnect = new MqttMsgDisconnect();
  541. this.Send(disconnect);
  542. // close client
  543. this.OnConnectionClosing();
  544. }
  545. #if BROKER
  546. /// <summary>
  547. /// Open client communication
  548. /// </summary>
  549. public void Open()
  550. {
  551. this.isRunning = true;
  552. // start thread for receiving messages from client
  553. Fx.StartThread(this.ReceiveThread);
  554. // start thread for raising received message event from client
  555. Fx.StartThread(this.DispatchEventThread);
  556. // start thread for handling inflight messages queue to client asynchronously (publish and acknowledge)
  557. Fx.StartThread(this.ProcessInflightThread);
  558. }
  559. #endif
  560. /// <summary>
  561. /// Close client
  562. /// </summary>
  563. #if BROKER
  564. public void Close()
  565. #else
  566. private void Close()
  567. #endif
  568. {
  569. // stop receiving thread
  570. this.isRunning = false;
  571. // wait end receive event thread
  572. if (this.receiveEventWaitHandle != null)
  573. this.receiveEventWaitHandle.Set();
  574. // wait end process inflight thread
  575. if (this.inflightWaitHandle != null)
  576. this.inflightWaitHandle.Set();
  577. #if BROKER
  578. // unlock keep alive thread
  579. this.keepAliveEvent.Set();
  580. #else
  581. // unlock keep alive thread and wait
  582. this.keepAliveEvent.Set();
  583. if (this.keepAliveEventEnd != null)
  584. this.keepAliveEventEnd.WaitOne();
  585. #endif
  586. // clear all queues
  587. this.inflightQueue.Clear();
  588. this.internalQueue.Clear();
  589. this.eventQueue.Clear();
  590. // close network channel
  591. this.channel.Close();
  592. this.IsConnected = false;
  593. }
  594. /// <summary>
  595. /// Execute ping to broker for keep alive
  596. /// </summary>
  597. /// <returns>PINGRESP message from broker</returns>
  598. private MqttMsgPingResp Ping()
  599. {
  600. MqttMsgPingReq pingreq = new MqttMsgPingReq();
  601. try
  602. {
  603. // broker must send PINGRESP within timeout equal to keep alive period
  604. return (MqttMsgPingResp)this.SendReceive(pingreq, this.keepAlivePeriod);
  605. }
  606. catch (Exception e)
  607. {
  608. #if TRACE
  609. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  610. #endif
  611. // client must close connection
  612. this.OnConnectionClosing();
  613. return null;
  614. }
  615. }
  616. #if BROKER
  617. /// <summary>
  618. /// Send CONNACK message to the client (connection accepted or not)
  619. /// </summary>
  620. /// <param name="connect">CONNECT message with all client information</param>
  621. /// <param name="returnCode">Return code for CONNACK message</param>
  622. /// <param name="clientId">If not null, client id assigned by broker</param>
  623. /// <param name="sessionPresent">Session present on the broker</param>
  624. public void Connack(MqttMsgConnect connect, byte returnCode, string clientId, bool sessionPresent)
  625. {
  626. this.lastCommTime = 0;
  627. // create CONNACK message and ...
  628. MqttMsgConnack connack = new MqttMsgConnack();
  629. connack.ReturnCode = returnCode;
  630. // [v3.1.1] session present flag
  631. if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1_1)
  632. connack.SessionPresent = sessionPresent;
  633. // ... send it to the client
  634. this.Send(connack);
  635. // connection accepted, start keep alive thread checking
  636. if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED)
  637. {
  638. // [v3.1.1] if client id isn't null, the CONNECT message has a cliend id with zero bytes length
  639. // and broker assigned a unique identifier to the client
  640. this.ClientId = (clientId == null) ? connect.ClientId : clientId;
  641. this.CleanSession = connect.CleanSession;
  642. this.WillFlag = connect.WillFlag;
  643. this.WillTopic = connect.WillTopic;
  644. this.WillMessage = connect.WillMessage;
  645. this.WillQosLevel = connect.WillQosLevel;
  646. this.keepAlivePeriod = connect.KeepAlivePeriod * 1000; // convert in ms
  647. // broker has a tolerance of 1.5 specified keep alive period
  648. this.keepAlivePeriod += (this.keepAlivePeriod / 2);
  649. // start thread for checking keep alive period timeout
  650. Fx.StartThread(this.KeepAliveThread);
  651. this.isConnectionClosing = false;
  652. this.IsConnected = true;
  653. }
  654. // connection refused, close TCP/IP channel
  655. else
  656. {
  657. this.Close();
  658. }
  659. }
  660. /// <summary>
  661. /// Send SUBACK message to the client
  662. /// </summary>
  663. /// <param name="messageId">Message Id for the SUBSCRIBE message that is being acknowledged</param>
  664. /// <param name="grantedQosLevels">Granted QoS Levels</param>
  665. public void Suback(ushort messageId, byte[] grantedQosLevels)
  666. {
  667. MqttMsgSuback suback = new MqttMsgSuback();
  668. suback.MessageId = messageId;
  669. suback.GrantedQoSLevels = grantedQosLevels;
  670. this.Send(suback);
  671. }
  672. /// <summary>
  673. /// Send UNSUBACK message to the client
  674. /// </summary>
  675. /// <param name="messageId">Message Id for the UNSUBSCRIBE message that is being acknowledged</param>
  676. public void Unsuback(ushort messageId)
  677. {
  678. MqttMsgUnsuback unsuback = new MqttMsgUnsuback();
  679. unsuback.MessageId = messageId;
  680. this.Send(unsuback);
  681. }
  682. #endif
  683. /// <summary>
  684. /// Subscribe for message topics
  685. /// </summary>
  686. /// <param name="topics">List of topics to subscribe</param>
  687. /// <param name="qosLevels">QOS levels related to topics</param>
  688. /// <returns>Message Id related to SUBSCRIBE message</returns>
  689. public ushort Subscribe(string[] topics, byte[] qosLevels)
  690. {
  691. MqttMsgSubscribe subscribe =
  692. new MqttMsgSubscribe(topics, qosLevels);
  693. subscribe.MessageId = this.GetMessageId();
  694. // enqueue subscribe request into the inflight queue
  695. this.EnqueueInflight(subscribe, MqttMsgFlow.ToPublish);
  696. return subscribe.MessageId;
  697. }
  698. /// <summary>
  699. /// Unsubscribe for message topics
  700. /// </summary>
  701. /// <param name="topics">List of topics to unsubscribe</param>
  702. /// <returns>Message Id in UNSUBACK message from broker</returns>
  703. public ushort Unsubscribe(string[] topics)
  704. {
  705. MqttMsgUnsubscribe unsubscribe =
  706. new MqttMsgUnsubscribe(topics);
  707. unsubscribe.MessageId = this.GetMessageId();
  708. // enqueue unsubscribe request into the inflight queue
  709. this.EnqueueInflight(unsubscribe, MqttMsgFlow.ToPublish);
  710. return unsubscribe.MessageId;
  711. }
  712. /// <summary>
  713. /// Publish a message asynchronously (QoS Level 0 and not retained)
  714. /// </summary>
  715. /// <param name="topic">Message topic</param>
  716. /// <param name="message">Message data (payload)</param>
  717. /// <returns>Message Id related to PUBLISH message</returns>
  718. public ushort Publish(string topic, byte[] message)
  719. {
  720. return this.Publish(topic, message, MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);
  721. }
  722. /// <summary>
  723. /// Publish a message asynchronously
  724. /// </summary>
  725. /// <param name="topic">Message topic</param>
  726. /// <param name="message">Message data (payload)</param>
  727. /// <param name="qosLevel">QoS Level</param>
  728. /// <param name="retain">Retain flag</param>
  729. /// <returns>Message Id related to PUBLISH message</returns>
  730. public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain)
  731. {
  732. MqttMsgPublish publish =
  733. new MqttMsgPublish(topic, message, false, qosLevel, retain);
  734. publish.MessageId = this.GetMessageId();
  735. // enqueue message to publish into the inflight queue
  736. bool enqueue = this.EnqueueInflight(publish, MqttMsgFlow.ToPublish);
  737. // message enqueued
  738. if (enqueue)
  739. return publish.MessageId;
  740. // infligh queue full, message not enqueued
  741. else
  742. throw new MqttClientException(MqttClientErrorCode.InflightQueueFull);
  743. }
  744. /// <summary>
  745. /// Wrapper method for raising events
  746. /// </summary>
  747. /// <param name="internalEvent">Internal event</param>
  748. private void OnInternalEvent(InternalEvent internalEvent)
  749. {
  750. lock (this.eventQueue)
  751. {
  752. this.eventQueue.Enqueue(internalEvent);
  753. }
  754. this.receiveEventWaitHandle.Set();
  755. }
  756. /// <summary>
  757. /// Wrapper method for raising closing connection event
  758. /// </summary>
  759. private void OnConnectionClosing()
  760. {
  761. if (!this.isConnectionClosing)
  762. {
  763. this.isConnectionClosing = true;
  764. this.receiveEventWaitHandle.Set();
  765. }
  766. }
  767. /// <summary>
  768. /// Wrapper method for raising PUBLISH message received event
  769. /// </summary>
  770. /// <param name="publish">PUBLISH message received</param>
  771. private void OnMqttMsgPublishReceived(MqttMsgPublish publish)
  772. {
  773. if (this.MqttMsgPublishReceived != null)
  774. {
  775. this.MqttMsgPublishReceived(this,
  776. new MqttMsgPublishEventArgs(publish.Topic, publish.Message, publish.DupFlag, publish.QosLevel, publish.Retain));
  777. }
  778. }
  779. /// <summary>
  780. /// Wrapper method for raising published message event
  781. /// </summary>
  782. /// <param name="messageId">Message identifier for published message</param>
  783. /// <param name="isPublished">Publish flag</param>
  784. private void OnMqttMsgPublished(ushort messageId, bool isPublished)
  785. {
  786. if (this.MqttMsgPublished != null)
  787. {
  788. this.MqttMsgPublished(this,
  789. new MqttMsgPublishedEventArgs(messageId, isPublished));
  790. }
  791. }
  792. /// <summary>
  793. /// Wrapper method for raising subscribed topic event
  794. /// </summary>
  795. /// <param name="suback">SUBACK message received</param>
  796. private void OnMqttMsgSubscribed(MqttMsgSuback suback)
  797. {
  798. if (this.MqttMsgSubscribed != null)
  799. {
  800. this.MqttMsgSubscribed(this,
  801. new MqttMsgSubscribedEventArgs(suback.MessageId, suback.GrantedQoSLevels));
  802. }
  803. }
  804. /// <summary>
  805. /// Wrapper method for raising unsubscribed topic event
  806. /// </summary>
  807. /// <param name="messageId">Message identifier for unsubscribed topic</param>
  808. private void OnMqttMsgUnsubscribed(ushort messageId)
  809. {
  810. if (this.MqttMsgUnsubscribed != null)
  811. {
  812. this.MqttMsgUnsubscribed(this,
  813. new MqttMsgUnsubscribedEventArgs(messageId));
  814. }
  815. }
  816. #if BROKER
  817. /// <summary>
  818. /// Wrapper method for raising SUBSCRIBE message event
  819. /// </summary>
  820. /// <param name="messageId">Message identifier for subscribe topics request</param>
  821. /// <param name="topics">Topics requested to subscribe</param>
  822. /// <param name="qosLevels">List of QOS Levels requested</param>
  823. private void OnMqttMsgSubscribeReceived(ushort messageId, string[] topics, byte[] qosLevels)
  824. {
  825. if (this.MqttMsgSubscribeReceived != null)
  826. {
  827. this.MqttMsgSubscribeReceived(this,
  828. new MqttMsgSubscribeEventArgs(messageId, topics, qosLevels));
  829. }
  830. }
  831. /// <summary>
  832. /// Wrapper method for raising UNSUBSCRIBE message event
  833. /// </summary>
  834. /// <param name="messageId">Message identifier for unsubscribe topics request</param>
  835. /// <param name="topics">Topics requested to unsubscribe</param>
  836. private void OnMqttMsgUnsubscribeReceived(ushort messageId, string[] topics)
  837. {
  838. if (this.MqttMsgUnsubscribeReceived != null)
  839. {
  840. this.MqttMsgUnsubscribeReceived(this,
  841. new MqttMsgUnsubscribeEventArgs(messageId, topics));
  842. }
  843. }
  844. /// <summary>
  845. /// Wrapper method for raising CONNECT message event
  846. /// </summary>
  847. private void OnMqttMsgConnected(MqttMsgConnect connect)
  848. {
  849. if (this.MqttMsgConnected != null)
  850. {
  851. this.ProtocolVersion = (MqttProtocolVersion)connect.ProtocolVersion;
  852. this.MqttMsgConnected(this, new MqttMsgConnectEventArgs(connect));
  853. }
  854. }
  855. /// <summary>
  856. /// Wrapper method for raising DISCONNECT message event
  857. /// </summary>
  858. private void OnMqttMsgDisconnected()
  859. {
  860. if (this.MqttMsgDisconnected != null)
  861. {
  862. this.MqttMsgDisconnected(this, EventArgs.Empty);
  863. }
  864. }
  865. #endif
  866. /// <summary>
  867. /// Wrapper method for peer/client disconnection
  868. /// </summary>
  869. private void OnConnectionClosed()
  870. {
  871. if (this.ConnectionClosed != null)
  872. {
  873. this.ConnectionClosed(this, EventArgs.Empty);
  874. }
  875. }
  876. /// <summary>
  877. /// Send a message
  878. /// </summary>
  879. /// <param name="msgBytes">Message bytes</param>
  880. private void Send(byte[] msgBytes)
  881. {
  882. try
  883. {
  884. // send message
  885. this.channel.Send(msgBytes);
  886. #if !BROKER
  887. // update last message sent ticks
  888. this.lastCommTime = Environment.TickCount;
  889. #endif
  890. }
  891. catch (Exception e)
  892. {
  893. #if TRACE
  894. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  895. #endif
  896. throw new MqttCommunicationException(e);
  897. }
  898. }
  899. /// <summary>
  900. /// Send a message
  901. /// </summary>
  902. /// <param name="msg">Message</param>
  903. private void Send(MqttMsgBase msg)
  904. {
  905. #if TRACE
  906. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
  907. #endif
  908. this.Send(msg.GetBytes((byte)this.ProtocolVersion));
  909. }
  910. /// <summary>
  911. /// Send a message to the broker and wait answer
  912. /// </summary>
  913. /// <param name="msgBytes">Message bytes</param>
  914. /// <returns>MQTT message response</returns>
  915. private MqttMsgBase SendReceive(byte[] msgBytes)
  916. {
  917. return this.SendReceive(msgBytes, MqttSettings.MQTT_DEFAULT_TIMEOUT);
  918. }
  919. /// <summary>
  920. /// Send a message to the broker and wait answer
  921. /// </summary>
  922. /// <param name="msgBytes">Message bytes</param>
  923. /// <param name="timeout">Timeout for receiving answer</param>
  924. /// <returns>MQTT message response</returns>
  925. private MqttMsgBase SendReceive(byte[] msgBytes, int timeout)
  926. {
  927. // reset handle before sending
  928. this.syncEndReceiving.Reset();
  929. try
  930. {
  931. // send message
  932. this.channel.Send(msgBytes);
  933. // update last message sent ticks
  934. this.lastCommTime = Environment.TickCount;
  935. }
  936. catch (Exception e)
  937. {
  938. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP)
  939. if (typeof(SocketException) == e.GetType())
  940. {
  941. // connection reset by broker
  942. if (((SocketException)e).SocketErrorCode == SocketError.ConnectionReset)
  943. this.IsConnected = false;
  944. }
  945. #endif
  946. #if TRACE
  947. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  948. #endif
  949. throw new MqttCommunicationException(e);
  950. }
  951. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  952. // wait for answer from broker
  953. if (this.syncEndReceiving.WaitOne(timeout, false))
  954. #else
  955. // wait for answer from broker
  956. if (this.syncEndReceiving.WaitOne(timeout))
  957. #endif
  958. {
  959. // message received without exception
  960. if (this.exReceiving == null)
  961. return this.msgReceived;
  962. // receiving thread catched exception
  963. else
  964. throw this.exReceiving;
  965. }
  966. else
  967. {
  968. // throw timeout exception
  969. throw new MqttCommunicationException();
  970. }
  971. }
  972. /// <summary>
  973. /// Send a message to the broker and wait answer
  974. /// </summary>
  975. /// <param name="msg">Message</param>
  976. /// <returns>MQTT message response</returns>
  977. private MqttMsgBase SendReceive(MqttMsgBase msg)
  978. {
  979. return this.SendReceive(msg, MqttSettings.MQTT_DEFAULT_TIMEOUT);
  980. }
  981. /// <summary>
  982. /// Send a message to the broker and wait answer
  983. /// </summary>
  984. /// <param name="msg">Message</param>
  985. /// <param name="timeout">Timeout for receiving answer</param>
  986. /// <returns>MQTT message response</returns>
  987. private MqttMsgBase SendReceive(MqttMsgBase msg, int timeout)
  988. {
  989. #if TRACE
  990. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
  991. #endif
  992. return this.SendReceive(msg.GetBytes((byte)this.ProtocolVersion), timeout);
  993. }
  994. /// <summary>
  995. /// Enqueue a message into the inflight queue
  996. /// </summary>
  997. /// <param name="msg">Message to enqueue</param>
  998. /// <param name="flow">Message flow (publish, acknowledge)</param>
  999. /// <returns>Message enqueued or not</returns>
  1000. private bool EnqueueInflight(MqttMsgBase msg, MqttMsgFlow flow)
  1001. {
  1002. // enqueue is needed (or not)
  1003. bool enqueue = true;
  1004. // if it is a PUBLISH message with QoS Level 2
  1005. if ((msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1006. (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE))
  1007. {
  1008. lock (this.inflightQueue)
  1009. {
  1010. // if it is a PUBLISH message already received (it is in the inflight queue), the publisher
  1011. // re-sent it because it didn't received the PUBREC. In this case, we have to re-send PUBREC
  1012. // NOTE : I need to find on message id and flow because the broker could be publish/received
  1013. // to/from client and message id could be the same (one tracked by broker and the other by client)
  1014. MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToAcknowledge);
  1015. MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);
  1016. // the PUBLISH message is alredy in the inflight queue, we don't need to re-enqueue but we need
  1017. // to change state to re-send PUBREC
  1018. if (msgCtx != null)
  1019. {
  1020. msgCtx.State = MqttMsgState.QueuedQos2;
  1021. msgCtx.Flow = MqttMsgFlow.ToAcknowledge;
  1022. enqueue = false;
  1023. }
  1024. }
  1025. }
  1026. if (enqueue)
  1027. {
  1028. // set a default state
  1029. MqttMsgState state = MqttMsgState.QueuedQos0;
  1030. // based on QoS level, the messages flow between broker and client changes
  1031. switch (msg.QosLevel)
  1032. {
  1033. // QoS Level 0
  1034. case MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE:
  1035. state = MqttMsgState.QueuedQos0;
  1036. break;
  1037. // QoS Level 1
  1038. case MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE:
  1039. state = MqttMsgState.QueuedQos1;
  1040. break;
  1041. // QoS Level 2
  1042. case MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE:
  1043. state = MqttMsgState.QueuedQos2;
  1044. break;
  1045. }
  1046. // [v3.1.1] SUBSCRIBE and UNSUBSCRIBE aren't "officially" QOS = 1
  1047. // so QueuedQos1 state isn't valid for them
  1048. if (msg.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
  1049. state = MqttMsgState.SendSubscribe;
  1050. else if (msg.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
  1051. state = MqttMsgState.SendUnsubscribe;
  1052. // queue message context
  1053. MqttMsgContext msgContext = new MqttMsgContext()
  1054. {
  1055. Message = msg,
  1056. State = state,
  1057. Flow = flow,
  1058. Attempt = 0
  1059. };
  1060. lock (this.inflightQueue)
  1061. {
  1062. // check number of messages inside inflight queue
  1063. enqueue = (this.inflightQueue.Count < this.settings.InflightQueueSize);
  1064. if (enqueue)
  1065. {
  1066. // enqueue message and unlock send thread
  1067. this.inflightQueue.Enqueue(msgContext);
  1068. #if TRACE
  1069. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "enqueued {0}", msg);
  1070. #endif
  1071. // PUBLISH message
  1072. if (msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1073. {
  1074. // to publish and QoS level 1 or 2
  1075. if ((msgContext.Flow == MqttMsgFlow.ToPublish) &&
  1076. ((msg.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE) ||
  1077. (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)))
  1078. {
  1079. if (this.session != null)
  1080. this.session.InflightMessages.Add(msgContext.Key, msgContext);
  1081. }
  1082. // to acknowledge and QoS level 2
  1083. else if ((msgContext.Flow == MqttMsgFlow.ToAcknowledge) &&
  1084. (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE))
  1085. {
  1086. if (this.session != null)
  1087. this.session.InflightMessages.Add(msgContext.Key, msgContext);
  1088. }
  1089. }
  1090. }
  1091. }
  1092. }
  1093. this.inflightWaitHandle.Set();
  1094. return enqueue;
  1095. }
  1096. /// <summary>
  1097. /// Enqueue a message into the internal queue
  1098. /// </summary>
  1099. /// <param name="msg">Message to enqueue</param>
  1100. private void EnqueueInternal(MqttMsgBase msg)
  1101. {
  1102. // enqueue is needed (or not)
  1103. bool enqueue = true;
  1104. // if it is a PUBREL message (for QoS Level 2)
  1105. if (msg.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE)
  1106. {
  1107. lock (this.inflightQueue)
  1108. {
  1109. // if it is a PUBREL but the corresponding PUBLISH isn't in the inflight queue,
  1110. // it means that we processed PUBLISH message and received PUBREL and we sent PUBCOMP
  1111. // but publisher didn't receive PUBCOMP so it re-sent PUBREL. We need only to re-send PUBCOMP.
  1112. // NOTE : I need to find on message id and flow because the broker could be publish/received
  1113. // to/from client and message id could be the same (one tracked by broker and the other by client)
  1114. MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToAcknowledge);
  1115. MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);
  1116. // the PUBLISH message isn't in the inflight queue, it was already processed so
  1117. // we need to re-send PUBCOMP only
  1118. if (msgCtx == null)
  1119. {
  1120. MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
  1121. pubcomp.MessageId = msg.MessageId;
  1122. this.Send(pubcomp);
  1123. enqueue = false;
  1124. }
  1125. }
  1126. }
  1127. // if it is a PUBCOMP message (for QoS Level 2)
  1128. else if (msg.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE)
  1129. {
  1130. lock (this.inflightQueue)
  1131. {
  1132. // if it is a PUBCOMP but the corresponding PUBLISH isn't in the inflight queue,
  1133. // it means that we sent PUBLISH message, sent PUBREL (after receiving PUBREC) and already received PUBCOMP
  1134. // but publisher didn't receive PUBREL so it re-sent PUBCOMP. We need only to ignore this PUBCOMP.
  1135. // NOTE : I need to find on message id and flow because the broker could be publish/received
  1136. // to/from client and message id could be the same (one tracked by broker and the other by client)
  1137. MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToPublish);
  1138. MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);
  1139. // the PUBLISH message isn't in the inflight queue, it was already sent so we need to ignore this PUBCOMP
  1140. if (msgCtx == null)
  1141. {
  1142. enqueue = false;
  1143. }
  1144. }
  1145. }
  1146. // if it is a PUBREC message (for QoS Level 2)
  1147. else if (msg.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE)
  1148. {
  1149. lock (this.inflightQueue)
  1150. {
  1151. // if it is a PUBREC but the corresponding PUBLISH isn't in the inflight queue,
  1152. // it means that we sent PUBLISH message more times (retries) but broker didn't send PUBREC in time
  1153. // the publish is failed and we need only to ignore this PUBREC.
  1154. // NOTE : I need to find on message id and flow because the broker could be publish/received
  1155. // to/from client and message id could be the same (one tracked by broker and the other by client)
  1156. MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToPublish);
  1157. MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);
  1158. // the PUBLISH message isn't in the inflight queue, it was already sent so we need to ignore this PUBREC
  1159. if (msgCtx == null)
  1160. {
  1161. enqueue = false;
  1162. }
  1163. }
  1164. }
  1165. if (enqueue)
  1166. {
  1167. lock (this.internalQueue)
  1168. {
  1169. this.internalQueue.Enqueue(msg);
  1170. #if TRACE
  1171. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "enqueued {0}", msg);
  1172. #endif
  1173. this.inflightWaitHandle.Set();
  1174. }
  1175. }
  1176. }
  1177. /// <summary>
  1178. /// Thread for receiving messages
  1179. /// </summary>
  1180. private void ReceiveThread()
  1181. {
  1182. int readBytes = 0;
  1183. byte[] fixedHeaderFirstByte = new byte[1];
  1184. byte msgType;
  1185. while (this.isRunning)
  1186. {
  1187. try
  1188. {
  1189. // read first byte (fixed header)
  1190. readBytes = this.channel.Receive(fixedHeaderFirstByte);
  1191. if (readBytes > 0)
  1192. {
  1193. #if BROKER
  1194. // update last message received ticks
  1195. this.lastCommTime = Environment.TickCount;
  1196. #endif
  1197. // extract message type from received byte
  1198. msgType = (byte)((fixedHeaderFirstByte[0] & MqttMsgBase.MSG_TYPE_MASK) >> MqttMsgBase.MSG_TYPE_OFFSET);
  1199. switch (msgType)
  1200. {
  1201. // CONNECT message received
  1202. case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
  1203. #if BROKER
  1204. MqttMsgConnect connect = MqttMsgConnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1205. #if TRACE
  1206. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", connect);
  1207. #endif
  1208. // raise message received event
  1209. this.OnInternalEvent(new MsgInternalEvent(connect));
  1210. break;
  1211. #else
  1212. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1213. #endif
  1214. // CONNACK message received
  1215. case MqttMsgBase.MQTT_MSG_CONNACK_TYPE:
  1216. #if BROKER
  1217. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1218. #else
  1219. this.msgReceived = MqttMsgConnack.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1220. #if TRACE
  1221. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1222. #endif
  1223. this.syncEndReceiving.Set();
  1224. break;
  1225. #endif
  1226. // PINGREQ message received
  1227. case MqttMsgBase.MQTT_MSG_PINGREQ_TYPE:
  1228. #if BROKER
  1229. this.msgReceived = MqttMsgPingReq.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1230. #if TRACE
  1231. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1232. #endif
  1233. MqttMsgPingResp pingresp = new MqttMsgPingResp();
  1234. this.Send(pingresp);
  1235. break;
  1236. #else
  1237. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1238. #endif
  1239. // PINGRESP message received
  1240. case MqttMsgBase.MQTT_MSG_PINGRESP_TYPE:
  1241. #if BROKER
  1242. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1243. #else
  1244. this.msgReceived = MqttMsgPingResp.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1245. #if TRACE
  1246. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1247. #endif
  1248. this.syncEndReceiving.Set();
  1249. break;
  1250. #endif
  1251. // SUBSCRIBE message received
  1252. case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
  1253. #if BROKER
  1254. MqttMsgSubscribe subscribe = MqttMsgSubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1255. #if TRACE
  1256. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", subscribe);
  1257. #endif
  1258. // raise message received event
  1259. this.OnInternalEvent(new MsgInternalEvent(subscribe));
  1260. break;
  1261. #else
  1262. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1263. #endif
  1264. // SUBACK message received
  1265. case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
  1266. #if BROKER
  1267. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1268. #else
  1269. // enqueue SUBACK message received (for QoS Level 1) into the internal queue
  1270. MqttMsgSuback suback = MqttMsgSuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1271. #if TRACE
  1272. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", suback);
  1273. #endif
  1274. // enqueue SUBACK message into the internal queue
  1275. this.EnqueueInternal(suback);
  1276. break;
  1277. #endif
  1278. // PUBLISH message received
  1279. case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
  1280. MqttMsgPublish publish = MqttMsgPublish.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1281. #if TRACE
  1282. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", publish);
  1283. #endif
  1284. // enqueue PUBLISH message to acknowledge into the inflight queue
  1285. this.EnqueueInflight(publish, MqttMsgFlow.ToAcknowledge);
  1286. break;
  1287. // PUBACK message received
  1288. case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
  1289. // enqueue PUBACK message received (for QoS Level 1) into the internal queue
  1290. MqttMsgPuback puback = MqttMsgPuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1291. #if TRACE
  1292. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", puback);
  1293. #endif
  1294. // enqueue PUBACK message into the internal queue
  1295. this.EnqueueInternal(puback);
  1296. break;
  1297. // PUBREC message received
  1298. case MqttMsgBase.MQTT_MSG_PUBREC_TYPE:
  1299. // enqueue PUBREC message received (for QoS Level 2) into the internal queue
  1300. MqttMsgPubrec pubrec = MqttMsgPubrec.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1301. #if TRACE
  1302. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrec);
  1303. #endif
  1304. // enqueue PUBREC message into the internal queue
  1305. this.EnqueueInternal(pubrec);
  1306. break;
  1307. // PUBREL message received
  1308. case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
  1309. // enqueue PUBREL message received (for QoS Level 2) into the internal queue
  1310. MqttMsgPubrel pubrel = MqttMsgPubrel.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1311. #if TRACE
  1312. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrel);
  1313. #endif
  1314. // enqueue PUBREL message into the internal queue
  1315. this.EnqueueInternal(pubrel);
  1316. break;
  1317. // PUBCOMP message received
  1318. case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
  1319. // enqueue PUBCOMP message received (for QoS Level 2) into the internal queue
  1320. MqttMsgPubcomp pubcomp = MqttMsgPubcomp.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1321. #if TRACE
  1322. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubcomp);
  1323. #endif
  1324. // enqueue PUBCOMP message into the internal queue
  1325. this.EnqueueInternal(pubcomp);
  1326. break;
  1327. // UNSUBSCRIBE message received
  1328. case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
  1329. #if BROKER
  1330. MqttMsgUnsubscribe unsubscribe = MqttMsgUnsubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1331. #if TRACE
  1332. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsubscribe);
  1333. #endif
  1334. // raise message received event
  1335. this.OnInternalEvent(new MsgInternalEvent(unsubscribe));
  1336. break;
  1337. #else
  1338. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1339. #endif
  1340. // UNSUBACK message received
  1341. case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
  1342. #if BROKER
  1343. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1344. #else
  1345. // enqueue UNSUBACK message received (for QoS Level 1) into the internal queue
  1346. MqttMsgUnsuback unsuback = MqttMsgUnsuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1347. #if TRACE
  1348. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsuback);
  1349. #endif
  1350. // enqueue UNSUBACK message into the internal queue
  1351. this.EnqueueInternal(unsuback);
  1352. break;
  1353. #endif
  1354. // DISCONNECT message received
  1355. case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
  1356. #if BROKER
  1357. MqttMsgDisconnect disconnect = MqttMsgDisconnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1358. #if TRACE
  1359. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", disconnect);
  1360. #endif
  1361. // raise message received event
  1362. this.OnInternalEvent(new MsgInternalEvent(disconnect));
  1363. break;
  1364. #else
  1365. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1366. #endif
  1367. default:
  1368. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1369. }
  1370. this.exReceiving = null;
  1371. }
  1372. // zero bytes read, peer gracefully closed socket
  1373. else
  1374. {
  1375. // wake up thread that will notify connection is closing
  1376. this.OnConnectionClosing();
  1377. }
  1378. }
  1379. catch (Exception e)
  1380. {
  1381. #if TRACE
  1382. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  1383. #endif
  1384. this.exReceiving = new MqttCommunicationException(e);
  1385. bool close = false;
  1386. if (e.GetType() == typeof(MqttClientException))
  1387. {
  1388. // [v3.1.1] scenarios the receiver MUST close the network connection
  1389. MqttClientException ex = e as MqttClientException;
  1390. close = ((ex.ErrorCode == MqttClientErrorCode.InvalidFlagBits) ||
  1391. (ex.ErrorCode == MqttClientErrorCode.InvalidProtocolName) ||
  1392. (ex.ErrorCode == MqttClientErrorCode.InvalidConnectFlags));
  1393. }
  1394. #if !(WINDOWS_APP || WINDOWS_PHONE_APP)
  1395. else if ((e.GetType() == typeof(IOException)) || (e.GetType() == typeof(SocketException)) ||
  1396. ((e.InnerException != null) && (e.InnerException.GetType() == typeof(SocketException)))) // added for SSL/TLS incoming connection that use SslStream that wraps SocketException
  1397. {
  1398. close = true;
  1399. }
  1400. #endif
  1401. if (close)
  1402. {
  1403. // wake up thread that will notify connection is closing
  1404. this.OnConnectionClosing();
  1405. }
  1406. }
  1407. }
  1408. }
  1409. /// <summary>
  1410. /// Thread for handling keep alive message
  1411. /// </summary>
  1412. private void KeepAliveThread()
  1413. {
  1414. int delta = 0;
  1415. int wait = this.keepAlivePeriod;
  1416. // create event to signal that current thread is end
  1417. this.keepAliveEventEnd = new AutoResetEvent(false);
  1418. while (this.isRunning)
  1419. {
  1420. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1421. // waiting...
  1422. this.keepAliveEvent.WaitOne(wait, false);
  1423. #else
  1424. // waiting...
  1425. this.keepAliveEvent.WaitOne(wait);
  1426. #endif
  1427. if (this.isRunning)
  1428. {
  1429. delta = Environment.TickCount - this.lastCommTime;
  1430. // if timeout exceeded ...
  1431. if (delta >= this.keepAlivePeriod)
  1432. {
  1433. #if BROKER
  1434. // client must close connection
  1435. this.OnConnectionClosing();
  1436. #else
  1437. // ... send keep alive
  1438. this.Ping();
  1439. wait = this.keepAlivePeriod;
  1440. #endif
  1441. }
  1442. else
  1443. {
  1444. // update waiting time
  1445. wait = this.keepAlivePeriod - delta;
  1446. }
  1447. }
  1448. }
  1449. // signal thread end
  1450. this.keepAliveEventEnd.Set();
  1451. }
  1452. /// <summary>
  1453. /// Thread for raising event
  1454. /// </summary>
  1455. private void DispatchEventThread()
  1456. {
  1457. while (this.isRunning)
  1458. {
  1459. #if BROKER
  1460. if ((this.eventQueue.Count == 0) && !this.isConnectionClosing)
  1461. {
  1462. // broker need to receive the first message (CONNECT)
  1463. // within a reasonable amount of time after TCP/IP connection
  1464. if (!this.IsConnected)
  1465. {
  1466. // wait on receiving message from client with a connection timeout
  1467. if (!this.receiveEventWaitHandle.WaitOne(this.settings.TimeoutOnConnection))
  1468. {
  1469. // client must close connection
  1470. this.Close();
  1471. // client raw disconnection
  1472. this.OnConnectionClosed();
  1473. }
  1474. }
  1475. else
  1476. {
  1477. // wait on receiving message from client
  1478. this.receiveEventWaitHandle.WaitOne();
  1479. }
  1480. }
  1481. #else
  1482. if ((this.eventQueue.Count == 0) && !this.isConnectionClosing)
  1483. // wait on receiving message from client
  1484. this.receiveEventWaitHandle.WaitOne();
  1485. #endif
  1486. // check if it is running or we are closing client
  1487. if (this.isRunning)
  1488. {
  1489. // get event from queue
  1490. InternalEvent internalEvent = null;
  1491. lock (this.eventQueue)
  1492. {
  1493. if (this.eventQueue.Count > 0)
  1494. internalEvent = (InternalEvent)this.eventQueue.Dequeue();
  1495. }
  1496. // it's an event with a message inside
  1497. if (internalEvent != null)
  1498. {
  1499. MqttMsgBase msg = ((MsgInternalEvent)internalEvent).Message;
  1500. if (msg != null)
  1501. {
  1502. switch (msg.Type)
  1503. {
  1504. // CONNECT message received
  1505. case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
  1506. #if BROKER
  1507. // raise connected client event (CONNECT message received)
  1508. this.OnMqttMsgConnected((MqttMsgConnect)msg);
  1509. break;
  1510. #else
  1511. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1512. #endif
  1513. // SUBSCRIBE message received
  1514. case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
  1515. #if BROKER
  1516. MqttMsgSubscribe subscribe = (MqttMsgSubscribe)msg;
  1517. // raise subscribe topic event (SUBSCRIBE message received)
  1518. this.OnMqttMsgSubscribeReceived(subscribe.MessageId, subscribe.Topics, subscribe.QoSLevels);
  1519. break;
  1520. #else
  1521. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1522. #endif
  1523. // SUBACK message received
  1524. case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
  1525. // raise subscribed topic event (SUBACK message received)
  1526. this.OnMqttMsgSubscribed((MqttMsgSuback)msg);
  1527. break;
  1528. // PUBLISH message received
  1529. case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
  1530. // PUBLISH message received in a published internal event, no publish succeeded
  1531. if (internalEvent.GetType() == typeof(MsgPublishedInternalEvent))
  1532. this.OnMqttMsgPublished(msg.MessageId, false);
  1533. else
  1534. // raise PUBLISH message received event
  1535. this.OnMqttMsgPublishReceived((MqttMsgPublish)msg);
  1536. break;
  1537. // PUBACK message received
  1538. case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
  1539. // raise published message event
  1540. // (PUBACK received for QoS Level 1)
  1541. this.OnMqttMsgPublished(msg.MessageId, true);
  1542. break;
  1543. // PUBREL message received
  1544. case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
  1545. // raise message received event
  1546. // (PUBREL received for QoS Level 2)
  1547. this.OnMqttMsgPublishReceived((MqttMsgPublish)msg);
  1548. break;
  1549. // PUBCOMP message received
  1550. case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
  1551. // raise published message event
  1552. // (PUBCOMP received for QoS Level 2)
  1553. this.OnMqttMsgPublished(msg.MessageId, true);
  1554. break;
  1555. // UNSUBSCRIBE message received from client
  1556. case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
  1557. #if BROKER
  1558. MqttMsgUnsubscribe unsubscribe = (MqttMsgUnsubscribe)msg;
  1559. // raise unsubscribe topic event (UNSUBSCRIBE message received)
  1560. this.OnMqttMsgUnsubscribeReceived(unsubscribe.MessageId, unsubscribe.Topics);
  1561. break;
  1562. #else
  1563. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1564. #endif
  1565. // UNSUBACK message received
  1566. case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
  1567. // raise unsubscribed topic event
  1568. this.OnMqttMsgUnsubscribed(msg.MessageId);
  1569. break;
  1570. // DISCONNECT message received from client
  1571. case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
  1572. #if BROKER
  1573. // raise disconnected client event (DISCONNECT message received)
  1574. this.OnMqttMsgDisconnected();
  1575. break;
  1576. #else
  1577. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1578. #endif
  1579. }
  1580. }
  1581. }
  1582. // all events for received messages dispatched, check if there is closing connection
  1583. if ((this.eventQueue.Count == 0) && this.isConnectionClosing)
  1584. {
  1585. // client must close connection
  1586. this.Close();
  1587. // client raw disconnection
  1588. this.OnConnectionClosed();
  1589. }
  1590. }
  1591. }
  1592. }
  1593. /// <summary>
  1594. /// Process inflight messages queue
  1595. /// </summary>
  1596. private void ProcessInflightThread()
  1597. {
  1598. MqttMsgContext msgContext = null;
  1599. MqttMsgBase msgInflight = null;
  1600. MqttMsgBase msgReceived = null;
  1601. InternalEvent internalEvent = null;
  1602. bool acknowledge = false;
  1603. int timeout = Timeout.Infinite;
  1604. int delta;
  1605. bool msgReceivedProcessed = false;
  1606. try
  1607. {
  1608. while (this.isRunning)
  1609. {
  1610. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1611. // wait on message queueud to inflight
  1612. this.inflightWaitHandle.WaitOne(timeout, false);
  1613. #else
  1614. // wait on message queueud to inflight
  1615. this.inflightWaitHandle.WaitOne(timeout);
  1616. #endif
  1617. // it could be unblocked because Close() method is joining
  1618. if (this.isRunning)
  1619. {
  1620. lock (this.inflightQueue)
  1621. {
  1622. // message received and peeked from internal queue is processed
  1623. // NOTE : it has the corresponding message in inflight queue based on messageId
  1624. // (ex. a PUBREC for a PUBLISH, a SUBACK for a SUBSCRIBE, ...)
  1625. // if it's orphan we need to remove from internal queue
  1626. msgReceivedProcessed = false;
  1627. acknowledge = false;
  1628. msgReceived = null;
  1629. // set timeout tu MaxValue instead of Infinte (-1) to perform
  1630. // compare with calcultad current msgTimeout
  1631. timeout = Int32.MaxValue;
  1632. // a message inflight could be re-enqueued but we have to
  1633. // analyze it only just one time for cycle
  1634. int count = this.inflightQueue.Count;
  1635. // process all inflight queued messages
  1636. while (count > 0)
  1637. {
  1638. count--;
  1639. acknowledge = false;
  1640. msgReceived = null;
  1641. // check to be sure that client isn't closing and all queues are now empty !
  1642. if (!this.isRunning)
  1643. break;
  1644. // dequeue message context from queue
  1645. msgContext = (MqttMsgContext)this.inflightQueue.Dequeue();
  1646. // get inflight message
  1647. msgInflight = (MqttMsgBase)msgContext.Message;
  1648. switch (msgContext.State)
  1649. {
  1650. case MqttMsgState.QueuedQos0:
  1651. // QoS 0, PUBLISH message to send to broker, no state change, no acknowledge
  1652. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1653. {
  1654. this.Send(msgInflight);
  1655. }
  1656. // QoS 0, no need acknowledge
  1657. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1658. {
  1659. internalEvent = new MsgInternalEvent(msgInflight);
  1660. // notify published message from broker (no need acknowledged)
  1661. this.OnInternalEvent(internalEvent);
  1662. }
  1663. #if TRACE
  1664. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1665. #endif
  1666. break;
  1667. case MqttMsgState.QueuedQos1:
  1668. // [v3.1.1] SUBSCRIBE and UNSIBSCRIBE aren't "officially" QOS = 1
  1669. case MqttMsgState.SendSubscribe:
  1670. case MqttMsgState.SendUnsubscribe:
  1671. // QoS 1, PUBLISH or SUBSCRIBE/UNSUBSCRIBE message to send to broker, state change to wait PUBACK or SUBACK/UNSUBACK
  1672. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1673. {
  1674. msgContext.Timestamp = Environment.TickCount;
  1675. msgContext.Attempt++;
  1676. if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1677. {
  1678. // PUBLISH message to send, wait for PUBACK
  1679. msgContext.State = MqttMsgState.WaitForPuback;
  1680. // retry ? set dup flag [v3.1.1] only for PUBLISH message
  1681. if (msgContext.Attempt > 1)
  1682. msgInflight.DupFlag = true;
  1683. }
  1684. else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
  1685. // SUBSCRIBE message to send, wait for SUBACK
  1686. msgContext.State = MqttMsgState.WaitForSuback;
  1687. else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
  1688. // UNSUBSCRIBE message to send, wait for UNSUBACK
  1689. msgContext.State = MqttMsgState.WaitForUnsuback;
  1690. this.Send(msgInflight);
  1691. // update timeout : minimum between delay (based on current message sent) or current timeout
  1692. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1693. // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
  1694. this.inflightQueue.Enqueue(msgContext);
  1695. }
  1696. // QoS 1, PUBLISH message received from broker to acknowledge, send PUBACK
  1697. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1698. {
  1699. MqttMsgPuback puback = new MqttMsgPuback();
  1700. puback.MessageId = msgInflight.MessageId;
  1701. this.Send(puback);
  1702. internalEvent = new MsgInternalEvent(msgInflight);
  1703. // notify published message from broker and acknowledged
  1704. this.OnInternalEvent(internalEvent);
  1705. #if TRACE
  1706. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1707. #endif
  1708. }
  1709. break;
  1710. case MqttMsgState.QueuedQos2:
  1711. // QoS 2, PUBLISH message to send to broker, state change to wait PUBREC
  1712. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1713. {
  1714. msgContext.Timestamp = Environment.TickCount;
  1715. msgContext.Attempt++;
  1716. msgContext.State = MqttMsgState.WaitForPubrec;
  1717. // retry ? set dup flag
  1718. if (msgContext.Attempt > 1)
  1719. msgInflight.DupFlag = true;
  1720. this.Send(msgInflight);
  1721. // update timeout : minimum between delay (based on current message sent) or current timeout
  1722. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1723. // re-enqueue message (I have to re-analyze for receiving PUBREC)
  1724. this.inflightQueue.Enqueue(msgContext);
  1725. }
  1726. // QoS 2, PUBLISH message received from broker to acknowledge, send PUBREC, state change to wait PUBREL
  1727. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1728. {
  1729. MqttMsgPubrec pubrec = new MqttMsgPubrec();
  1730. pubrec.MessageId = msgInflight.MessageId;
  1731. msgContext.State = MqttMsgState.WaitForPubrel;
  1732. this.Send(pubrec);
  1733. // re-enqueue message (I have to re-analyze for receiving PUBREL)
  1734. this.inflightQueue.Enqueue(msgContext);
  1735. }
  1736. break;
  1737. case MqttMsgState.WaitForPuback:
  1738. case MqttMsgState.WaitForSuback:
  1739. case MqttMsgState.WaitForUnsuback:
  1740. // QoS 1, waiting for PUBACK of a PUBLISH message sent or
  1741. // waiting for SUBACK of a SUBSCRIBE message sent or
  1742. // waiting for UNSUBACK of a UNSUBSCRIBE message sent or
  1743. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1744. {
  1745. acknowledge = false;
  1746. lock (this.internalQueue)
  1747. {
  1748. if (this.internalQueue.Count > 0)
  1749. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1750. }
  1751. // it is a PUBACK message or a SUBACK/UNSUBACK message
  1752. if (msgReceived != null)
  1753. {
  1754. // PUBACK message or SUBACK/UNSUBACK message for the current message
  1755. if (((msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) ||
  1756. ((msgReceived.Type == MqttMsgBase.MQTT_MSG_SUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) ||
  1757. ((msgReceived.Type == MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)))
  1758. {
  1759. lock (this.internalQueue)
  1760. {
  1761. // received message processed
  1762. this.internalQueue.Dequeue();
  1763. acknowledge = true;
  1764. msgReceivedProcessed = true;
  1765. #if TRACE
  1766. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1767. #endif
  1768. }
  1769. // if PUBACK received, confirm published with flag
  1770. if (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE)
  1771. internalEvent = new MsgPublishedInternalEvent(msgReceived, true);
  1772. else
  1773. internalEvent = new MsgInternalEvent(msgReceived);
  1774. // notify received acknowledge from broker of a published message or subscribe/unsubscribe message
  1775. this.OnInternalEvent(internalEvent);
  1776. // PUBACK received for PUBLISH message with QoS Level 1, remove from session state
  1777. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1778. (this.session != null) &&
  1779. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1780. (this.session.InflightMessages.Contains(msgContext.Key)))
  1781. #else
  1782. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1783. #endif
  1784. {
  1785. this.session.InflightMessages.Remove(msgContext.Key);
  1786. }
  1787. #if TRACE
  1788. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1789. #endif
  1790. }
  1791. }
  1792. // current message not acknowledged, no PUBACK or SUBACK/UNSUBACK or not equal messageid
  1793. if (!acknowledge)
  1794. {
  1795. delta = Environment.TickCount - msgContext.Timestamp;
  1796. // check timeout for receiving PUBACK since PUBLISH was sent or
  1797. // for receiving SUBACK since SUBSCRIBE was sent or
  1798. // for receiving UNSUBACK since UNSUBSCRIBE was sent
  1799. if (delta >= this.settings.DelayOnRetry)
  1800. {
  1801. // max retry not reached, resend
  1802. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  1803. {
  1804. msgContext.State = MqttMsgState.QueuedQos1;
  1805. // re-enqueue message
  1806. this.inflightQueue.Enqueue(msgContext);
  1807. // update timeout (0 -> reanalyze queue immediately)
  1808. timeout = 0;
  1809. }
  1810. else
  1811. {
  1812. // if PUBACK for a PUBLISH message not received after retries, raise event for not published
  1813. if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1814. {
  1815. // PUBACK not received in time, PUBLISH retries failed, need to remove from session inflight messages too
  1816. if ((this.session != null) &&
  1817. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1818. (this.session.InflightMessages.Contains(msgContext.Key)))
  1819. #else
  1820. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1821. #endif
  1822. {
  1823. this.session.InflightMessages.Remove(msgContext.Key);
  1824. }
  1825. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  1826. // notify not received acknowledge from broker and message not published
  1827. this.OnInternalEvent(internalEvent);
  1828. }
  1829. // NOTE : not raise events for SUBACK or UNSUBACK not received
  1830. // for the user no event raised means subscribe/unsubscribe failed
  1831. }
  1832. }
  1833. else
  1834. {
  1835. // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
  1836. this.inflightQueue.Enqueue(msgContext);
  1837. // update timeout
  1838. int msgTimeout = (this.settings.DelayOnRetry - delta);
  1839. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  1840. }
  1841. }
  1842. }
  1843. break;
  1844. case MqttMsgState.WaitForPubrec:
  1845. // QoS 2, waiting for PUBREC of a PUBLISH message sent
  1846. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1847. {
  1848. acknowledge = false;
  1849. lock (this.internalQueue)
  1850. {
  1851. if (this.internalQueue.Count > 0)
  1852. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1853. }
  1854. // it is a PUBREC message
  1855. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE))
  1856. {
  1857. // PUBREC message for the current PUBLISH message, send PUBREL, wait for PUBCOMP
  1858. if (msgReceived.MessageId == msgInflight.MessageId)
  1859. {
  1860. lock (this.internalQueue)
  1861. {
  1862. // received message processed
  1863. this.internalQueue.Dequeue();
  1864. acknowledge = true;
  1865. msgReceivedProcessed = true;
  1866. #if TRACE
  1867. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1868. #endif
  1869. }
  1870. MqttMsgPubrel pubrel = new MqttMsgPubrel();
  1871. pubrel.MessageId = msgInflight.MessageId;
  1872. msgContext.State = MqttMsgState.WaitForPubcomp;
  1873. msgContext.Timestamp = Environment.TickCount;
  1874. msgContext.Attempt = 1;
  1875. this.Send(pubrel);
  1876. // update timeout : minimum between delay (based on current message sent) or current timeout
  1877. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1878. // re-enqueue message
  1879. this.inflightQueue.Enqueue(msgContext);
  1880. }
  1881. }
  1882. // current message not acknowledged
  1883. if (!acknowledge)
  1884. {
  1885. delta = Environment.TickCount - msgContext.Timestamp;
  1886. // check timeout for receiving PUBREC since PUBLISH was sent
  1887. if (delta >= this.settings.DelayOnRetry)
  1888. {
  1889. // max retry not reached, resend
  1890. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  1891. {
  1892. msgContext.State = MqttMsgState.QueuedQos2;
  1893. // re-enqueue message
  1894. this.inflightQueue.Enqueue(msgContext);
  1895. // update timeout (0 -> reanalyze queue immediately)
  1896. timeout = 0;
  1897. }
  1898. else
  1899. {
  1900. // PUBREC not received in time, PUBLISH retries failed, need to remove from session inflight messages too
  1901. if ((this.session != null) &&
  1902. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1903. (this.session.InflightMessages.Contains(msgContext.Key)))
  1904. #else
  1905. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1906. #endif
  1907. {
  1908. this.session.InflightMessages.Remove(msgContext.Key);
  1909. }
  1910. // if PUBREC for a PUBLISH message not received after retries, raise event for not published
  1911. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  1912. // notify not received acknowledge from broker and message not published
  1913. this.OnInternalEvent(internalEvent);
  1914. }
  1915. }
  1916. else
  1917. {
  1918. // re-enqueue message
  1919. this.inflightQueue.Enqueue(msgContext);
  1920. // update timeout
  1921. int msgTimeout = (this.settings.DelayOnRetry - delta);
  1922. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  1923. }
  1924. }
  1925. }
  1926. break;
  1927. case MqttMsgState.WaitForPubrel:
  1928. // QoS 2, waiting for PUBREL of a PUBREC message sent
  1929. if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1930. {
  1931. lock (this.internalQueue)
  1932. {
  1933. if (this.internalQueue.Count > 0)
  1934. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1935. }
  1936. // it is a PUBREL message
  1937. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE))
  1938. {
  1939. // PUBREL message for the current message, send PUBCOMP
  1940. if (msgReceived.MessageId == msgInflight.MessageId)
  1941. {
  1942. lock (this.internalQueue)
  1943. {
  1944. // received message processed
  1945. this.internalQueue.Dequeue();
  1946. msgReceivedProcessed = true;
  1947. #if TRACE
  1948. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1949. #endif
  1950. }
  1951. MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
  1952. pubcomp.MessageId = msgInflight.MessageId;
  1953. this.Send(pubcomp);
  1954. internalEvent = new MsgInternalEvent(msgInflight);
  1955. // notify published message from broker and acknowledged
  1956. this.OnInternalEvent(internalEvent);
  1957. // PUBREL received (and PUBCOMP sent) for PUBLISH message with QoS Level 2, remove from session state
  1958. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1959. (this.session != null) &&
  1960. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1961. (this.session.InflightMessages.Contains(msgContext.Key)))
  1962. #else
  1963. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1964. #endif
  1965. {
  1966. this.session.InflightMessages.Remove(msgContext.Key);
  1967. }
  1968. #if TRACE
  1969. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1970. #endif
  1971. }
  1972. else
  1973. {
  1974. // re-enqueue message
  1975. this.inflightQueue.Enqueue(msgContext);
  1976. }
  1977. }
  1978. else
  1979. {
  1980. // re-enqueue message
  1981. this.inflightQueue.Enqueue(msgContext);
  1982. }
  1983. }
  1984. break;
  1985. case MqttMsgState.WaitForPubcomp:
  1986. // QoS 2, waiting for PUBCOMP of a PUBREL message sent
  1987. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1988. {
  1989. acknowledge = false;
  1990. lock (this.internalQueue)
  1991. {
  1992. if (this.internalQueue.Count > 0)
  1993. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1994. }
  1995. // it is a PUBCOMP message
  1996. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE))
  1997. {
  1998. // PUBCOMP message for the current message
  1999. if (msgReceived.MessageId == msgInflight.MessageId)
  2000. {
  2001. lock (this.internalQueue)
  2002. {
  2003. // received message processed
  2004. this.internalQueue.Dequeue();
  2005. acknowledge = true;
  2006. msgReceivedProcessed = true;
  2007. #if TRACE
  2008. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  2009. #endif
  2010. }
  2011. internalEvent = new MsgPublishedInternalEvent(msgReceived, true);
  2012. // notify received acknowledge from broker of a published message
  2013. this.OnInternalEvent(internalEvent);
  2014. // PUBCOMP received for PUBLISH message with QoS Level 2, remove from session state
  2015. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  2016. (this.session != null) &&
  2017. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  2018. (this.session.InflightMessages.Contains(msgContext.Key)))
  2019. #else
  2020. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  2021. #endif
  2022. {
  2023. this.session.InflightMessages.Remove(msgContext.Key);
  2024. }
  2025. #if TRACE
  2026. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  2027. #endif
  2028. }
  2029. }
  2030. // it is a PUBREC message
  2031. else if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE))
  2032. {
  2033. // another PUBREC message for the current message due to a retransmitted PUBLISH
  2034. // I'm in waiting for PUBCOMP, so I can discard this PUBREC
  2035. if (msgReceived.MessageId == msgInflight.MessageId)
  2036. {
  2037. lock (this.internalQueue)
  2038. {
  2039. // received message processed
  2040. this.internalQueue.Dequeue();
  2041. acknowledge = true;
  2042. msgReceivedProcessed = true;
  2043. #if TRACE
  2044. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  2045. #endif
  2046. // re-enqueue message
  2047. this.inflightQueue.Enqueue(msgContext);
  2048. }
  2049. }
  2050. }
  2051. // current message not acknowledged
  2052. if (!acknowledge)
  2053. {
  2054. delta = Environment.TickCount - msgContext.Timestamp;
  2055. // check timeout for receiving PUBCOMP since PUBREL was sent
  2056. if (delta >= this.settings.DelayOnRetry)
  2057. {
  2058. // max retry not reached, resend
  2059. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  2060. {
  2061. msgContext.State = MqttMsgState.SendPubrel;
  2062. // re-enqueue message
  2063. this.inflightQueue.Enqueue(msgContext);
  2064. // update timeout (0 -> reanalyze queue immediately)
  2065. timeout = 0;
  2066. }
  2067. else
  2068. {
  2069. // PUBCOMP not received, PUBREL retries failed, need to remove from session inflight messages too
  2070. if ((this.session != null) &&
  2071. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  2072. (this.session.InflightMessages.Contains(msgContext.Key)))
  2073. #else
  2074. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  2075. #endif
  2076. {
  2077. this.session.InflightMessages.Remove(msgContext.Key);
  2078. }
  2079. // if PUBCOMP for a PUBLISH message not received after retries, raise event for not published
  2080. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  2081. // notify not received acknowledge from broker and message not published
  2082. this.OnInternalEvent(internalEvent);
  2083. }
  2084. }
  2085. else
  2086. {
  2087. // re-enqueue message
  2088. this.inflightQueue.Enqueue(msgContext);
  2089. // update timeout
  2090. int msgTimeout = (this.settings.DelayOnRetry - delta);
  2091. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  2092. }
  2093. }
  2094. }
  2095. break;
  2096. case MqttMsgState.SendPubrec:
  2097. // TODO : impossible ? --> QueuedQos2 ToAcknowledge
  2098. break;
  2099. case MqttMsgState.SendPubrel:
  2100. // QoS 2, PUBREL message to send to broker, state change to wait PUBCOMP
  2101. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  2102. {
  2103. MqttMsgPubrel pubrel = new MqttMsgPubrel();
  2104. pubrel.MessageId = msgInflight.MessageId;
  2105. msgContext.State = MqttMsgState.WaitForPubcomp;
  2106. msgContext.Timestamp = Environment.TickCount;
  2107. msgContext.Attempt++;
  2108. // retry ? set dup flag [v3.1.1] no needed
  2109. if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1)
  2110. {
  2111. if (msgContext.Attempt > 1)
  2112. pubrel.DupFlag = true;
  2113. }
  2114. this.Send(pubrel);
  2115. // update timeout : minimum between delay (based on current message sent) or current timeout
  2116. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  2117. // re-enqueue message
  2118. this.inflightQueue.Enqueue(msgContext);
  2119. }
  2120. break;
  2121. case MqttMsgState.SendPubcomp:
  2122. // TODO : impossible ?
  2123. break;
  2124. case MqttMsgState.SendPuback:
  2125. // TODO : impossible ? --> QueuedQos1 ToAcknowledge
  2126. break;
  2127. default:
  2128. break;
  2129. }
  2130. }
  2131. // if calculated timeout is MaxValue, it means that must be Infinite (-1)
  2132. if (timeout == Int32.MaxValue)
  2133. timeout = Timeout.Infinite;
  2134. // if message received is orphan, no corresponding message in inflight queue
  2135. // based on messageId, we need to remove from the queue
  2136. if ((msgReceived != null) && !msgReceivedProcessed)
  2137. {
  2138. this.internalQueue.Dequeue();
  2139. #if TRACE
  2140. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0} orphan", msgReceived);
  2141. #endif
  2142. }
  2143. }
  2144. }
  2145. }
  2146. }
  2147. catch (MqttCommunicationException e)
  2148. {
  2149. // possible exception on Send, I need to re-enqueue not sent message
  2150. if (msgContext != null)
  2151. // re-enqueue message
  2152. this.inflightQueue.Enqueue(msgContext);
  2153. #if TRACE
  2154. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  2155. #endif
  2156. // raise disconnection client event
  2157. this.OnConnectionClosing();
  2158. }
  2159. }
  2160. /// <summary>
  2161. /// Restore session
  2162. /// </summary>
  2163. private void RestoreSession()
  2164. {
  2165. // if not clean session
  2166. if (!this.CleanSession)
  2167. {
  2168. // there is a previous session
  2169. if (this.session != null)
  2170. {
  2171. lock (this.inflightQueue)
  2172. {
  2173. foreach (MqttMsgContext msgContext in this.session.InflightMessages.Values)
  2174. {
  2175. this.inflightQueue.Enqueue(msgContext);
  2176. // if it is a PUBLISH message to publish
  2177. if ((msgContext.Message.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  2178. (msgContext.Flow == MqttMsgFlow.ToPublish))
  2179. {
  2180. // it's QoS 1 and we haven't received PUBACK
  2181. if ((msgContext.Message.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE) &&
  2182. (msgContext.State == MqttMsgState.WaitForPuback))
  2183. {
  2184. // we haven't received PUBACK, we need to resend PUBLISH message
  2185. msgContext.State = MqttMsgState.QueuedQos1;
  2186. }
  2187. // it's QoS 2
  2188. else if (msgContext.Message.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)
  2189. {
  2190. // we haven't received PUBREC, we need to resend PUBLISH message
  2191. if (msgContext.State == MqttMsgState.WaitForPubrec)
  2192. {
  2193. msgContext.State = MqttMsgState.QueuedQos2;
  2194. }
  2195. // we haven't received PUBCOMP, we need to resend PUBREL for it
  2196. else if (msgContext.State == MqttMsgState.WaitForPubcomp)
  2197. {
  2198. msgContext.State = MqttMsgState.SendPubrel;
  2199. }
  2200. }
  2201. }
  2202. }
  2203. }
  2204. // unlock process inflight queue
  2205. this.inflightWaitHandle.Set();
  2206. }
  2207. else
  2208. {
  2209. // create new session
  2210. this.session = new MqttClientSession(this.ClientId);
  2211. }
  2212. }
  2213. // clean any previous session
  2214. else
  2215. {
  2216. if (this.session != null)
  2217. this.session.Clear();
  2218. }
  2219. }
  2220. #if BROKER
  2221. /// <summary>
  2222. /// Load a given session
  2223. /// </summary>
  2224. /// <param name="session">MQTT Client session to load</param>
  2225. public void LoadSession(MqttClientSession session)
  2226. {
  2227. // if not clean session
  2228. if (!this.CleanSession)
  2229. {
  2230. // set the session ...
  2231. this.session = session;
  2232. // ... and restore it
  2233. this.RestoreSession();
  2234. }
  2235. }
  2236. #endif
  2237. /// <summary>
  2238. /// Generate the next message identifier
  2239. /// </summary>
  2240. /// <returns>Message identifier</returns>
  2241. private ushort GetMessageId()
  2242. {
  2243. // if 0 or max UInt16, it becomes 1 (first valid messageId)
  2244. this.messageIdCounter = ((this.messageIdCounter % UInt16.MaxValue) != 0) ? (ushort)(this.messageIdCounter + 1) : (ushort)1;
  2245. return this.messageIdCounter;
  2246. }
  2247. /// <summary>
  2248. /// Finder class for PUBLISH message inside a queue
  2249. /// </summary>
  2250. internal class MqttMsgContextFinder
  2251. {
  2252. // PUBLISH message id
  2253. internal ushort MessageId { get; set; }
  2254. // message flow into inflight queue
  2255. internal MqttMsgFlow Flow { get; set; }
  2256. /// <summary>
  2257. /// Constructor
  2258. /// </summary>
  2259. /// <param name="messageId">Message Id</param>
  2260. /// <param name="flow">Message flow inside inflight queue</param>
  2261. internal MqttMsgContextFinder(ushort messageId, MqttMsgFlow flow)
  2262. {
  2263. this.MessageId = messageId;
  2264. this.Flow = flow;
  2265. }
  2266. internal bool Find(object item)
  2267. {
  2268. MqttMsgContext msgCtx = (MqttMsgContext)item;
  2269. return ((msgCtx.Message.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  2270. (msgCtx.Message.MessageId == this.MessageId) &&
  2271. msgCtx.Flow == this.Flow);
  2272. }
  2273. }
  2274. }
  2275. /// <summary>
  2276. /// MQTT protocol version
  2277. /// </summary>
  2278. public enum MqttProtocolVersion
  2279. {
  2280. Version_3_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1,
  2281. Version_3_1_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1_1
  2282. }
  2283. }