// MqttNetworkChannel.cs

/*
Copyright (c) 2013, 2014 Paolo Patierno

All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.

The Eclipse Public License is available at
   http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
   http://www.eclipse.org/org/documents/edl-v10.php.

Contributors:
   Paolo Patierno - initial API and implementation and/or initial documentation
*/
  13. using System;
  14. using System.Collections.Generic;
  15. using System.Linq;
  16. using System.Text;
  17. using System.Threading.Tasks;
  18. using Windows.Networking;
  19. using Windows.Networking.Sockets;
  20. using System.Runtime.InteropServices.WindowsRuntime;
  21. using Windows.Storage.Streams;
  22. using System.Threading;
  23. namespace uPLibrary.Networking.M2Mqtt
  24. {
  /// <summary>
  /// Network channel built on a Windows Runtime <see cref="StreamSocket"/>.
  /// Every operation blocks the calling thread until the underlying
  /// asynchronous socket call has completed.
  /// </summary>
  public class MqttNetworkChannel : IMqttNetworkChannel
  {
      // stream socket for communication
      private StreamSocket socket;

      // remote host information (only set by the host name constructor)
      private readonly HostName remoteHostName;
      private readonly int remotePort;

      // using SSL
      private readonly bool secure;

      /// <summary>
      /// Constructor
      /// </summary>
      /// <param name="socket">Socket opened with the client</param>
      public MqttNetworkChannel(StreamSocket socket)
      {
          this.socket = socket;
      }

      /// <summary>
      /// Constructor
      /// </summary>
      /// <param name="remoteHostName">Remote Host name</param>
      /// <param name="remotePort">Remote port</param>
      /// <param name="secure">Using SSL</param>
      public MqttNetworkChannel(string remoteHostName, int remotePort, bool secure)
      {
          this.remoteHostName = new HostName(remoteHostName);
          this.remotePort = remotePort;
          this.secure = secure;
      }

      /// <summary>
      /// Always reports <c>true</c>; this implementation does not query the socket.
      /// </summary>
      public bool DataAvailable
      {
          get { return true; }
      }

      /// <summary>
      /// Receive data until the buffer is completely filled
      /// </summary>
      /// <param name="buffer">Buffer to fill with received data</param>
      /// <returns>
      /// <c>buffer.Length</c> when the buffer was filled, or 0 when a read
      /// returned no data before the buffer was full
      /// </returns>
      public int Receive(byte[] buffer)
      {
          return this.ReceiveCore(buffer, CancellationToken.None);
      }

      /// <summary>
      /// Receive data until the buffer is completely filled or the timeout expires
      /// </summary>
      /// <param name="buffer">Buffer to fill with received data</param>
      /// <param name="timeout">Timeout in milliseconds after which the pending read is cancelled</param>
      /// <returns>
      /// <c>buffer.Length</c> when the buffer was filled; 0 when a read returned
      /// no data or the timeout expired first
      /// </returns>
      public int Receive(byte[] buffer, int timeout)
      {
          // the token source owns a timer, so release it as soon as the receive is over
          using (CancellationTokenSource cts = new CancellationTokenSource(timeout))
          {
              try
              {
                  return this.ReceiveCore(buffer, cts.Token);
              }
              catch (AggregateException ex)
              {
                  // blocking on Task.Result wraps the cancellation in an
                  // AggregateException, so it has to be unwrapped here
                  if (ex.InnerException is OperationCanceledException)
                  {
                      return 0;
                  }

                  // any other failure is reported exactly as before
                  throw;
              }
              catch (OperationCanceledException)
              {
                  return 0;
              }
          }
      }

      /// <summary>
      /// Send all the bytes of the buffer
      /// </summary>
      /// <param name="buffer">Data to send</param>
      /// <returns>Value reported by the output stream write operation</returns>
      public int Send(byte[] buffer)
      {
          // send is executed synchronously
          return (int)this.socket.OutputStream.WriteAsync(buffer.AsBuffer()).AsTask().Result;
      }

      /// <summary>
      /// Dispose the underlying socket, if there is one
      /// </summary>
      public void Close()
      {
          // socket is still null when Close is called before Connect on an
          // instance created with the host name constructor
          if (this.socket != null)
          {
              this.socket.Dispose();
          }
      }

      /// <summary>
      /// Create a new socket and connect it to the remote host and port,
      /// using TLS 1.2 when the channel was created as secure
      /// </summary>
      public void Connect()
      {
          this.socket = new StreamSocket();

          SocketProtectionLevel protectionLevel = this.secure
              ? SocketProtectionLevel.Tls12
              : SocketProtectionLevel.PlainSocket;

          // connection is executed synchronously
          this.socket.ConnectAsync(this.remoteHostName, this.remotePort.ToString(), protectionLevel)
              .AsTask()
              .Wait();
      }

      /// <summary>
      /// Read loop shared by both Receive overloads
      /// </summary>
      /// <param name="buffer">Buffer to fill with received data</param>
      /// <param name="cancellationToken">Token used to cancel a pending read</param>
      /// <returns><c>buffer.Length</c> when filled, 0 when a read returned no data</returns>
      private int ReceiveCore(byte[] buffer, CancellationToken cancellationToken)
      {
          // read all data needed (until fill buffer)
          int idx = 0;
          while (idx < buffer.Length)
          {
              // only expose the part of the array that is still empty, so a
              // short read can never be overwritten by the following one
              int remaining = buffer.Length - idx;
              IBuffer window = buffer.AsBuffer(idx, remaining);

              // read is executed synchronously
              IBuffer result = this.socket.InputStream
                  .ReadAsync(window, (uint)remaining, InputStreamOptions.None)
                  .AsTask(cancellationToken)
                  .Result;

              // fixed scenario with socket closed gracefully by peer/broker and
              // Read return 0. Avoid infinite loop.
              if (result.Length == 0)
              {
                  return 0;
              }

              int read = (int)result.Length;

              // the stream may hand back a different buffer than the one supplied;
              // in that case the data has to be copied into the caller's array
              if (!result.IsSameData(window))
              {
                  result.CopyTo(0, buffer, idx, read);
              }

              idx += read;
          }

          return buffer.Length;
      }
  }
  118. }