diff --git a/src/devices/Arduino/ArduinoBoard.cs b/src/devices/Arduino/ArduinoBoard.cs index 782242c351ecba9756727f97287d0fcf702454dc..0bcf8a9169f3562cc619ee5524f77ab22224f437 100644 --- a/src/devices/Arduino/ArduinoBoard.cs +++ b/src/devices/Arduino/ArduinoBoard.cs @@ -63,12 +63,27 @@ namespace Iot.Device.Arduino /// The device is initialized when the first command is sent. The constructor always succeeds. /// /// A stream to an Arduino/Firmata device - public ArduinoBoard(Stream serialPortStream) + /// True to indicate that the stream supports hardware flow control (can be a serial port + /// with RTS/CTS handshake or a network stream where the protocol already supports flow control) + public ArduinoBoard(Stream serialPortStream, bool usesHardwareFlowControl) { _dataStream = serialPortStream ?? throw new ArgumentNullException(nameof(serialPortStream)); + StreamUsesHardwareFlowControl = usesHardwareFlowControl; _logger = this.GetCurrentClassLogger(); } + /// + /// Creates an instance of an Ardino board connection using the given stream (typically from a serial port) + /// + /// + /// The device is initialized when the first command is sent. The constructor always succeeds. + /// + /// A stream to an Arduino/Firmata device + public ArduinoBoard(Stream serialPortStream) + : this(serialPortStream, false) + { + } + /// /// Creates an instance of the Arduino board connection connected to a serial port /// @@ -80,6 +95,7 @@ namespace Iot.Device.Arduino { _dataStream = null; _serialPort = new SerialPort(portName, baudRate); + StreamUsesHardwareFlowControl = false; // Would need to configure the serial port externally for this to work _logger = this.GetCurrentClassLogger(); } @@ -88,6 +104,16 @@ namespace Iot.Device.Arduino /// protected ILogger Logger => _logger; + /// + /// Set this to true if the underlying stream uses some kind of hardware or low-level flow control (RTS/CTS for + /// a serial port, or a TCP socket). Setting this to true may improve performance on bulk transfers (such as + /// large SPI blocks) but can result in buffer overflows if flow control is not working. Default: false + /// + public bool StreamUsesHardwareFlowControl + { + get; + } + /// /// The list of supported pin modes. /// This list can be extended by adding special modes using . @@ -162,14 +188,47 @@ namespace Iot.Device.Arduino #endif out ArduinoBoard? board) { - board = null; + return TryConnectToNetworkedBoard(boardAddress, port, true, out board); + } + + /// + /// Tries to connect to an arduino over network. + /// This requires an arduino with an ethernet shield or an ESP32 with enabled WIFI support. + /// + /// The IP address of the board + /// The network port to use. The default port is 27016 + /// True to use an auto-reconnecting stream. Helpful when using an unreliable connection. + /// Returns the board if successful + /// True on success, false otherwise + public static bool TryConnectToNetworkedBoard(IPAddress boardAddress, int port, bool useAutoReconnect, +#if NET5_0_OR_GREATER + [NotNullWhen(true)] +#endif + out ArduinoBoard? board) + { try { - var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - socket.Connect(boardAddress, port); - socket.NoDelay = true; - var networkStream = new NetworkStream(socket, true); - board = new ArduinoBoard(networkStream); + Stream networkStream; + if (useAutoReconnect) + { + networkStream = new ReconnectingNetworkStream(() => + { + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + socket.Connect(boardAddress, port); + socket.NoDelay = true; + Stream socketStream = new NetworkStream(socket, true); + return socketStream; + }); + } + else + { + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + socket.Connect(boardAddress, port); + socket.NoDelay = true; + networkStream = new NetworkStream(socket, true); + } + + board = new ArduinoBoard(networkStream, true); if (!(board.FirmataVersion > new Version(1, 0))) { // Actually not expecting to get here (but the above will throw a SocketException if the remote end is not there) @@ -180,6 +239,7 @@ namespace Iot.Device.Arduino } catch (SocketException) { + board = null; return false; } } @@ -508,7 +568,21 @@ namespace Iot.Device.Arduino } else { - Logger.LogInformation(message); + // If the message contains a line feed, strip that + Logger.LogInformation(message.TrimEnd(new char[] { '\r', '\n' })); + } + + _commandHandlersLock.EnterReadLock(); + try + { + foreach (var handler in _extendedCommandHandlers) + { + handler.OnErrorMessage(message, exception); + } + } + finally + { + _commandHandlersLock.ExitReadLock(); } } @@ -561,7 +635,12 @@ namespace Iot.Device.Arduino { Initialize(); - return new GpioController(PinNumberingScheme.Logical, new ArduinoGpioControllerDriver(this, _supportedPinConfigurations)); + if (_firmata == null) + { + throw new ObjectDisposedException(nameof(_firmata)); + } + + return new GpioController(PinNumberingScheme.Logical, new ArduinoGpioControllerDriver(_firmata, _supportedPinConfigurations)); } /// @@ -691,6 +770,16 @@ namespace Iot.Device.Arduino Firmata.SetAnalogInputSamplingInterval(timeSpan); } + /// + /// Performs a software reset of the Arduino firmware + /// + public void SoftwareReset() + { + Initialize(); + Firmata.SendSoftwareReset(); + Firmata.QueryCapabilities(); + } + /// /// Standard dispose pattern /// @@ -708,6 +797,12 @@ namespace Iot.Device.Arduino } } + if (_firmata != null) + { + // Can end the next possible moment (otherwise might just throw a bunch of warnings before actually terminating anyway) + _firmata.InputThreadShouldExit = true; + } + _isDisposed = true; // Do this first, to force any blocking read operations to end if (_dataStream != null) @@ -754,5 +849,43 @@ namespace Iot.Device.Arduino Firmata.DisableSpi(); } } + + /// + /// Pings the device, to get an estimate about the round-trip time. + /// With some Wifi setups, the round trip time may be significantly higher than desired. + /// + /// The number of pings to send + /// The list of reply times. Contains a negative value for lost packets + public List Ping(int number) + { + Initialize(); + if (_firmata == null) + { + throw new ObjectDisposedException("Not connected"); + } + + List ret = new(); + Stopwatch sw = new Stopwatch(); + for (int i = 0; i < number; i++) + { + sw.Restart(); + try + { + _firmata.QueryFirmwareVersion(out _); + var elapsed = sw.Elapsed; + ret.Add(elapsed); + _logger.LogInformation($"Round trip time: {elapsed.TotalMilliseconds}ms"); + } + catch (TimeoutException x) + { + _logger.LogError(x, $"Timeout: {x.Message}"); + ret.Add(TimeSpan.FromMinutes(-1)); + } + + sw.Stop(); + } + + return ret; + } } } diff --git a/src/devices/Arduino/ArduinoGpioControllerDriver.cs b/src/devices/Arduino/ArduinoGpioControllerDriver.cs index 815b7854e8cd41618645ec72ce30c28106572091..3706ff9540286aabb34d9f6685da329720ab8644 100644 --- a/src/devices/Arduino/ArduinoGpioControllerDriver.cs +++ b/src/devices/Arduino/ArduinoGpioControllerDriver.cs @@ -14,26 +14,28 @@ namespace Iot.Device.Arduino { internal class ArduinoGpioControllerDriver : GpioDriver { - private readonly ArduinoBoard _arduinoBoard; + private readonly FirmataDevice _device; private readonly IReadOnlyCollection _supportedPinConfigurations; private readonly Dictionary _callbackContainers; private readonly ConcurrentDictionary _pinModes; private readonly object _callbackContainersLock; private readonly AutoResetEvent _waitForEventResetEvent; private readonly ILogger _logger; + private readonly ConcurrentDictionary _outputPinValues; - internal ArduinoGpioControllerDriver(ArduinoBoard arduinoBoard, IReadOnlyCollection supportedPinConfigurations) + internal ArduinoGpioControllerDriver(FirmataDevice device, IReadOnlyCollection supportedPinConfigurations) { - _arduinoBoard = arduinoBoard ?? throw new ArgumentNullException(nameof(arduinoBoard)); + _device = device ?? throw new ArgumentNullException(nameof(device)); _supportedPinConfigurations = supportedPinConfigurations ?? throw new ArgumentNullException(nameof(supportedPinConfigurations)); _callbackContainers = new Dictionary(); _waitForEventResetEvent = new AutoResetEvent(false); _callbackContainersLock = new object(); _pinModes = new ConcurrentDictionary(); + _outputPinValues = new ConcurrentDictionary(); _logger = this.GetCurrentClassLogger(); PinCount = _supportedPinConfigurations.Count; - _arduinoBoard.Firmata.DigitalPortValueUpdated += FirmataOnDigitalPortValueUpdated; + _device.DigitalPortValueUpdated += FirmataOnDigitalPortValueUpdated; } protected override int PinCount { get; } @@ -48,6 +50,10 @@ namespace Iot.Device.Arduino protected override void OpenPin(int pinNumber) { + if (pinNumber < 0 || pinNumber >= PinCount) + { + throw new ArgumentOutOfRangeException(nameof(pinNumber), $"Pin {pinNumber} is not valid"); + } } protected override void ClosePin(int pinNumber) @@ -65,9 +71,11 @@ namespace Iot.Device.Arduino break; case PinMode.InputPullUp: firmataMode = SupportedMode.InputPullup; + _outputPinValues.TryRemove(pinNumber, out _); break; case PinMode.Input: firmataMode = SupportedMode.DigitalInput; + _outputPinValues.TryRemove(pinNumber, out _); break; default: _logger.LogError($"Mode {mode} is not supported as argument to {nameof(SetPinMode)}"); @@ -81,7 +89,7 @@ namespace Iot.Device.Arduino throw new NotSupportedException($"Mode {mode} is not supported on Pin {pinNumber}."); } - _arduinoBoard.Firmata.SetPinMode(pinNumber, firmataMode); + _device.SetPinMode(pinNumber, firmataMode); // Cache this value. Since the GpioController calls GetPinMode when trying to write a pin (to verify whether it is set to output), // that would be very expensive here. And setting output pins should be cheap. @@ -95,7 +103,7 @@ namespace Iot.Device.Arduino return existingValue; } - var mode = _arduinoBoard.Firmata.GetPinMode(pinNumber); + var mode = _device.GetPinMode(pinNumber); PinMode ret; if (mode == SupportedMode.DigitalOutput.Value) @@ -144,12 +152,22 @@ namespace Iot.Device.Arduino protected override PinValue Read(int pinNumber) { - return _arduinoBoard.Firmata.ReadDigitalPin(pinNumber); + return _device.ReadDigitalPin(pinNumber); } protected override void Write(int pinNumber, PinValue value) { - _arduinoBoard.Firmata.WriteDigitalPin(pinNumber, value); + if (_outputPinValues.TryGetValue(pinNumber, out PinValue? existingValue) && existingValue != null) + { + // If this output value is already present, don't send a message. + if (value == existingValue.Value) + { + return; + } + } + + _device.WriteDigitalPin(pinNumber, value); + _outputPinValues.AddOrUpdate(pinNumber, x => value, (y, z) => value); } protected override WaitForEventResult WaitForEvent(int pinNumber, PinEventTypes eventTypes, CancellationToken cancellationToken) @@ -173,7 +191,7 @@ namespace Iot.Device.Arduino } } - _arduinoBoard.Firmata.DigitalPortValueUpdated += WaitForEventPortValueUpdated; + _device.DigitalPortValueUpdated += WaitForEventPortValueUpdated; try { WaitHandle.WaitAny(new[] { cancellationToken.WaitHandle, _waitForEventResetEvent }); @@ -188,7 +206,7 @@ namespace Iot.Device.Arduino } finally { - _arduinoBoard.Firmata.DigitalPortValueUpdated -= WaitForEventPortValueUpdated; + _device.DigitalPortValueUpdated -= WaitForEventPortValueUpdated; } return new WaitForEventResult() @@ -265,7 +283,8 @@ namespace Iot.Device.Arduino _callbackContainers.Clear(); } - _arduinoBoard.Firmata.DigitalPortValueUpdated -= FirmataOnDigitalPortValueUpdated; + _outputPinValues.Clear(); + _device.DigitalPortValueUpdated -= FirmataOnDigitalPortValueUpdated; } base.Dispose(disposing); diff --git a/src/devices/Arduino/ArduinoI2cBus.cs b/src/devices/Arduino/ArduinoI2cBus.cs index cb2595d9f14ccf716c19ffa71c731fa3d7464202..782cc0ea3b22230703f63864c5f7815bd31e74c7 100644 --- a/src/devices/Arduino/ArduinoI2cBus.cs +++ b/src/devices/Arduino/ArduinoI2cBus.cs @@ -15,6 +15,7 @@ namespace Iot.Device.Arduino private readonly ArduinoBoard _board; private readonly int _busId; private readonly HashSet _usedAddresses; + private bool _busInitialized; public ArduinoI2cBus(ArduinoBoard board, int busId) { @@ -31,6 +32,18 @@ namespace Iot.Device.Arduino throw new InvalidOperationException($"Device number {deviceAddress} is already in use"); } + if (!_busInitialized) + { + // Ensure the corresponding pins are set to I2C (not strictly necessary, but consistent) + foreach (SupportedPinConfiguration supportedPinConfiguration in _board.SupportedPinConfigurations.Where(x => x.PinModes.Contains(SupportedMode.I2c))) + { + _board.Firmata.SetPinMode(supportedPinConfiguration.Pin, SupportedMode.I2c); + } + + _board.Firmata.SendI2cConfigCommand(); + _busInitialized = true; + } + var device = new ArduinoI2cDevice(_board, this, new I2cConnectionSettings(_busId, deviceAddress)); _usedAddresses.Add(deviceAddress); return device; diff --git a/src/devices/Arduino/ArduinoSpiDevice.cs b/src/devices/Arduino/ArduinoSpiDevice.cs index 80d2aaf3b21850ed99aef8fd1d81bd63b75beaa7..d138954cc5fec32ac9601446b58609c52ad070bf 100644 --- a/src/devices/Arduino/ArduinoSpiDevice.cs +++ b/src/devices/Arduino/ArduinoSpiDevice.cs @@ -50,7 +50,7 @@ namespace Iot.Device.Arduino public override void Write(ReadOnlySpan buffer) { - Board.Firmata.SpiWrite(ConnectionSettings.ChipSelectLine, buffer); + Board.Firmata.SpiWrite(ConnectionSettings.ChipSelectLine, buffer, !Board.StreamUsesHardwareFlowControl); } public override void TransferFullDuplex(ReadOnlySpan writeBuffer, Span readBuffer) diff --git a/src/devices/Arduino/BlockingConcurrentBag.cs b/src/devices/Arduino/BlockingConcurrentBag.cs new file mode 100644 index 0000000000000000000000000000000000000000..af27c2fc6b8295faaafe24b494494ed43055a127 --- /dev/null +++ b/src/devices/Arduino/BlockingConcurrentBag.cs @@ -0,0 +1,112 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Iot.Device.Arduino +{ + /// + /// Represents a collection that removes objects based on a certain pattern + /// + internal class BlockingConcurrentBag + { + private readonly object _lock = new object(); + private readonly List _container = new List(); + + public int Count + { + get + { + lock (_lock) + { + return _container.Count; + } + } + } + + public void Add(T? elem) + { + lock (_lock) + { + _container.Add(elem); + Monitor.PulseAll(_lock); + } + } + + public void Clear() + { + lock (_lock) + { + _container.Clear(); + } + } + + /// + /// Waits until an element is in the queue that matches the given predicate. + /// Checking the predicate should be fast. + /// + /// The predicate to test + /// The overall timeout + /// Returns the element found, if any + /// True if an element was found within the timeout, false otherwise + public bool TryRemoveElement(Func predicate, TimeSpan timeout, out T? element) + { + bool lockTaken = false; + Stopwatch sw = Stopwatch.StartNew(); + element = default; + try + { + Monitor.TryEnter(_lock, timeout, ref lockTaken); + if (lockTaken) + { + // The critical section. + while (true) + { + // Cannot use FirstOrDefault here, because we need to be able to distinguish between + // finding nothing and finding an empty (null, default) element + for (var index = 0; index < _container.Count; index++) + { + T? elem = _container[index]; + if (predicate(elem)) + { + _container.RemoveAt(index); + element = elem; + return true; + } + } + + if (sw.Elapsed > timeout) + { + return false; + } + + TimeSpan remaining = timeout - sw.Elapsed; + if (remaining <= TimeSpan.Zero || !Monitor.Wait(_lock, remaining)) + { + return false; + } + } + } + else + { + return false; + } + } + finally + { + // Ensure that the lock is released. + if (lockTaken) + { + Monitor.Exit(_lock); + } + } + } + } +} diff --git a/src/devices/Arduino/Encoder7Bit.cs b/src/devices/Arduino/Encoder7Bit.cs new file mode 100644 index 0000000000000000000000000000000000000000..a6f16ff2fcd2f5a477675204b976fa5009a9df9e --- /dev/null +++ b/src/devices/Arduino/Encoder7Bit.cs @@ -0,0 +1,113 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Iot.Device.Arduino +{ + /// + /// This class is used to encode larger chunks of data for transmission using the Firmata protocol. + /// It converts each block of 7 bytes into a block of 8 bytes, keeping the top bit 0. + /// + public static class Encoder7Bit + { + /// + /// Calculates the number of bytes generated during decode (the result is smaller than the input) + /// + public static int Num8BitOutBytes(int inputBytes) + { + // Equals * 7 / 8 + return (int)Math.Floor(((inputBytes) * 7) / 8.0); + } + + /// + /// Calculates the number of bytes required for the 7-byte encoding + /// + public static int Num7BitOutBytes(int inputBytes) + { + return (int)Math.Ceiling(((inputBytes) * 8.0) / 7); + } + + /// + /// Encode a sequence of bytes + /// + /// The data to encode + /// The encoded data + public static byte[] Encode(ReadOnlySpan data) + { + return Encode(data, 0, data.Length); + } + + /// + /// Encodes a sequence of bytes + /// + /// The data to encode + /// The start index in the data + /// The length of the data + /// The encoded data + public static byte[] Encode(ReadOnlySpan data, int startIndex, int length) + { + int shift = 0; + byte[] retBytes = new byte[Num7BitOutBytes(length)]; + int index = 0; + int previous = 0; + for (int i = startIndex; i < startIndex + length; i++) + { + if (shift == 0) + { + retBytes[index] = (byte)(data[i] & 0x7f); + index++; + shift++; + previous = data[i] >> 7; + } + else + { + retBytes[index] = (byte)(((data[i] << shift) & 0x7f) | previous); + index++; + if (shift == 6) + { + retBytes[index] = (byte)(data[i] >> 1); + index++; + shift = 0; + } + else + { + shift++; + previous = data[i] >> (8 - shift); + } + } + } + + if (shift > 0) + { + // Write remainder + retBytes[index] = (byte)previous; + } + + return retBytes; + } + + /// + /// Decodes the given data sequence + /// + /// The data to decode + /// The decoded data + public static byte[] Decode(ReadOnlySpan inData) + { + byte[] outBytes = new byte[Num8BitOutBytes(inData.Length)]; + for (int i = 0; i < outBytes.Length; i++) + { + int j = i << 3; + int pos = j / 7; + int shift = j % 7; + outBytes[i] = (byte)((inData[pos] >> shift) | ((inData[pos + 1] << (7 - shift)) & 0xFF)); + } + + return outBytes; + } +} +} diff --git a/src/devices/Arduino/ExtendedCommandHandler.cs b/src/devices/Arduino/ExtendedCommandHandler.cs index 4364f7299b30be19053b1c12a00f6bc50266a1e4..4afb8bdb23fefe7be9efcf2e40abf4179ce19b63 100644 --- a/src/devices/Arduino/ExtendedCommandHandler.cs +++ b/src/devices/Arduino/ExtendedCommandHandler.cs @@ -62,13 +62,7 @@ namespace Iot.Device.Arduino /// This might need to be checked in Dispose, to make sure an uninitialized component doesn't attempt /// to send a command. /// - protected bool IsRegistered - { - get - { - return _board != null; - } - } + protected bool IsRegistered => _board != null; /// /// The reference to the arduino board @@ -135,25 +129,39 @@ namespace Iot.Device.Arduino throw new InvalidOperationException("Command handler not registered"); } - return _firmata.SendCommandAndWait(commandSequence, timeout, out error); + return _firmata.SendCommandAndWait(commandSequence, timeout, IsMatchingAck, out error); } /// /// Send a command to the device, expecting a reply. /// - /// Command to send. This + /// Commands to send. This /// should normally be a sysex command. /// Command timeout + /// An error code in case of a failure /// The timeout elapsed before a reply was received. - /// The reply packet - protected byte[] SendCommandAndWait(FirmataCommandSequence commandSequence, TimeSpan timeout) + /// True if all packets where send and properly acknowledged + protected bool SendCommandsAndWait(IList commandSequences, TimeSpan timeout, out CommandError error) { if (_firmata == null) { throw new InvalidOperationException("Command handler not registered"); } - return _firmata.SendCommandAndWait(commandSequence, timeout); + return _firmata.SendCommandsAndWait(commandSequences, timeout, IsMatchingAck, HasCommandError, out error); + } + + /// + /// Send a command to the device, expecting a reply. + /// + /// Command to send. This + /// should normally be a sysex command. + /// Command timeout + /// The timeout elapsed before a reply was received. + /// The reply packet + protected byte[] SendCommandAndWait(FirmataCommandSequence commandSequence, TimeSpan timeout) + { + return SendCommandAndWait(commandSequence, timeout, out _); } /// @@ -171,16 +179,35 @@ namespace Iot.Device.Arduino /// /// This is called when a sysex command is received from the board. /// This can include the reply to a command sent by a before, in which case - /// the reply should be ignored, as it is returned as result of the call itself. + /// the reply should be ignored, as it is returned as result of the call itself. Therefore it is advised to use this function only + /// to listen for data sent by the device automatically (e.g event messages or recurring status reports) /// /// Type of data received from the hardware. This should normally be , /// unless the hardware sends unencoded Ascii messages /// The binary representation of the received data - /// The implementation needs to check whether the data is for itself. The messages are not filtered by requester! + /// The implementation needs to check the type and source of the data. The messages are not filtered by requester! protected virtual void OnSysexData(ReplyType type, byte[] data) { } + /// + /// This method is called to check whether the reply is a valid ACK/NOACK for the given command sequence. + /// Can be used to avoid accepting something as command reply that is completely unrelated (such as an asynchronous callback). + /// In different words, this should return false if the given reply is not something that is an answer to a synchronous command. + /// + /// The sequence that was sent + /// The reply + /// True if this reply matches the sequence. True is the default, for backwards compatibility + protected virtual bool IsMatchingAck(FirmataCommandSequence sequence, byte[] reply) => true; + + /// + /// Callback function that returns whether the given reply indicates an error + /// + /// The original sequence + /// The reply. is already tested to be true for this reply + /// A command error code, in case this reply indicates a no-acknowledge + protected virtual CommandError HasCommandError(FirmataCommandSequence sequence, byte[] reply) => CommandError.None; + private void OnSysexDataInternal(ReplyType type, byte[] data) { if (_firmata == null) @@ -205,6 +232,16 @@ namespace Iot.Device.Arduino _board = null; } + /// + /// Called by the infrastructure when the parser reports an error or information message. + /// The default implementation does nothing. + /// + /// The message text + /// The exception observed (may be null) + protected internal virtual void OnErrorMessage(string message, Exception? exception) + { + } + /// public void Dispose() { diff --git a/src/devices/Arduino/FirmataCommandSequence.cs b/src/devices/Arduino/FirmataCommandSequence.cs index 71d35f5c1d0bf2bbd03aaa052a1171e0c373ee61..9a2185007f36130e202f74533e64a44944163b0a 100644 --- a/src/devices/Arduino/FirmataCommandSequence.cs +++ b/src/devices/Arduino/FirmataCommandSequence.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Text; namespace Iot.Device.Arduino @@ -12,8 +13,10 @@ namespace Iot.Device.Arduino /// A firmata command sequence /// Intended to be changed to public visibility later /// - public class FirmataCommandSequence + public class FirmataCommandSequence : IEquatable { + private const int InitialCommandLength = 32; + /// /// Start of sysex command byte. Used as start byte for almost all extended commands. /// @@ -56,6 +59,8 @@ namespace Iot.Device.Arduino /// public int Length => _sequence.Count; + internal byte[] InternalSequence => _sequence.ToArray(); + /// /// Decode an uint from packed 7-bit data. /// This way of encoding uints is only used in extension modules. @@ -95,6 +100,17 @@ namespace Iot.Device.Arduino return (Int32)DecodeUInt32(data, fromOffset); } + /// + /// Decodes a 14-bit integer into a short + /// + /// Data array + /// Start offset + /// + public static short DecodeInt14(byte[] data, int idx) + { + return (short)(data[idx] | data[idx + 1] << 7); + } + /// /// Send an Uint32 as 5 x 7 bits. This form of transmitting integers is only supported by extension modules /// @@ -178,5 +194,70 @@ namespace Iot.Device.Arduino _sequence.Add((byte)(values[i] >> 7 & sbyte.MaxValue)); } } + + /// + public override string ToString() + { + StringBuilder b = new StringBuilder(); + + int maxBytes = Math.Min(Length, 32); + for (int i = 0; i < maxBytes; i++) + { + b.Append($"{_sequence[i]:X2} "); + } + + if (maxBytes < Length) + { + b.Append("..."); + } + + return b.ToString(); + } + + /// + public bool Equals(FirmataCommandSequence? other) + { + if (ReferenceEquals(null, other)) + { + return false; + } + + if (ReferenceEquals(this, other)) + { + return true; + } + + return _sequence.Equals(other._sequence) && Length == other.Length; + } + + /// + public override bool Equals(object? obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((FirmataCommandSequence)obj); + } + + /// + public override int GetHashCode() + { + unchecked + { + return (_sequence.GetHashCode() * 397); + } + } } } diff --git a/src/devices/Arduino/FirmataDevice.cs b/src/devices/Arduino/FirmataDevice.cs index 19437eba586ea8db74d4528dcc300ed1b7d81776..b5dca4d53a2782cd81f1f0b9aa06b7db652c3801 100644 --- a/src/devices/Arduino/FirmataDevice.cs +++ b/src/devices/Arduino/FirmataDevice.cs @@ -3,6 +3,7 @@ using System; using System.Collections; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Device.Gpio; using System.Device.Spi; @@ -13,6 +14,8 @@ using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; +using Iot.Device.Common; +using Microsoft.Extensions.Logging; using UnitsNet; namespace Iot.Device.Arduino @@ -27,21 +30,22 @@ namespace Iot.Device.Arduino private const byte FIRMATA_PROTOCOL_MAJOR_VERSION = 2; private const byte FIRMATA_PROTOCOL_MINOR_VERSION = 5; // 2.5 works, but 2.6 is recommended private const int FIRMATA_INIT_TIMEOUT_SECONDS = 2; - internal static readonly TimeSpan DefaultReplyTimeout = TimeSpan.FromMilliseconds(500); + internal static readonly TimeSpan DefaultReplyTimeout = TimeSpan.FromMilliseconds(1000); private byte _firmwareVersionMajor; private byte _firmwareVersionMinor; private byte _actualFirmataProtocolMajorVersion; private byte _actualFirmataProtocolMinorVersion; + private Version _firmwareVersion; + private int _lastRequestId; private string _firmwareName; private Stream? _firmataStream; private Thread? _inputThread; - private bool _inputThreadShouldExit; private List _supportedPinConfigurations; - private IList _lastResponse; + private BlockingConcurrentBag _pendingResponses; private List _lastPinValues; private Dictionary _lastAnalogValues; private object _lastPinValueLock; @@ -52,11 +56,15 @@ namespace Iot.Device.Arduino private CommandError _lastCommandError; + private int _i2cSequence; + /// /// Event used when waiting for answers (i.e. after requesting firmware version) /// private AutoResetEvent _dataReceived; + private ILogger _logger; + public event PinChangeEventHandler? DigitalPortValueUpdated; public event AnalogPinValueUpdated? AnalogPinValueUpdated; @@ -65,12 +73,15 @@ namespace Iot.Device.Arduino public event Action? OnSysexReply; + private long _bytesTransmitted = 0; + public FirmataDevice(List supportedModes) { _firmwareVersionMajor = 0; _firmwareVersionMinor = 0; + _firmwareVersion = new Version(0, 0); _firmataStream = null; - _inputThreadShouldExit = false; + InputThreadShouldExit = false; _dataReceived = new AutoResetEvent(false); _supportedPinConfigurations = new List(); _synchronisationLock = new object(); @@ -78,13 +89,15 @@ namespace Iot.Device.Arduino _lastPinValueLock = new object(); _lastAnalogValues = new Dictionary(); _lastAnalogValueLock = new object(); - _dataQueue = new Queue(); - _lastResponse = new List(); + _dataQueue = new Queue(1024); + _pendingResponses = new BlockingConcurrentBag(); _lastRequestId = 1; _lastCommandError = CommandError.None; _firmwareName = string.Empty; _lastRawLine = new StringBuilder(); SupportedModes = supportedModes; + _i2cSequence = 0; + _logger = this.GetCurrentClassLogger(); } internal List PinConfigurations @@ -97,6 +110,10 @@ namespace Iot.Device.Arduino internal List SupportedModes { get; set; } + internal long BytesTransmitted => _bytesTransmitted; + + internal bool InputThreadShouldExit { get; set; } + public void Open(Stream stream) { lock (_synchronisationLock) @@ -130,13 +147,11 @@ namespace Iot.Device.Arduino return; } - _inputThreadShouldExit = false; + InputThreadShouldExit = false; _inputThread = new Thread(InputThread); + _inputThread.Name = "Firmata input thread"; _inputThread.Start(); - - // Reset device, in case it is still sending data from an aborted process - _firmataStream.WriteByte((byte)FirmataCommand.SYSTEM_RESET); } private void ProcessInput() @@ -284,6 +299,7 @@ namespace Iot.Device.Arduino _actualFirmataProtocolMajorVersion = message[0]; _actualFirmataProtocolMinorVersion = message[1]; + _logger.LogInformation($"Received protocol version: {_actualFirmataProtocolMajorVersion}.{_actualFirmataProtocolMinorVersion}."); _dataReceived.Set(); return; @@ -311,6 +327,12 @@ namespace Iot.Device.Arduino { int offset = lower_nibble * 8; ushort pinValues = (ushort)(message[0] | (message[1] << 7)); + if (offset + 7 >= _lastPinValues.Count) + { + _logger.LogError($"Firmware reported an update for port {lower_nibble}, but there are only {_supportedPinConfigurations.Count} pins"); + break; + } + lock (_lastPinValueLock) { for (int i = 0; i < 8; i++) @@ -354,11 +376,13 @@ namespace Iot.Device.Arduino { _firmwareVersionMajor = raw_data[1]; _firmwareVersionMinor = raw_data[2]; + _firmwareVersion = new Version(_firmwareVersionMajor, _firmwareVersionMinor); int stringLength = (raw_data.Length - 3) / 2; Span bytesReceived = stackalloc byte[stringLength]; ReassembleByteString(raw_data, 3, stringLength * 2, bytesReceived); _firmwareName = Encoding.ASCII.GetString(bytesReceived); + _logger.LogDebug($"Received Firmware name {_firmwareName}"); _dataReceived.Set(); } @@ -461,22 +485,19 @@ namespace Iot.Device.Arduino break; case FirmataSysexCommand.I2C_REPLY: _lastCommandError = CommandError.None; - _lastResponse = raw_data; - _dataReceived.Set(); + _pendingResponses.Add(raw_data); break; case FirmataSysexCommand.SPI_DATA: _lastCommandError = CommandError.None; - _lastResponse = raw_data; - _dataReceived.Set(); + _pendingResponses.Add(raw_data); break; default: // we pass the data forward as-is for any other type of sysex command _lastCommandError = CommandError.None; - _lastResponse = raw_data; // the instance is constant, so we can just remember the pointer + _pendingResponses.Add(raw_data); OnSysexReply?.Invoke(ReplyType.SysexCommand, raw_data); - _dataReceived.Set(); break; } @@ -504,78 +525,122 @@ namespace Iot.Device.Arduino throw new ObjectDisposedException(nameof(FirmataDevice)); } - // Use an explicit iteration, avoids a memory allocation here - for (int i = 0; i < sequence.Sequence.Count; i++) - { - _firmataStream.WriteByte(sequence.Sequence[i]); - } - + _firmataStream.Write(sequence.Sequence.ToArray()); + _bytesTransmitted += sequence.Sequence.Count; _firmataStream.Flush(); } } - /// - /// Send a command and wait for a reply - /// - /// The command sequence, typically starting with and ending with - /// The raw sequence of sysex reply bytes. The reply does not include the START_SYSEX byte, but it does include the terminating END_SYSEX byte. The first byte is the - /// command number of the corresponding request - public byte[] SendCommandAndWait(FirmataCommandSequence sequence) - { - return SendCommandAndWait(sequence, DefaultReplyTimeout); - } - /// /// Send a command and wait for a reply /// /// The command sequence, typically starting with and ending with /// A non-default timeout + /// A callback function that should return true if the given reply is the one this command should wait for. The default is true, because asynchronous replies + /// are rather the exception than the rule + /// An error code in case of failure /// The raw sequence of sysex reply bytes. The reply does not include the START_SYSEX byte, but it does include the terminating END_SYSEX byte. The first byte is the /// command number of the corresponding request - public byte[] SendCommandAndWait(FirmataCommandSequence sequence, TimeSpan timeout) + public byte[] SendCommandAndWait(FirmataCommandSequence sequence, TimeSpan timeout, Func isMatchingAck, out CommandError error) { - return SendCommandAndWait(sequence, timeout, out _); + if (!sequence.Validate()) + { + throw new ArgumentException("The command sequence is invalid", nameof(sequence)); + } + + if (_firmataStream == null) + { + throw new ObjectDisposedException(nameof(FirmataDevice)); + } + + _firmataStream.Write(sequence.Sequence.ToArray(), 0, sequence.Sequence.Count); + _bytesTransmitted += sequence.Sequence.Count; + _firmataStream.Flush(); + + byte[]? response; + if (!_pendingResponses.TryRemoveElement(x => isMatchingAck(sequence, x!), timeout, out response)) + { + throw new TimeoutException("Timeout waiting for command answer"); + } + + error = _lastCommandError; + return response ?? throw new InvalidOperationException("Got a null reply"); // should not happen in our case } /// /// Send a command and wait for a reply /// - /// The command sequence, typically starting with and ending with + /// The command sequences to send, typically starting with and ending with /// A non-default timeout + /// A callback function that should return true if the given reply is the one this command should wait for. The default is true, because asynchronous replies + /// are rather the exception than the rule + /// A callback that determines a possible error in the reply message /// An error code in case of failure /// The raw sequence of sysex reply bytes. The reply does not include the START_SYSEX byte, but it does include the terminating END_SYSEX byte. The first byte is the /// command number of the corresponding request - public byte[] SendCommandAndWait(FirmataCommandSequence sequence, TimeSpan timeout, out CommandError error) + public bool SendCommandsAndWait(IList sequences, TimeSpan timeout, Func isMatchingAck, + Func errorFunc, out CommandError error) { - if (!sequence.Validate()) + if (sequences.Any(s => s.Validate() == false)) { - throw new ArgumentException("The command sequence is invalid", nameof(sequence)); + throw new ArgumentException("At least one command sequence is invalid", nameof(sequences)); } - lock (_synchronisationLock) + if (sequences.Count > 127) { - if (_firmataStream == null) - { - throw new ObjectDisposedException(nameof(FirmataDevice)); - } + // Because we only have 7 bits for the sequence counter. + throw new ArgumentException("At most 127 sequences can be chained together", nameof(sequences)); + } - _dataReceived.Reset(); - // Use an explicit iteration, avoids a memory allocation here - for (int i = 0; i < sequence.Sequence.Count; i++) - { - _firmataStream.WriteByte(sequence.Sequence[i]); - } + if (isMatchingAck == null) + { + throw new ArgumentNullException(nameof(isMatchingAck)); + } - _firmataStream.Flush(); - bool result = _dataReceived.WaitOne(timeout); - if (result == false) + error = CommandError.None; + if (_firmataStream == null) + { + throw new ObjectDisposedException(nameof(FirmataDevice)); + } + + Dictionary sequencesWithAck = new(); + foreach (var s in sequences) + { + sequencesWithAck.Add(s, false); + _firmataStream.Write(s.InternalSequence, 0, s.Length); + } + + _firmataStream.Flush(); + + byte[]? response; + do + { + foreach (var s2 in sequencesWithAck) { - throw new TimeoutException("Timeout waiting for command answer"); - } + if (_pendingResponses.TryRemoveElement(x => isMatchingAck(s2.Key, x!), timeout, out response)) + { + CommandError e = CommandError.None; + if (response == null) + { + error = CommandError.Aborted; + } + else if (_lastCommandError != CommandError.None) + { + error = _lastCommandError; + } + else if ((e = errorFunc(s2.Key, response)) != CommandError.None) + { + error = e; + } - error = _lastCommandError; - return _lastResponse.ToArray(); + sequencesWithAck[s2.Key] = true; + break; + } + } } + while (sequencesWithAck.Any(x => x.Value == false)); + + return sequencesWithAck.All(x => x.Value); } /// @@ -656,7 +721,7 @@ namespace Iot.Device.Arduino throw new ObjectDisposedException(nameof(FirmataDevice)); } - Span rawData = stackalloc byte[100]; + Span rawData = stackalloc byte[512]; int bytesRead = _firmataStream.Read(rawData); for (int i = 0; i < bytesRead; i++) @@ -669,7 +734,7 @@ namespace Iot.Device.Arduino private void InputThread() { - while (!_inputThreadShouldExit) + while (!InputThreadShouldExit) { try { @@ -677,7 +742,12 @@ namespace Iot.Device.Arduino } catch (Exception ex) { - OnError?.Invoke($"Firmata protocol error: Parser exception {ex.Message}", ex); + // If the exception happens because the stream was closed, don't print an error + if (!InputThreadShouldExit) + { + _logger.LogError(ex, $"Error in parser: {ex.Message}"); + OnError?.Invoke($"Firmata protocol error: Parser exception {ex.Message}", ex); + } } } } @@ -792,7 +862,7 @@ namespace Iot.Device.Arduino private void StopThread() { - _inputThreadShouldExit = true; + InputThreadShouldExit = true; if (_inputThread != null) { _inputThread.Join(); @@ -848,7 +918,10 @@ namespace Iot.Device.Arduino return PerformRetries(3, () => { - var response = SendCommandAndWait(getPinModeSequence); + var response = SendCommandAndWait(getPinModeSequence, DefaultReplyTimeout, (sequence, bytes) => + { + return bytes.Length >= 4 && bytes[1] == pinNumber; + }, out _); // The mode is byte 4 if (response.Length < 4) @@ -929,11 +1002,14 @@ namespace Iot.Device.Arduino { i2cSequence.WriteByte((byte)FirmataSysexCommand.I2C_REQUEST); i2cSequence.WriteByte((byte)slaveAddress); - i2cSequence.WriteByte(0); // Write flag is 0, all other bits as well + // Write flag is 0, all other bits normally, too. + i2cSequence.WriteByte(0); i2cSequence.WriteBytesAsTwo7bitBytes(writeData); i2cSequence.WriteByte((byte)FirmataCommand.END_SYSEX); } + int sequenceNo = (_i2cSequence++) & 0b111; + if (replyData != null && replyData.Length > 0) { doWait = true; @@ -945,7 +1021,9 @@ namespace Iot.Device.Arduino i2cSequence.WriteByte((byte)FirmataSysexCommand.I2C_REQUEST); i2cSequence.WriteByte((byte)slaveAddress); - i2cSequence.WriteByte(0b1000); // Read flag is 1, all other bits are 0 + + // Read flag is 1, all other bits are 0. We use bits 0-2 (slave address MSB, unused in 7 bit mode) as sequence id. + i2cSequence.WriteByte((byte)(0b1000 | sequenceNo)); byte length = (byte)replyData.Length; // Only write the length of the expected data. // We could insert the register to read here, but we assume that has been written already (the client is responsible for that) @@ -956,7 +1034,25 @@ namespace Iot.Device.Arduino if (doWait) { - var response = SendCommandAndWait(i2cSequence); + var response = SendCommandAndWait(i2cSequence, TimeSpan.FromSeconds(3), (sequence, bytes) => + { + if (bytes.Length < 5) + { + return false; + } + + if (bytes[0] != (byte)FirmataSysexCommand.I2C_REPLY) + { + return false; + } + + if ((bytes[2] & 0b111) != sequenceNo) + { + return false; + } + + return true; + }, out _); if (response[0] != (byte)FirmataSysexCommand.I2C_REPLY) { @@ -1053,17 +1149,71 @@ namespace Iot.Device.Arduino SendCommand(disableSpi); } - public void SpiWrite(int csPin, ReadOnlySpan writeBytes) + public void SpiWrite(int csPin, ReadOnlySpan writeBytes, bool waitForReply = false) { // When the command is SPI_WRITE, the device answer is already discarded in the firmware. - var command = SpiWrite(csPin, FirmataSpiCommand.SPI_WRITE, writeBytes, out _); - SendCommand(command); + if (waitForReply) + { + var command = SpiWrite(csPin, FirmataSpiCommand.SPI_WRITE_ACK, writeBytes, out byte requestId); + var response = SendCommandAndWait(command, DefaultReplyTimeout, (sequence, bytes) => + { + if (bytes.Length < 5) + { + return false; + } + + if (bytes[0] != (byte)FirmataSysexCommand.SPI_DATA || bytes[1] != (byte)FirmataSpiCommand.SPI_REPLY) + { + return false; + } + + if (bytes[3] != (byte)requestId) + { + return false; + } + + return true; + }, out _lastCommandError); + + if (response[0] != (byte)FirmataSysexCommand.SPI_DATA || response[1] != (byte)FirmataSpiCommand.SPI_REPLY) + { + throw new IOException("Firmata protocol error: received incorrect query response"); + } + + if (response[3] != (byte)requestId) + { + throw new IOException($"Firmata protocol sequence error."); + } + } + else + { + var command = SpiWrite(csPin, FirmataSpiCommand.SPI_WRITE, writeBytes, out _); + SendCommand(command); + } } public void SpiTransfer(int csPin, ReadOnlySpan writeBytes, Span readBytes) { var command = SpiWrite(csPin, FirmataSpiCommand.SPI_TRANSFER, writeBytes, out byte requestId); - var response = SendCommandAndWait(command); + var response = SendCommandAndWait(command, DefaultReplyTimeout, (sequence, bytes) => + { + if (bytes.Length < 5) + { + return false; + } + + if (bytes[0] != (byte)FirmataSysexCommand.SPI_DATA || bytes[1] != (byte)FirmataSpiCommand.SPI_REPLY) + { + return false; + } + + if (bytes[3] != (byte)requestId) + { + return false; + } + + return true; + }, out _lastCommandError); if (response[0] != (byte)FirmataSysexCommand.SPI_DATA || response[1] != (byte)FirmataSpiCommand.SPI_REPLY) { @@ -1089,7 +1239,7 @@ namespace Iot.Device.Arduino spiCommand.WriteByte(requestId); spiCommand.WriteByte(1); // Deselect CS after transfer (yes) spiCommand.WriteByte((byte)writeBytes.Length); - spiCommand.WriteBytesAsTwo7bitBytes(writeBytes); + spiCommand.Write(Encoder7Bit.Encode(writeBytes)); spiCommand.WriteByte((byte)FirmataCommand.END_SYSEX); return spiCommand; } @@ -1114,18 +1264,36 @@ namespace Iot.Device.Arduino throw new NotSupportedException("Only pins <=15 are allowed as CS line"); } + if (_firmwareVersion <= new Version(2, 11)) + { + // we could leverage this, if needed, by using the older data encoding + throw new NotSupportedException("This library requires firmware version 2.12 or later for SPI transfers"); + } + + int deviceId = connectionSettings.ChipSelectLine; FirmataCommandSequence spiConfigSequence = new(); spiConfigSequence.WriteByte((byte)FirmataSysexCommand.SPI_DATA); spiConfigSequence.WriteByte((byte)FirmataSpiCommand.SPI_DEVICE_CONFIG); - byte deviceIdChannel = (byte)(connectionSettings.ChipSelectLine << 3); + byte deviceIdChannel = (byte)(deviceId << 3 | (connectionSettings.BusId & 0x7)); spiConfigSequence.WriteByte((byte)(deviceIdChannel)); - spiConfigSequence.WriteByte((byte)1); - int clockSpeed = 1_000_000; // Hz - spiConfigSequence.WriteByte((byte)(clockSpeed & 0x7F)); - spiConfigSequence.WriteByte((byte)((clockSpeed >> 7) & 0x7F)); - spiConfigSequence.WriteByte((byte)((clockSpeed >> 15) & 0x7F)); - spiConfigSequence.WriteByte((byte)((clockSpeed >> 22) & 0x7F)); - spiConfigSequence.WriteByte((byte)((clockSpeed >> 29) & 0x7F)); + int dataMode = 0; + if (connectionSettings.DataFlow == DataFlow.MsbFirst) + { + dataMode = 1; + } + + int mode = ((int)connectionSettings.Mode) << 1; + dataMode |= mode; + dataMode |= 0x8; // Use fast transfer mode + + spiConfigSequence.WriteByte((byte)dataMode); + int clockSpeed = connectionSettings.ClockFrequency; + if (clockSpeed <= 0) + { + clockSpeed = 1_000_000; + } + + spiConfigSequence.SendInt32(clockSpeed); spiConfigSequence.WriteByte(0); // Word size (default = 8) spiConfigSequence.WriteByte(1); // Default CS pin control (enable) spiConfigSequence.WriteByte((byte)(connectionSettings.ChipSelectLine)); @@ -1166,7 +1334,7 @@ namespace Iot.Device.Arduino { if (disposing) { - _inputThreadShouldExit = true; + InputThreadShouldExit = true; lock (_synchronisationLock) { @@ -1195,5 +1363,13 @@ namespace Iot.Device.Arduino Dispose(true); GC.SuppressFinalize(this); } + + public void SendSoftwareReset() + { + lock (_synchronisationLock) + { + _firmataStream?.WriteByte(0xFF); + } + } } } diff --git a/src/devices/Arduino/FirmataSpiCommand.cs b/src/devices/Arduino/FirmataSpiCommand.cs index 8a3e1e7a23fdde0131943526b0006730055e1ff8..075c51ac48fc4a22f769d5c8ba307dfe69692327 100644 --- a/src/devices/Arduino/FirmataSpiCommand.cs +++ b/src/devices/Arduino/FirmataSpiCommand.cs @@ -12,5 +12,6 @@ namespace Iot.Device.Arduino SPI_READ = 0x04, SPI_REPLY = 0x05, SPI_END = 0x06, + SPI_WRITE_ACK = 0x07, } } diff --git a/src/devices/Arduino/ReconnectingNetworkStream.cs b/src/devices/Arduino/ReconnectingNetworkStream.cs new file mode 100644 index 0000000000000000000000000000000000000000..8fd766b2e168c17d501e7d27470fc8c373ee7d68 --- /dev/null +++ b/src/devices/Arduino/ReconnectingNetworkStream.cs @@ -0,0 +1,147 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.IO; + +namespace Iot.Device.Arduino +{ + internal class ReconnectingNetworkStream : Stream + { + private readonly Func _reconnect; + private readonly object _lock = new object(); + private Stream? _streamImplementation; + + public ReconnectingNetworkStream(Stream? underlyingStream, Func reconnect) + { + _streamImplementation = underlyingStream; + _reconnect = reconnect; + Connect(); + } + + public ReconnectingNetworkStream(Func connect) + : this(null, connect) + { + } + + public override bool CanRead => SafeExecute(x => x.CanRead, false); + + public override bool CanSeek => SafeExecute(x => x.CanSeek, false); + + public override bool CanWrite => SafeExecute(x => x.CanWrite, false); + + public override long Length => SafeExecute(x => x.Length, false); + + public override long Position + { + get + { + return SafeExecute(x => x.Position, false); + } + set + { + SafeExecute(x => x.Position = value, false); + } + } + + public override void Flush() => SafeExecute(x => x.Flush(), true); + + public override int Read(byte[] buffer, int offset, int count) => SafeExecute(x => x.Read(buffer, offset, count), true); + + public override long Seek(long offset, SeekOrigin origin) => SafeExecute(x => x.Seek(offset, origin), true); + + public override void SetLength(long value) => SafeExecute(x => x.SetLength(value), false); + + public override void Write(byte[] buffer, int offset, int count) => SafeExecute(x => x.Write(buffer, offset, count), false); + + private void Connect() + { + if (_streamImplementation == null) + { + lock (_lock) + { + try + { + _streamImplementation = _reconnect(); + } + catch (IOException) + { + // Ignore, cannot currently reconnect. So need to retry later. + } + } + } + } + + private T SafeExecute(Func operation, bool doThrow) + where T : struct + { + try + { + Connect(); + + if (_streamImplementation == null) + { + if (doThrow) + { + throw new TimeoutException("Stream is disconnected. Retrying next time."); + } + + return default(T); + } + + return operation(_streamImplementation); + } + catch (IOException x) + { + _streamImplementation?.Dispose(); + _streamImplementation = null; + if (doThrow) + { + throw new TimeoutException("Error executing operation. Retrying next time", x); + } + + return default(T); + } + } + + private void SafeExecute(Action operation, bool doThrow) + { + try + { + Connect(); + + if (_streamImplementation == null) + { + if (doThrow) + { + throw new TimeoutException("Stream is disconnected. Retrying next time."); + } + + return; + } + + operation(_streamImplementation); + } + catch (IOException x) + { + _streamImplementation?.Dispose(); + _streamImplementation = null; + if (doThrow) + { + throw new TimeoutException("Error executing operation. Retrying next time", x); + } + } + } + + public override void Close() + { + if (_streamImplementation != null) + { + _streamImplementation.Close(); + _streamImplementation = null; + } + + base.Close(); + } + } +} diff --git a/src/devices/Arduino/tests/BlockingConcurrentBagTest.cs b/src/devices/Arduino/tests/BlockingConcurrentBagTest.cs new file mode 100644 index 0000000000000000000000000000000000000000..b07756b571fb5ba60fb137992ab1a4872abb78a2 --- /dev/null +++ b/src/devices/Arduino/tests/BlockingConcurrentBagTest.cs @@ -0,0 +1,36 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Text; +using Iot.Device.Arduino; +using Xunit; + +namespace Iot.Device.Arduino.Tests +{ + public class BlockingConcurrentBagTest + { + private BlockingConcurrentBag _blockingConcurrentBag; + + public BlockingConcurrentBagTest() + { + _blockingConcurrentBag = new BlockingConcurrentBag(); + } + + [Fact] + public void CanAddAndRemove() + { + _blockingConcurrentBag.Add(1); + Assert.True(_blockingConcurrentBag.TryRemoveElement(x => true, TimeSpan.Zero, out int a)); + Assert.True(a == 1); + } + + [Fact] + public void ReturnsFalseWhenElementDoesNotMatch() + { + _blockingConcurrentBag.Add(1); + Assert.False(_blockingConcurrentBag.TryRemoveElement(x => x == 3, TimeSpan.FromSeconds(1), out int a)); + } + } +}