未验证 提交 6be64b74 编写于 作者: P Patrick Grawehr 提交者: GitHub

Improve performance for large SPI buffer transfers (#1841)

- Support waiting for SPI write commands. When using fast data rates, a buffer overrun
will otherwise occur
- Write SPI data in 7-bit encoding format (requires firmata update)
- Make flow control parameter controllable from outside
- Various stability fixes
上级 de27d5de
......@@ -63,12 +63,27 @@ namespace Iot.Device.Arduino
/// The device is initialized when the first command is sent. The constructor always succeeds.
/// </remarks>
/// <param name="serialPortStream">A stream to an Arduino/Firmata device</param>
public ArduinoBoard(Stream serialPortStream)
/// <param name="usesHardwareFlowControl">True to indicate that the stream supports hardware flow control (can be a serial port
/// with RTS/CTS handshake or a network stream where the protocol already supports flow control)</param>
public ArduinoBoard(Stream serialPortStream, bool usesHardwareFlowControl)
{
_dataStream = serialPortStream ?? throw new ArgumentNullException(nameof(serialPortStream));
StreamUsesHardwareFlowControl = usesHardwareFlowControl;
_logger = this.GetCurrentClassLogger();
}
/// <summary>
/// Creates an instance of an Ardino board connection using the given stream (typically from a serial port)
/// </summary>
/// <remarks>
/// The device is initialized when the first command is sent. The constructor always succeeds.
/// </remarks>
/// <param name="serialPortStream">A stream to an Arduino/Firmata device</param>
public ArduinoBoard(Stream serialPortStream)
: this(serialPortStream, false)
{
}
/// <summary>
/// Creates an instance of the Arduino board connection connected to a serial port
/// </summary>
......@@ -80,6 +95,7 @@ namespace Iot.Device.Arduino
{
_dataStream = null;
_serialPort = new SerialPort(portName, baudRate);
StreamUsesHardwareFlowControl = false; // Would need to configure the serial port externally for this to work
_logger = this.GetCurrentClassLogger();
}
......@@ -88,6 +104,16 @@ namespace Iot.Device.Arduino
/// </summary>
protected ILogger Logger => _logger;
/// <summary>
/// Set this to true if the underlying stream uses some kind of hardware or low-level flow control (RTS/CTS for
/// a serial port, or a TCP socket). Setting this to true may improve performance on bulk transfers (such as
/// large SPI blocks) but can result in buffer overflows if flow control is not working. Default: false
/// </summary>
public bool StreamUsesHardwareFlowControl
{
get;
}
/// <summary>
/// The list of supported pin modes.
/// This list can be extended by adding special modes using <see cref="AddCommandHandler{T}"/>.
......@@ -162,14 +188,47 @@ namespace Iot.Device.Arduino
#endif
out ArduinoBoard? board)
{
board = null;
return TryConnectToNetworkedBoard(boardAddress, port, true, out board);
}
/// <summary>
/// Tries to connect to an arduino over network.
/// This requires an arduino with an ethernet shield or an ESP32 with enabled WIFI support.
/// </summary>
/// <param name="boardAddress">The IP address of the board</param>
/// <param name="port">The network port to use. The default port is 27016</param>
/// <param name="useAutoReconnect">True to use an auto-reconnecting stream. Helpful when using an unreliable connection.</param>
/// <param name="board">Returns the board if successful</param>
/// <returns>True on success, false otherwise</returns>
public static bool TryConnectToNetworkedBoard(IPAddress boardAddress, int port, bool useAutoReconnect,
#if NET5_0_OR_GREATER
[NotNullWhen(true)]
#endif
out ArduinoBoard? board)
{
try
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect(boardAddress, port);
socket.NoDelay = true;
var networkStream = new NetworkStream(socket, true);
board = new ArduinoBoard(networkStream);
Stream networkStream;
if (useAutoReconnect)
{
networkStream = new ReconnectingNetworkStream(() =>
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect(boardAddress, port);
socket.NoDelay = true;
Stream socketStream = new NetworkStream(socket, true);
return socketStream;
});
}
else
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect(boardAddress, port);
socket.NoDelay = true;
networkStream = new NetworkStream(socket, true);
}
board = new ArduinoBoard(networkStream, true);
if (!(board.FirmataVersion > new Version(1, 0)))
{
// Actually not expecting to get here (but the above will throw a SocketException if the remote end is not there)
......@@ -180,6 +239,7 @@ namespace Iot.Device.Arduino
}
catch (SocketException)
{
board = null;
return false;
}
}
......@@ -508,7 +568,21 @@ namespace Iot.Device.Arduino
}
else
{
Logger.LogInformation(message);
// If the message contains a line feed, strip that
Logger.LogInformation(message.TrimEnd(new char[] { '\r', '\n' }));
}
_commandHandlersLock.EnterReadLock();
try
{
foreach (var handler in _extendedCommandHandlers)
{
handler.OnErrorMessage(message, exception);
}
}
finally
{
_commandHandlersLock.ExitReadLock();
}
}
......@@ -561,7 +635,12 @@ namespace Iot.Device.Arduino
{
Initialize();
return new GpioController(PinNumberingScheme.Logical, new ArduinoGpioControllerDriver(this, _supportedPinConfigurations));
if (_firmata == null)
{
throw new ObjectDisposedException(nameof(_firmata));
}
return new GpioController(PinNumberingScheme.Logical, new ArduinoGpioControllerDriver(_firmata, _supportedPinConfigurations));
}
/// <inheritdoc />
......@@ -691,6 +770,16 @@ namespace Iot.Device.Arduino
Firmata.SetAnalogInputSamplingInterval(timeSpan);
}
/// <summary>
/// Performs a software reset of the Arduino firmware
/// </summary>
public void SoftwareReset()
{
Initialize();
Firmata.SendSoftwareReset();
Firmata.QueryCapabilities();
}
/// <summary>
/// Standard dispose pattern
/// </summary>
......@@ -708,6 +797,12 @@ namespace Iot.Device.Arduino
}
}
if (_firmata != null)
{
// Can end the next possible moment (otherwise might just throw a bunch of warnings before actually terminating anyway)
_firmata.InputThreadShouldExit = true;
}
_isDisposed = true;
// Do this first, to force any blocking read operations to end
if (_dataStream != null)
......@@ -754,5 +849,43 @@ namespace Iot.Device.Arduino
Firmata.DisableSpi();
}
}
/// <summary>
/// Pings the device, to get an estimate about the round-trip time.
/// With some Wifi setups, the round trip time may be significantly higher than desired.
/// </summary>
/// <param name="number">The number of pings to send</param>
/// <returns>The list of reply times. Contains a negative value for lost packets</returns>
public List<TimeSpan> Ping(int number)
{
Initialize();
if (_firmata == null)
{
throw new ObjectDisposedException("Not connected");
}
List<TimeSpan> ret = new();
Stopwatch sw = new Stopwatch();
for (int i = 0; i < number; i++)
{
sw.Restart();
try
{
_firmata.QueryFirmwareVersion(out _);
var elapsed = sw.Elapsed;
ret.Add(elapsed);
_logger.LogInformation($"Round trip time: {elapsed.TotalMilliseconds}ms");
}
catch (TimeoutException x)
{
_logger.LogError(x, $"Timeout: {x.Message}");
ret.Add(TimeSpan.FromMinutes(-1));
}
sw.Stop();
}
return ret;
}
}
}
......@@ -14,26 +14,28 @@ namespace Iot.Device.Arduino
{
internal class ArduinoGpioControllerDriver : GpioDriver
{
private readonly ArduinoBoard _arduinoBoard;
private readonly FirmataDevice _device;
private readonly IReadOnlyCollection<SupportedPinConfiguration> _supportedPinConfigurations;
private readonly Dictionary<int, CallbackContainer> _callbackContainers;
private readonly ConcurrentDictionary<int, PinMode> _pinModes;
private readonly object _callbackContainersLock;
private readonly AutoResetEvent _waitForEventResetEvent;
private readonly ILogger _logger;
private readonly ConcurrentDictionary<int, PinValue?> _outputPinValues;
internal ArduinoGpioControllerDriver(ArduinoBoard arduinoBoard, IReadOnlyCollection<SupportedPinConfiguration> supportedPinConfigurations)
internal ArduinoGpioControllerDriver(FirmataDevice device, IReadOnlyCollection<SupportedPinConfiguration> supportedPinConfigurations)
{
_arduinoBoard = arduinoBoard ?? throw new ArgumentNullException(nameof(arduinoBoard));
_device = device ?? throw new ArgumentNullException(nameof(device));
_supportedPinConfigurations = supportedPinConfigurations ?? throw new ArgumentNullException(nameof(supportedPinConfigurations));
_callbackContainers = new Dictionary<int, CallbackContainer>();
_waitForEventResetEvent = new AutoResetEvent(false);
_callbackContainersLock = new object();
_pinModes = new ConcurrentDictionary<int, PinMode>();
_outputPinValues = new ConcurrentDictionary<int, PinValue?>();
_logger = this.GetCurrentClassLogger();
PinCount = _supportedPinConfigurations.Count;
_arduinoBoard.Firmata.DigitalPortValueUpdated += FirmataOnDigitalPortValueUpdated;
_device.DigitalPortValueUpdated += FirmataOnDigitalPortValueUpdated;
}
protected override int PinCount { get; }
......@@ -48,6 +50,10 @@ namespace Iot.Device.Arduino
protected override void OpenPin(int pinNumber)
{
if (pinNumber < 0 || pinNumber >= PinCount)
{
throw new ArgumentOutOfRangeException(nameof(pinNumber), $"Pin {pinNumber} is not valid");
}
}
protected override void ClosePin(int pinNumber)
......@@ -65,9 +71,11 @@ namespace Iot.Device.Arduino
break;
case PinMode.InputPullUp:
firmataMode = SupportedMode.InputPullup;
_outputPinValues.TryRemove(pinNumber, out _);
break;
case PinMode.Input:
firmataMode = SupportedMode.DigitalInput;
_outputPinValues.TryRemove(pinNumber, out _);
break;
default:
_logger.LogError($"Mode {mode} is not supported as argument to {nameof(SetPinMode)}");
......@@ -81,7 +89,7 @@ namespace Iot.Device.Arduino
throw new NotSupportedException($"Mode {mode} is not supported on Pin {pinNumber}.");
}
_arduinoBoard.Firmata.SetPinMode(pinNumber, firmataMode);
_device.SetPinMode(pinNumber, firmataMode);
// Cache this value. Since the GpioController calls GetPinMode when trying to write a pin (to verify whether it is set to output),
// that would be very expensive here. And setting output pins should be cheap.
......@@ -95,7 +103,7 @@ namespace Iot.Device.Arduino
return existingValue;
}
var mode = _arduinoBoard.Firmata.GetPinMode(pinNumber);
var mode = _device.GetPinMode(pinNumber);
PinMode ret;
if (mode == SupportedMode.DigitalOutput.Value)
......@@ -144,12 +152,22 @@ namespace Iot.Device.Arduino
protected override PinValue Read(int pinNumber)
{
return _arduinoBoard.Firmata.ReadDigitalPin(pinNumber);
return _device.ReadDigitalPin(pinNumber);
}
protected override void Write(int pinNumber, PinValue value)
{
_arduinoBoard.Firmata.WriteDigitalPin(pinNumber, value);
if (_outputPinValues.TryGetValue(pinNumber, out PinValue? existingValue) && existingValue != null)
{
// If this output value is already present, don't send a message.
if (value == existingValue.Value)
{
return;
}
}
_device.WriteDigitalPin(pinNumber, value);
_outputPinValues.AddOrUpdate(pinNumber, x => value, (y, z) => value);
}
protected override WaitForEventResult WaitForEvent(int pinNumber, PinEventTypes eventTypes, CancellationToken cancellationToken)
......@@ -173,7 +191,7 @@ namespace Iot.Device.Arduino
}
}
_arduinoBoard.Firmata.DigitalPortValueUpdated += WaitForEventPortValueUpdated;
_device.DigitalPortValueUpdated += WaitForEventPortValueUpdated;
try
{
WaitHandle.WaitAny(new[] { cancellationToken.WaitHandle, _waitForEventResetEvent });
......@@ -188,7 +206,7 @@ namespace Iot.Device.Arduino
}
finally
{
_arduinoBoard.Firmata.DigitalPortValueUpdated -= WaitForEventPortValueUpdated;
_device.DigitalPortValueUpdated -= WaitForEventPortValueUpdated;
}
return new WaitForEventResult()
......@@ -265,7 +283,8 @@ namespace Iot.Device.Arduino
_callbackContainers.Clear();
}
_arduinoBoard.Firmata.DigitalPortValueUpdated -= FirmataOnDigitalPortValueUpdated;
_outputPinValues.Clear();
_device.DigitalPortValueUpdated -= FirmataOnDigitalPortValueUpdated;
}
base.Dispose(disposing);
......
......@@ -15,6 +15,7 @@ namespace Iot.Device.Arduino
private readonly ArduinoBoard _board;
private readonly int _busId;
private readonly HashSet<int> _usedAddresses;
private bool _busInitialized;
public ArduinoI2cBus(ArduinoBoard board, int busId)
{
......@@ -31,6 +32,18 @@ namespace Iot.Device.Arduino
throw new InvalidOperationException($"Device number {deviceAddress} is already in use");
}
if (!_busInitialized)
{
// Ensure the corresponding pins are set to I2C (not strictly necessary, but consistent)
foreach (SupportedPinConfiguration supportedPinConfiguration in _board.SupportedPinConfigurations.Where(x => x.PinModes.Contains(SupportedMode.I2c)))
{
_board.Firmata.SetPinMode(supportedPinConfiguration.Pin, SupportedMode.I2c);
}
_board.Firmata.SendI2cConfigCommand();
_busInitialized = true;
}
var device = new ArduinoI2cDevice(_board, this, new I2cConnectionSettings(_busId, deviceAddress));
_usedAddresses.Add(deviceAddress);
return device;
......
......@@ -50,7 +50,7 @@ namespace Iot.Device.Arduino
public override void Write(ReadOnlySpan<byte> buffer)
{
Board.Firmata.SpiWrite(ConnectionSettings.ChipSelectLine, buffer);
Board.Firmata.SpiWrite(ConnectionSettings.ChipSelectLine, buffer, !Board.StreamUsesHardwareFlowControl);
}
public override void TransferFullDuplex(ReadOnlySpan<byte> writeBuffer, Span<byte> readBuffer)
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Iot.Device.Arduino
{
/// <summary>
/// Represents a collection that removes objects based on a certain pattern
/// </summary>
internal class BlockingConcurrentBag<T>
{
private readonly object _lock = new object();
private readonly List<T?> _container = new List<T?>();
public int Count
{
get
{
lock (_lock)
{
return _container.Count;
}
}
}
public void Add(T? elem)
{
lock (_lock)
{
_container.Add(elem);
Monitor.PulseAll(_lock);
}
}
public void Clear()
{
lock (_lock)
{
_container.Clear();
}
}
/// <summary>
/// Waits until an element is in the queue that matches the given predicate.
/// Checking the predicate should be fast.
/// </summary>
/// <param name="predicate">The predicate to test</param>
/// <param name="timeout">The overall timeout</param>
/// <param name="element">Returns the element found, if any</param>
/// <returns>True if an element was found within the timeout, false otherwise</returns>
public bool TryRemoveElement(Func<T?, bool> predicate, TimeSpan timeout, out T? element)
{
bool lockTaken = false;
Stopwatch sw = Stopwatch.StartNew();
element = default;
try
{
Monitor.TryEnter(_lock, timeout, ref lockTaken);
if (lockTaken)
{
// The critical section.
while (true)
{
// Cannot use FirstOrDefault here, because we need to be able to distinguish between
// finding nothing and finding an empty (null, default) element
for (var index = 0; index < _container.Count; index++)
{
T? elem = _container[index];
if (predicate(elem))
{
_container.RemoveAt(index);
element = elem;
return true;
}
}
if (sw.Elapsed > timeout)
{
return false;
}
TimeSpan remaining = timeout - sw.Elapsed;
if (remaining <= TimeSpan.Zero || !Monitor.Wait(_lock, remaining))
{
return false;
}
}
}
else
{
return false;
}
}
finally
{
// Ensure that the lock is released.
if (lockTaken)
{
Monitor.Exit(_lock);
}
}
}
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Iot.Device.Arduino
{
/// <summary>
/// This class is used to encode larger chunks of data for transmission using the Firmata protocol.
/// It converts each block of 7 bytes into a block of 8 bytes, keeping the top bit 0.
/// </summary>
public static class Encoder7Bit
{
/// <summary>
/// Calculates the number of bytes generated during decode (the result is smaller than the input)
/// </summary>
public static int Num8BitOutBytes(int inputBytes)
{
// Equals * 7 / 8
return (int)Math.Floor(((inputBytes) * 7) / 8.0);
}
/// <summary>
/// Calculates the number of bytes required for the 7-byte encoding
/// </summary>
public static int Num7BitOutBytes(int inputBytes)
{
return (int)Math.Ceiling(((inputBytes) * 8.0) / 7);
}
/// <summary>
/// Encode a sequence of bytes
/// </summary>
/// <param name="data">The data to encode</param>
/// <returns>The encoded data</returns>
public static byte[] Encode(ReadOnlySpan<byte> data)
{
return Encode(data, 0, data.Length);
}
/// <summary>
/// Encodes a sequence of bytes
/// </summary>
/// <param name="data">The data to encode</param>
/// <param name="startIndex">The start index in the data</param>
/// <param name="length">The length of the data</param>
/// <returns>The encoded data</returns>
public static byte[] Encode(ReadOnlySpan<byte> data, int startIndex, int length)
{
int shift = 0;
byte[] retBytes = new byte[Num7BitOutBytes(length)];
int index = 0;
int previous = 0;
for (int i = startIndex; i < startIndex + length; i++)
{
if (shift == 0)
{
retBytes[index] = (byte)(data[i] & 0x7f);
index++;
shift++;
previous = data[i] >> 7;
}
else
{
retBytes[index] = (byte)(((data[i] << shift) & 0x7f) | previous);
index++;
if (shift == 6)
{
retBytes[index] = (byte)(data[i] >> 1);
index++;
shift = 0;
}
else
{
shift++;
previous = data[i] >> (8 - shift);
}
}
}
if (shift > 0)
{
// Write remainder
retBytes[index] = (byte)previous;
}
return retBytes;
}
/// <summary>
/// Decodes the given data sequence
/// </summary>
/// <param name="inData">The data to decode</param>
/// <returns>The decoded data</returns>
public static byte[] Decode(ReadOnlySpan<byte> inData)
{
byte[] outBytes = new byte[Num8BitOutBytes(inData.Length)];
for (int i = 0; i < outBytes.Length; i++)
{
int j = i << 3;
int pos = j / 7;
int shift = j % 7;
outBytes[i] = (byte)((inData[pos] >> shift) | ((inData[pos + 1] << (7 - shift)) & 0xFF));
}
return outBytes;
}
}
}
......@@ -62,13 +62,7 @@ namespace Iot.Device.Arduino
/// This might need to be checked in Dispose, to make sure an uninitialized component doesn't attempt
/// to send a command.
/// </summary>
protected bool IsRegistered
{
get
{
return _board != null;
}
}
protected bool IsRegistered => _board != null;
/// <summary>
/// The reference to the arduino board
......@@ -135,25 +129,39 @@ namespace Iot.Device.Arduino
throw new InvalidOperationException("Command handler not registered");
}
return _firmata.SendCommandAndWait(commandSequence, timeout, out error);
return _firmata.SendCommandAndWait(commandSequence, timeout, IsMatchingAck, out error);
}
/// <summary>
/// Send a command to the device, expecting a reply.
/// </summary>
/// <param name="commandSequence">Command to send. This
/// <param name="commandSequences">Commands to send. This
/// should normally be a sysex command.</param>
/// <param name="timeout">Command timeout</param>
/// <param name="error">An error code in case of a failure</param>
/// <exception cref="TimeoutException">The timeout elapsed before a reply was received.</exception>
/// <returns>The reply packet</returns>
protected byte[] SendCommandAndWait(FirmataCommandSequence commandSequence, TimeSpan timeout)
/// <returns>True if all packets where send and properly acknowledged</returns>
protected bool SendCommandsAndWait(IList<FirmataCommandSequence> commandSequences, TimeSpan timeout, out CommandError error)
{
if (_firmata == null)
{
throw new InvalidOperationException("Command handler not registered");
}
return _firmata.SendCommandAndWait(commandSequence, timeout);
return _firmata.SendCommandsAndWait(commandSequences, timeout, IsMatchingAck, HasCommandError, out error);
}
/// <summary>
/// Send a command to the device, expecting a reply.
/// </summary>
/// <param name="commandSequence">Command to send. This
/// should normally be a sysex command.</param>
/// <param name="timeout">Command timeout</param>
/// <exception cref="TimeoutException">The timeout elapsed before a reply was received.</exception>
/// <returns>The reply packet</returns>
protected byte[] SendCommandAndWait(FirmataCommandSequence commandSequence, TimeSpan timeout)
{
return SendCommandAndWait(commandSequence, timeout, out _);
}
/// <summary>
......@@ -171,16 +179,35 @@ namespace Iot.Device.Arduino
/// <summary>
/// This is called when a sysex command is received from the board.
/// This can include the reply to a command sent by a <see cref="SendCommandAndWait(FirmataCommandSequence)"/> before, in which case
/// the reply should be ignored, as it is returned as result of the call itself.
/// the reply should be ignored, as it is returned as result of the call itself. Therefore it is advised to use this function only
/// to listen for data sent by the device automatically (e.g event messages or recurring status reports)
/// </summary>
/// <param name="type">Type of data received from the hardware. This should normally be <see cref="ReplyType.SysexCommand"/>,
/// unless the hardware sends unencoded Ascii messages</param>
/// <param name="data">The binary representation of the received data</param>
/// <remarks>The implementation needs to check whether the data is for itself. The messages are not filtered by requester!</remarks>
/// <remarks>The implementation needs to check the type and source of the data. The messages are not filtered by requester!</remarks>
protected virtual void OnSysexData(ReplyType type, byte[] data)
{
}
/// <summary>
/// This method is called to check whether the reply is a valid ACK/NOACK for the given command sequence.
/// Can be used to avoid accepting something as command reply that is completely unrelated (such as an asynchronous callback).
/// In different words, this should return false if the given reply is not something that is an answer to a synchronous command.
/// </summary>
/// <param name="sequence">The sequence that was sent</param>
/// <param name="reply">The reply</param>
/// <returns>True if this reply matches the sequence. True is the default, for backwards compatibility</returns>
protected virtual bool IsMatchingAck(FirmataCommandSequence sequence, byte[] reply) => true;
/// <summary>
/// Callback function that returns whether the given reply indicates an error
/// </summary>
/// <param name="sequence">The original sequence</param>
/// <param name="reply">The reply. <see cref="IsMatchingAck"/> is already tested to be true for this reply</param>
/// <returns>A command error code, in case this reply indicates a no-acknowledge</returns>
protected virtual CommandError HasCommandError(FirmataCommandSequence sequence, byte[] reply) => CommandError.None;
private void OnSysexDataInternal(ReplyType type, byte[] data)
{
if (_firmata == null)
......@@ -205,6 +232,16 @@ namespace Iot.Device.Arduino
_board = null;
}
/// <summary>
/// Called by the infrastructure when the parser reports an error or information message.
/// The default implementation does nothing.
/// </summary>
/// <param name="message">The message text</param>
/// <param name="exception">The exception observed (may be null)</param>
protected internal virtual void OnErrorMessage(string message, Exception? exception)
{
}
/// <inheritdoc />
public void Dispose()
{
......
......@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
namespace Iot.Device.Arduino
......@@ -12,8 +13,10 @@ namespace Iot.Device.Arduino
/// A firmata command sequence
/// Intended to be changed to public visibility later
/// </summary>
public class FirmataCommandSequence
public class FirmataCommandSequence : IEquatable<FirmataCommandSequence>
{
private const int InitialCommandLength = 32;
/// <summary>
/// Start of sysex command byte. Used as start byte for almost all extended commands.
/// </summary>
......@@ -56,6 +59,8 @@ namespace Iot.Device.Arduino
/// </summary>
public int Length => _sequence.Count;
internal byte[] InternalSequence => _sequence.ToArray();
/// <summary>
/// Decode an uint from packed 7-bit data.
/// This way of encoding uints is only used in extension modules.
......@@ -95,6 +100,17 @@ namespace Iot.Device.Arduino
return (Int32)DecodeUInt32(data, fromOffset);
}
/// <summary>
/// Decodes a 14-bit integer into a short
/// </summary>
/// <param name="data">Data array</param>
/// <param name="idx">Start offset</param>
/// <returns></returns>
public static short DecodeInt14(byte[] data, int idx)
{
return (short)(data[idx] | data[idx + 1] << 7);
}
/// <summary>
/// Send an Uint32 as 5 x 7 bits. This form of transmitting integers is only supported by extension modules
/// </summary>
......@@ -178,5 +194,70 @@ namespace Iot.Device.Arduino
_sequence.Add((byte)(values[i] >> 7 & sbyte.MaxValue));
}
}
/// <inheritdoc/>
public override string ToString()
{
StringBuilder b = new StringBuilder();
int maxBytes = Math.Min(Length, 32);
for (int i = 0; i < maxBytes; i++)
{
b.Append($"{_sequence[i]:X2} ");
}
if (maxBytes < Length)
{
b.Append("...");
}
return b.ToString();
}
/// <inheritdoc />
public bool Equals(FirmataCommandSequence? other)
{
if (ReferenceEquals(null, other))
{
return false;
}
if (ReferenceEquals(this, other))
{
return true;
}
return _sequence.Equals(other._sequence) && Length == other.Length;
}
/// <inheritdoc />
public override bool Equals(object? obj)
{
if (ReferenceEquals(null, obj))
{
return false;
}
if (ReferenceEquals(this, obj))
{
return true;
}
if (obj.GetType() != GetType())
{
return false;
}
return Equals((FirmataCommandSequence)obj);
}
/// <inheritdoc />
public override int GetHashCode()
{
unchecked
{
return (_sequence.GetHashCode() * 397);
}
}
}
}
......@@ -12,5 +12,6 @@ namespace Iot.Device.Arduino
SPI_READ = 0x04,
SPI_REPLY = 0x05,
SPI_END = 0x06,
SPI_WRITE_ACK = 0x07,
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.IO;
namespace Iot.Device.Arduino
{
internal class ReconnectingNetworkStream : Stream
{
private readonly Func<Stream> _reconnect;
private readonly object _lock = new object();
private Stream? _streamImplementation;
public ReconnectingNetworkStream(Stream? underlyingStream, Func<Stream> reconnect)
{
_streamImplementation = underlyingStream;
_reconnect = reconnect;
Connect();
}
public ReconnectingNetworkStream(Func<Stream> connect)
: this(null, connect)
{
}
public override bool CanRead => SafeExecute(x => x.CanRead, false);
public override bool CanSeek => SafeExecute(x => x.CanSeek, false);
public override bool CanWrite => SafeExecute(x => x.CanWrite, false);
public override long Length => SafeExecute(x => x.Length, false);
public override long Position
{
get
{
return SafeExecute(x => x.Position, false);
}
set
{
SafeExecute(x => x.Position = value, false);
}
}
public override void Flush() => SafeExecute(x => x.Flush(), true);
public override int Read(byte[] buffer, int offset, int count) => SafeExecute(x => x.Read(buffer, offset, count), true);
public override long Seek(long offset, SeekOrigin origin) => SafeExecute(x => x.Seek(offset, origin), true);
public override void SetLength(long value) => SafeExecute(x => x.SetLength(value), false);
public override void Write(byte[] buffer, int offset, int count) => SafeExecute(x => x.Write(buffer, offset, count), false);
private void Connect()
{
if (_streamImplementation == null)
{
lock (_lock)
{
try
{
_streamImplementation = _reconnect();
}
catch (IOException)
{
// Ignore, cannot currently reconnect. So need to retry later.
}
}
}
}
private T SafeExecute<T>(Func<Stream, T> operation, bool doThrow)
where T : struct
{
try
{
Connect();
if (_streamImplementation == null)
{
if (doThrow)
{
throw new TimeoutException("Stream is disconnected. Retrying next time.");
}
return default(T);
}
return operation(_streamImplementation);
}
catch (IOException x)
{
_streamImplementation?.Dispose();
_streamImplementation = null;
if (doThrow)
{
throw new TimeoutException("Error executing operation. Retrying next time", x);
}
return default(T);
}
}
private void SafeExecute(Action<Stream> operation, bool doThrow)
{
try
{
Connect();
if (_streamImplementation == null)
{
if (doThrow)
{
throw new TimeoutException("Stream is disconnected. Retrying next time.");
}
return;
}
operation(_streamImplementation);
}
catch (IOException x)
{
_streamImplementation?.Dispose();
_streamImplementation = null;
if (doThrow)
{
throw new TimeoutException("Error executing operation. Retrying next time", x);
}
}
}
public override void Close()
{
if (_streamImplementation != null)
{
_streamImplementation.Close();
_streamImplementation = null;
}
base.Close();
}
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Collections.Generic;
using System.Text;
using Iot.Device.Arduino;
using Xunit;
namespace Iot.Device.Arduino.Tests
{
public class BlockingConcurrentBagTest
{
private BlockingConcurrentBag<int> _blockingConcurrentBag;
public BlockingConcurrentBagTest()
{
_blockingConcurrentBag = new BlockingConcurrentBag<int>();
}
[Fact]
public void CanAddAndRemove()
{
_blockingConcurrentBag.Add(1);
Assert.True(_blockingConcurrentBag.TryRemoveElement(x => true, TimeSpan.Zero, out int a));
Assert.True(a == 1);
}
[Fact]
public void ReturnsFalseWhenElementDoesNotMatch()
{
_blockingConcurrentBag.Add(1);
Assert.False(_blockingConcurrentBag.TryRemoveElement(x => x == 3, TimeSpan.FromSeconds(1), out int a));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册