提交 17b2f9fa 编写于 作者: 若汝棋茗

版本号: 0.6.0

更新日期:2022.9.10
更新描述:兼容性更新,增强型更新。专为Unity 3D适配。
更新详情:
优化 1. Gzip的压缩效率。
2. 发送效率。
新增 1. IDataCompressor数据传输压缩接口。
2. RemoteStream支持数据读写压缩。
3. WaitResultPackageBase类,专属非序列化的数据格式化。
4. DelaySender延迟压缩发送。
修改 1. 无
修复 1. Rpc注册服务为单例时,实际上是瞬时服务的bug。
删除 1. 独立线程发送。
上级 861607ce
......@@ -5,7 +5,7 @@
<ApplicationIcon>logo.ico</ApplicationIcon>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>RRQM.pfx</AssemblyOriginatorKeyFile>
<Version>0.5.0</Version>
<Version>0.6.0</Version>
<LangVersion>8.0</LangVersion>
<Company>若汝棋茗</Company>
<Copyright>Copyright © 2022 若汝棋茗</Copyright>
......@@ -63,6 +63,6 @@ API:https://www.yuque.com/rrqm/touchsocket/index</Description>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.26" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.0.0" />
<PackageReference Include="TouchSocket" Version="0.5.0" />
<PackageReference Include="TouchSocket" Version="0.6.0" />
</ItemGroup>
</Project>
......@@ -201,6 +201,7 @@ namespace TouchSocket.Core.ByteManager
/// <summary>
/// 获取内存核心。获取的核心可以不用归还。
/// 如果要调用<see cref="Recycle(byte[])"/>归还,切记不要有持久性引用。
/// </summary>
/// <param name="byteSize"></param>
/// <param name="equalSize"></param>
......
......@@ -17,43 +17,89 @@ using System.Threading;
namespace TouchSocket.Core.Collections.Concurrent
{
/// <summary>
/// 智能数据安全队列
/// 队列数据
/// </summary>
/// <typeparam name="T"></typeparam>
public class IntelligentDataQueue<T> : ConcurrentQueue<T> where T : IQueueData
public interface IQueueData
{
private bool m_overflowWait;
/// <summary>
/// 数据长度
/// </summary>
int Size { get; }
}
/// <summary>
/// 传输字节
/// </summary>
public class QueueDataBytes : IQueueData
{
/// <summary>
/// 溢出等待
/// 构造函数
/// </summary>
public bool OverflowWait
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public QueueDataBytes(byte[] buffer, int offset, int length)
{
get => this.m_overflowWait;
set => this.m_overflowWait = value;
this.Offset = offset;
this.Length = length;
this.Buffer = buffer;
this.Size = length;
}
private Action<bool> m_onQueueChanged;
/// <summary>
/// 从指定内存创建一个新对象,且内存也为新创建。
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
/// <returns></returns>
public static QueueDataBytes CreateNew(byte[] buffer, int offset, int length)
{
byte[] buf = new byte[length];
Array.Copy(buffer, offset, buf, 0, length);
return new QueueDataBytes(buf);
}
/// <summary>
/// 在队列修改时
/// 构造函数
/// </summary>
public Action<bool> OnQueueChanged
/// <param name="buffer"></param>
public QueueDataBytes(byte[] buffer) : this(buffer, 0, buffer.Length)
{
get => this.m_onQueueChanged;
set => this.m_onQueueChanged = value;
}
private bool m_free;
/// <summary>
/// 数据内存
/// </summary>
public byte[] Buffer { get; }
/// <summary>
/// 是否有空位允许入队
/// 长度
/// </summary>
public bool Free => this.m_free;
public int Length { get; }
private long m_actualSize;
/// <summary>
/// 偏移
/// </summary>
public int Offset { get; }
/// <summary>
/// 尺寸
/// </summary>
public int Size { get; }
}
/// <summary>
/// 智能数据安全队列
/// </summary>
/// <typeparam name="T"></typeparam>
public class IntelligentDataQueue<T> : ConcurrentQueue<T> where T : IQueueData
{
private long m_actualSize;
private bool m_free;
private long m_maxSize;
private Action<bool> m_onQueueChanged;
private bool m_overflowWait;
/// <summary>
/// 构造函数
......@@ -73,6 +119,16 @@ namespace TouchSocket.Core.Collections.Concurrent
{
}
/// <summary>
/// 实际尺寸
/// </summary>
public long ActualSize => this.m_actualSize;
/// <summary>
/// 是否有空位允许入队
/// </summary>
public bool Free => this.m_free;
/// <summary>
/// 允许的最大长度
/// </summary>
......@@ -90,9 +146,27 @@ namespace TouchSocket.Core.Collections.Concurrent
}
/// <summary>
/// 实际尺寸
/// 在队列修改时
/// </summary>
public long ActualSize => this.m_actualSize;
public Action<bool> OnQueueChanged
{
get => this.m_onQueueChanged;
set => this.m_onQueueChanged = value;
}
/// <summary>
/// 溢出等待
/// </summary>
public bool OverflowWait
{
get => this.m_overflowWait;
set => this.m_overflowWait = value;
}
/// <summary>
/// 超时时间。默认1000*30ms;
/// </summary>
public int Timeout { get; set; } = 1000 * 30;
/// <summary>
/// 清空队列
......@@ -122,7 +196,7 @@ namespace TouchSocket.Core.Collections.Concurrent
if (this.m_overflowWait)
{
SpinWait.SpinUntil(this.Check);
SpinWait.SpinUntil(this.Check, this.Timeout);
}
Interlocked.Add(ref this.m_actualSize, item.Size);
......@@ -156,63 +230,4 @@ namespace TouchSocket.Core.Collections.Concurrent
return this.m_actualSize < this.m_maxSize;
}
}
/// <summary>
/// 队列数据
/// </summary>
public interface IQueueData
{
/// <summary>
/// 数据长度
/// </summary>
int Size { get; }
}
/// <summary>
/// 传输字节
/// </summary>
public readonly struct QueueDataBytes : IQueueData
{
/// <summary>
/// 构造函数
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public QueueDataBytes(byte[] buffer, int offset, int length)
{
this.Offset = offset;
this.Length = length;
this.Buffer = buffer;
this.Size = length;
}
/// <summary>
/// 构造函数
/// </summary>
/// <param name="buffer"></param>
public QueueDataBytes(byte[] buffer) : this(buffer, 0, buffer.Length)
{
}
/// <summary>
/// 数据内存
/// </summary>
public byte[] Buffer { get; }
/// <summary>
/// 偏移
/// </summary>
public int Offset { get; }
/// <summary>
/// 长度
/// </summary>
public int Length { get; }
/// <summary>
/// 尺寸
/// </summary>
public int Size { get; }
}
}
\ No newline at end of file
......@@ -10,11 +10,6 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Core
{
......
......@@ -50,7 +50,7 @@
// public static Exception Throw(Enum @enum,string msg,Exception exception)
// {
// }
// }
......
......@@ -10,11 +10,6 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Core
{
......
//------------------------------------------------------------------------------
// 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
// CSDN博客:https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频:https://space.bilibili.com/94253567
// Gitee源代码仓库:https://gitee.com/RRQM_Home
// Github源代码仓库:https://github.com/RRQM
// API首页:https://www.yuque.com/rrqm/touchsocket/index
// 交流QQ群:234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
namespace TouchSocket.Core.Data
{
/// <summary>
/// GZip压缩算法的压缩机
/// </summary>
public sealed class GZipDataCompressor : IDataCompressor
{
byte[] IDataCompressor.Compress(ArraySegment<byte> data)
{
return GZip.Compress(data.Array, data.Offset, data.Count);
}
byte[] IDataCompressor.Decompress(ArraySegment<byte> data)
{
return GZip.Decompress(data.Array, data.Offset, data.Count);
}
}
}
//------------------------------------------------------------------------------
// 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
// CSDN博客:https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频:https://space.bilibili.com/94253567
// Gitee源代码仓库:https://gitee.com/RRQM_Home
// Github源代码仓库:https://github.com/RRQM
// API首页:https://www.yuque.com/rrqm/touchsocket/index
// 交流QQ群:234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
namespace TouchSocket.Core.Data
{
/// <summary>
/// 数据压缩机接口
/// </summary>
public interface IDataCompressor
{
/// <summary>
/// 压缩数据
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
byte[] Compress(ArraySegment<byte> data);
/// <summary>
/// 解压数据
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
byte[] Decompress(ArraySegment<byte> data);
}
}
......@@ -12,6 +12,7 @@
//------------------------------------------------------------------------------
using System.IO;
using System.IO.Compression;
using TouchSocket.Core.ByteManager;
namespace TouchSocket.Core.Data
{
......@@ -23,48 +24,120 @@ namespace TouchSocket.Core.Data
/// <summary>
/// 压缩数据
/// </summary>
/// <param name="data"></param>
/// <param name="byteBlock"></param>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
/// <returns></returns>
public static byte[] Compress(byte[] data)
public static void Compress(ByteBlock byteBlock, byte[] buffer, int offset, int length)
{
using (MemoryStream stream = new MemoryStream())
using (GZipStream gZipStream = new GZipStream(byteBlock, CompressionMode.Compress, true))
{
using (GZipStream gZipStream = new GZipStream(stream, CompressionMode.Compress))
{
gZipStream.Write(data, 0, data.Length);
gZipStream.Close();
}
return stream.ToArray();
gZipStream.Write(buffer, offset, length);
gZipStream.Close();
}
}
/// <summary>
/// 压缩数据
/// </summary>
/// <param name="byteBlock"></param>
/// <param name="buffer"></param>
/// <returns></returns>
public static void Compress(ByteBlock byteBlock, byte[] buffer)
{
Compress(byteBlock, buffer, 0, buffer.Length);
}
/// <summary>
/// 压缩数据
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
/// <returns></returns>
public static byte[] Compress(byte[] buffer, int offset, int length)
{
using (ByteBlock byteBlock = new ByteBlock(length))
{
Compress(byteBlock, buffer, offset, length);
return byteBlock.ToArray();
}
}
/// <summary>
/// 压缩数据
/// </summary>
/// <param name="buffer"></param>
/// <returns></returns>
public static byte[] Compress(byte[] buffer)
{
return Compress(buffer, 0, buffer.Length);
}
/// <summary>
/// 解压数据
/// </summary>
/// <param name="byteBlock"></param>
/// <param name="data"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
/// <returns></returns>
public static byte[] Decompress(byte[] data)
public static void Decompress(ByteBlock byteBlock, byte[] data, int offset, int length)
{
try
using (GZipStream gZipStream = new GZipStream(new MemoryStream(data, offset, length), CompressionMode.Decompress))
{
using MemoryStream stream = new MemoryStream();
using (GZipStream gZipStream = new GZipStream(new MemoryStream(data), CompressionMode.Decompress))
byte[] bytes = BytePool.GetByteCore(1024 * 64);
try
{
byte[] bytes = new byte[40960];
int n;
while ((n = gZipStream.Read(bytes, 0, bytes.Length)) != 0)
int r;
while ((r = gZipStream.Read(bytes, 0, bytes.Length)) != 0)
{
stream.Write(bytes, 0, n);
byteBlock.Write(bytes, 0, r);
}
gZipStream.Close();
}
return stream.ToArray();
finally
{
BytePool.Recycle(bytes);
}
}
catch
}
/// <summary>
/// 解压数据
/// </summary>
/// <param name="byteBlock"></param>
/// <param name="data"></param>
public static void Decompress(ByteBlock byteBlock, byte[] data)
{
Decompress(byteBlock, data, 0, data.Length);
}
/// <summary>
/// 解压数据
/// </summary>
/// <param name="data"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
/// <returns></returns>
public static byte[] Decompress(byte[] data, int offset, int length)
{
using (ByteBlock byteBlock = new ByteBlock(length))
{
return null;
Decompress(byteBlock, data, offset, length);
return byteBlock.ToArray();
}
}
/// <summary>
/// 解压数据
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
public static byte[] Decompress(byte[] data)
{
return Decompress(data, 0, data.Length);
}
}
}
\ No newline at end of file
......@@ -97,6 +97,26 @@ namespace TouchSocket.Core.Extensions
}
}
/// <summary>
/// 获取值。如果键不存在,则返回默认值。
/// </summary>
/// <typeparam name="Tkey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="dictionary"></param>
/// <param name="tkey"></param>
/// <returns></returns>
public static TValue GetValue<Tkey, TValue>(this Dictionary<Tkey, TValue> dictionary, Tkey tkey)
{
if (dictionary.TryGetValue(tkey, out TValue value))
{
return value;
}
else
{
return default;
}
}
#endregion 字典扩展
}
}
\ No newline at end of file
......@@ -19,7 +19,7 @@ namespace TouchSocket.Core.IO
/// <summary>
/// 阻塞式单项读取流。
/// </summary>
public abstract class BlockReadStream : Stream,IWrite
public abstract class BlockReadStream : Stream, IWrite
{
private readonly AutoResetEvent m_inputEvent;
private readonly AutoResetEvent m_readEvent;
......
......@@ -140,7 +140,7 @@ namespace TouchSocket.Core.IO
/// </summary>
protected bool InputComplate()
{
return this.Input(new byte[0], 0, 0);
return this.Input(new byte[0], 0, 0);
}
private void Reset()
......
......@@ -10,12 +10,8 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Core.IO
{
......
......@@ -106,8 +106,8 @@ namespace TouchSocket.Core.IO
return GetWriter(path, singleRef);
}
}
}
}
/// <summary>
/// 获取一个可读可写的Stream对象。
/// </summary>
......
......@@ -88,7 +88,7 @@ namespace TouchSocket.Core.IO
/// 文件流。
/// 一般情况下,请不要直接访问该对象。否则有可能会产生不可预测的错误。
/// </summary>
public FileStream FileStream { get => this.m_fileStream;}
public FileStream FileStream { get => this.m_fileStream; }
/// <summary>
/// 创建一个只读的、已经缓存的文件信息。该操作不会占用文件句柄。
......
......@@ -10,19 +10,14 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Core.IO
{
/// <summary>
/// FileStorageStream
/// </summary>
public class FileStorageStream:Stream
public class FileStorageStream : Stream
{
private readonly FileStorage m_fileStorage;
private long m_position;
......@@ -74,7 +69,7 @@ namespace TouchSocket.Core.IO
/// <summary>
/// <inheritdoc/>
/// </summary>
public override long Position { get=>this.m_position ; set=>this.m_position=value; }
public override long Position { get => this.m_position; set => this.m_position = value; }
/// <summary>
/// <inheritdoc/>
......@@ -93,7 +88,7 @@ namespace TouchSocket.Core.IO
/// <returns></returns>
public override int Read(byte[] buffer, int offset, int count)
{
return this.m_fileStorage.Read(this.m_position,buffer,offset,count);
return this.m_fileStorage.Read(this.m_position, buffer, offset, count);
}
/// <summary>
......@@ -138,7 +133,7 @@ namespace TouchSocket.Core.IO
/// <param name="count"></param>
public override void Write(byte[] buffer, int offset, int count)
{
this.m_fileStorage.Write(this.m_position,buffer,offset,count);
this.m_fileStorage.Write(this.m_position, buffer, offset, count);
}
/// <summary>
......
......@@ -16,7 +16,7 @@ namespace TouchSocket.Core.IO
/// <summary>
/// 文件写入器。
/// </summary>
public class FileStorageWriter : DisposableObject,IWrite
public class FileStorageWriter : DisposableObject, IWrite
{
private readonly FileStorage m_fileStorage;
private long m_position;
......@@ -78,7 +78,7 @@ namespace TouchSocket.Core.IO
/// <returns></returns>
public long SeekToEnd()
{
return this.Position = this.FileStorage.Length;
return this.Position = this.FileStorage.Length;
}
/// <summary>
......
......@@ -187,7 +187,7 @@ namespace TouchSocket.Core.IO
/// <returns></returns>
public static string[] GetFiles(string sourceFolder)
{
return Directory.GetFiles(sourceFolder).Select(s=>Path.GetFileName(s)).ToArray();
return Directory.GetFiles(sourceFolder).Select(s => Path.GetFileName(s)).ToArray();
}
# if NET45_OR_GREATER
......
......@@ -31,13 +31,13 @@ namespace TouchSocket.Core.Log
private readonly ConsoleColor m_consoleForegroundColor;
private readonly ConsoleColor m_consoleBackgroundColor;
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="logType"></param>
/// <param name="source"></param>
/// <param name="message"></param>
/// <param name="exception"></param>
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="logType"></param>
/// <param name="source"></param>
/// <param name="message"></param>
/// <param name="exception"></param>
protected override void WriteLog(LogType logType, object source, string message, Exception exception)
{
lock (typeof(ConsoleLogger))
......
......@@ -41,14 +41,14 @@ namespace TouchSocket.Core.Log
this.m_action1 = action;
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="logType"></param>
/// <param name="source"></param>
/// <param name="message"></param>
/// <param name="exception"></param>
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="logType"></param>
/// <param name="source"></param>
/// <param name="message"></param>
/// <param name="exception"></param>
protected override void WriteLog(LogType logType, object source, string message, Exception exception)
{
try
......
......@@ -11,7 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Text;
using TouchSocket.Core.IO;
......@@ -85,7 +84,7 @@ namespace TouchSocket.Core.Log
{
Directory.CreateDirectory(dir);
}
if (m_writer==null)
if (m_writer == null)
{
int count = 0;
string path = null;
......@@ -94,14 +93,14 @@ namespace TouchSocket.Core.Log
path = Path.Combine(dir, $"{count:0000}" + ".log");
if (!File.Exists(path))
{
m_writer= FilePool.GetWriter(path);
m_writer = FilePool.GetWriter(path);
break;
}
count++;
}
}
m_writer.Write(Encoding.UTF8.GetBytes(logString));
if (m_writer.FileStorage.Length>1024*1024)
if (m_writer.FileStorage.Length > 1024 * 1024)
{
m_writer.SafeDispose();
m_writer = null;
......
......@@ -11,11 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Core.Log;
namespace TouchSocket.Core.Log
{
......@@ -53,7 +48,7 @@ namespace TouchSocket.Core.Log
{
if (this.LogType.HasFlag(logType))
{
WriteLog(logType, source, message, exception);
this.WriteLog(logType, source, message, exception);
}
}
......
......@@ -115,7 +115,7 @@ namespace TouchSocket.Core.Log
{
if (logger is LoggerGroup loggerGroup)
{
loggerGroup.Log<TLog>(logType,source,message,exception);
loggerGroup.Log<TLog>(logType, source, message, exception);
}
}
/// <summary>
......@@ -144,7 +144,7 @@ namespace TouchSocket.Core.Log
/// <param name="logger"></param>
/// <param name="source"></param>
/// <param name="msg"></param>
public static void Warning<TLog>(this ILog logger, object source, string msg)where TLog:ILog
public static void Warning<TLog>(this ILog logger, object source, string msg) where TLog : ILog
{
logger.Log<TLog>(LogType.Warning, source, msg, null);
}
......
......@@ -11,8 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections;
using System.Collections.Generic;
namespace TouchSocket.Core.Log
{
......@@ -96,7 +94,7 @@ namespace TouchSocket.Core.Log
/// <typeparam name="TLog1"></typeparam>
/// <typeparam name="TLog2"></typeparam>
/// <typeparam name="TLog3"></typeparam>
public class LoggerGroup<TLog1, TLog2,TLog3> : LoggerGroup
public class LoggerGroup<TLog1, TLog2, TLog3> : LoggerGroup
where TLog1 : ILog
where TLog2 : ILog
where TLog3 : ILog
......@@ -104,7 +102,7 @@ namespace TouchSocket.Core.Log
/// <summary>
/// 一组日志记录器
/// </summary>
public LoggerGroup(TLog1 log1, TLog2 log2,TLog3 log3) : base(log1, log2,log3)
public LoggerGroup(TLog1 log1, TLog2 log2, TLog3 log3) : base(log1, log2, log3)
{
}
}
......@@ -125,7 +123,7 @@ namespace TouchSocket.Core.Log
/// <summary>
/// 一组日志记录器
/// </summary>
public LoggerGroup(TLog1 log1, TLog2 log2, TLog3 log3, TLog4 log4) : base(log1, log2, log3,log4)
public LoggerGroup(TLog1 log1, TLog2 log2, TLog3 log3, TLog4 log4) : base(log1, log2, log3, log4)
{
}
}
......@@ -148,7 +146,7 @@ namespace TouchSocket.Core.Log
/// <summary>
/// 一组日志记录器
/// </summary>
public LoggerGroup(TLog1 log1, TLog2 log2, TLog3 log3, TLog4 log4, TLog5 log5) : base(log1, log2, log3, log4,log5)
public LoggerGroup(TLog1 log1, TLog2 log2, TLog3 log3, TLog4 log4, TLog5 log5) : base(log1, log2, log3, log4, log5)
{
}
}
......@@ -173,7 +171,7 @@ namespace TouchSocket.Core.Log
/// <summary>
/// 一组日志记录器
/// </summary>
public LoggerGroup(TLog1 log1, TLog2 log2, TLog3 log3, TLog4 log4, TLog5 log5, TLog6 log6) : base(log1, log2, log3, log4, log5,log6)
public LoggerGroup(TLog1 log1, TLog2 log2, TLog3 log3, TLog4 log4, TLog5 log5, TLog6 log6) : base(log1, log2, log3, log4, log5, log6)
{
}
}
......
......@@ -11,10 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Core.Plugins
{
......
......@@ -12,13 +12,12 @@
//------------------------------------------------------------------------------
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using TouchSocket.Core.Dependency;
using TouchSocket.Core.Log;
using TouchSocket.Core.Reflection;
namespace TouchSocket.Core.Plugins
......@@ -76,7 +75,7 @@ namespace TouchSocket.Core.Plugins
var types = plugin.GetType().GetInterfaces().Where(a => typeof(IPlugin).IsAssignableFrom(a)).ToArray();
foreach (var type in types)
{
if (!m_pluginInfoes.ContainsKey(type))
if (!this.m_pluginInfoes.ContainsKey(type))
{
Dictionary<string, PluginMethod[]> pairs = new Dictionary<string, PluginMethod[]>();
var ms = type.GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.DeclaredOnly);
......@@ -89,10 +88,10 @@ namespace TouchSocket.Core.Plugins
throw new Exception("插件的接口方法不允许重载");
}
List<PluginMethod> methods = new List<PluginMethod>();
if (item.GetCustomAttribute<AsyncRaiserAttribute>()!=null)
if (item.GetCustomAttribute<AsyncRaiserAttribute>() != null)
{
var asyncMethod= type.GetMethod($"{item.Name}Async");
if (asyncMethod==null)
var asyncMethod = type.GetMethod($"{item.Name}Async", BindingFlags.Instance | BindingFlags.Public | BindingFlags.DeclaredOnly);
if (asyncMethod == null)
{
throw new Exception("当接口标识为异步时,还应当定义其异步方法,以“Async”结尾");
}
......@@ -101,7 +100,7 @@ namespace TouchSocket.Core.Plugins
{
throw new Exception("异步接口方法不符合设定");
}
if (asyncMethod.ReturnType!=typeof(Task))
if (asyncMethod.ReturnType != typeof(Task))
{
throw new Exception("异步接口方法返回值必须为Task。");
}
......@@ -111,7 +110,7 @@ namespace TouchSocket.Core.Plugins
pairs.Add(item.Name, methods.ToArray());
}
}
m_pluginInfoes.Add(type, pairs);
this.m_pluginInfoes.Add(type, pairs);
}
}
......@@ -158,7 +157,7 @@ namespace TouchSocket.Core.Plugins
{
return false;
}
if (m_pluginInfoes.TryGetValue(typeof(TPlugin), out var value))
if (this.m_pluginInfoes.TryGetValue(typeof(TPlugin), out var value))
{
if (value.TryGetValue(name, out PluginMethod[] pluginMethods))
{
......@@ -176,8 +175,9 @@ namespace TouchSocket.Core.Plugins
{
pluginMethods[j].Invoke(this.m_plugins[i].Plugin, sender, e);
}
catch
catch (Exception ex)
{
this.Container.Resolve<ILog>().Exception(ex);
}
}
}
......
......@@ -11,8 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading.Tasks;
......@@ -44,11 +42,6 @@ namespace TouchSocket.Core.Reflection
/// </summary>
public class Method
{
/// <summary>
/// 方法执行委托
/// </summary>
private readonly Func<object, object[], object> m_invoker;
private readonly MethodInfo m_info;
private readonly bool m_isByRef;
......@@ -67,32 +60,32 @@ namespace TouchSocket.Core.Reflection
if (item.ParameterType.IsByRef)
{
this.m_isByRef = true;
if (method.ReturnType == typeof(Task))
{
this.HasReturn = false;
this.TaskType = TaskType.Task;
}
else if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
{
this.HasReturn = true;
this.ReturnType = method.ReturnType.GetGenericArguments()[0];
this.TaskType = TaskType.TaskObject;
}
else if (method.ReturnType == typeof(void))
{
this.HasReturn = false;
this.TaskType = TaskType.None;
}
else
{
this.HasReturn = true;
this.TaskType = TaskType.None;
this.ReturnType = method.ReturnType;
}
return;
break;
}
}
this.m_invoker = this.CreateInvoker(method);
if (method.ReturnType == typeof(Task))
{
this.HasReturn = false;
this.TaskType = TaskType.Task;
}
else if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
{
this.HasReturn = true;
this.ReturnType = method.ReturnType.GetGenericArguments()[0];
this.TaskType = TaskType.TaskObject;
}
else if (method.ReturnType == typeof(void))
{
this.HasReturn = false;
this.TaskType = TaskType.None;
}
else
{
this.HasReturn = true;
this.TaskType = TaskType.None;
this.ReturnType = method.ReturnType;
}
}
/// <summary>
......@@ -143,27 +136,13 @@ namespace TouchSocket.Core.Reflection
case TaskType.None:
{
object re;
if (this.m_isByRef)
{
re = this.m_info.Invoke(instance, parameters);
}
else
{
re = this.m_invoker.Invoke(instance, parameters);
}
re = this.m_info.Invoke(instance, parameters);
return re;
}
case TaskType.Task:
{
object re;
if (this.m_isByRef)
{
re = this.m_info.Invoke(instance, parameters);
}
else
{
re = this.m_invoker.Invoke(instance, parameters);
}
re = this.m_info.Invoke(instance, parameters);
Task task = (Task)re;
task.Wait();
return default;
......@@ -171,14 +150,7 @@ namespace TouchSocket.Core.Reflection
case TaskType.TaskObject:
{
object re;
if (this.m_isByRef)
{
re = this.m_info.Invoke(instance, parameters);
}
else
{
re = this.m_invoker.Invoke(instance, parameters);
}
re = this.m_info.Invoke(instance, parameters);
Task task = (Task)re;
task.Wait();
return task.GetType().GetProperty("Result").GetValue(task);
......@@ -201,27 +173,13 @@ namespace TouchSocket.Core.Reflection
case TaskType.None:
{
object re;
if (this.m_isByRef)
{
re = this.m_info.Invoke(instance, parameters);
}
else
{
re = this.m_invoker.Invoke(instance, parameters);
}
re = this.m_info.Invoke(instance, parameters);
return re;
}
case TaskType.Task:
{
object re;
if (this.m_isByRef)
{
re = this.m_info.Invoke(instance, parameters);
}
else
{
re = this.m_invoker.Invoke(instance, parameters);
}
re = this.m_info.Invoke(instance, parameters);
Task task = (Task)re;
await task;
return default;
......@@ -229,14 +187,7 @@ namespace TouchSocket.Core.Reflection
case TaskType.TaskObject:
{
object re;
if (this.m_isByRef)
{
re = this.m_info.Invoke(instance, parameters);
}
else
{
re = this.m_invoker.Invoke(instance, parameters);
}
re = this.m_info.Invoke(instance, parameters);
Task task = (Task)re;
await task;
return task.GetType().GetProperty("Result").GetValue(task);
......@@ -263,88 +214,18 @@ namespace TouchSocket.Core.Reflection
case TaskType.Task:
{
object re;
if (this.m_isByRef)
{
re = this.m_info.Invoke(instance, parameters);
}
else
{
re = this.m_invoker.Invoke(instance, parameters);
}
re = this.m_info.Invoke(instance, parameters);
return (Task)re;
}
case TaskType.TaskObject:
{
object re;
if (this.m_isByRef)
{
re = this.m_info.Invoke(instance, parameters);
}
else
{
re = this.m_invoker.Invoke(instance, parameters);
}
re = this.m_info.Invoke(instance, parameters);
return (Task)re;
}
default:
return default;
}
}
/// <summary>
/// 生成方法的调用委托
/// </summary>
/// <param name="method">方法成员信息</param>
/// <exception cref="ArgumentException"></exception>
/// <returns></returns>
private Func<object, object[], object> CreateInvoker(MethodInfo method)
{
var instance = Expression.Parameter(typeof(object), "instance");
var parameters = Expression.Parameter(typeof(object[]), "parameters");
var instanceCast = method.IsStatic ? null : Expression.Convert(instance, method.DeclaringType);
var parametersCast = method.GetParameters().Select((p, i) =>
{
var parameter = Expression.ArrayIndex(parameters, Expression.Constant(i));
return Expression.Convert(parameter, p.ParameterType);
});
var body = Expression.Call(instanceCast, method, parametersCast);
if (method.ReturnType == typeof(Task))
{
this.HasReturn = false;
this.TaskType = TaskType.Task;
var bodyCast = Expression.Convert(body, typeof(object));
return Expression.Lambda<Func<object, object[], object>>(bodyCast, instance, parameters).Compile();
}
else if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
{
this.TaskType = TaskType.TaskObject;
this.HasReturn = true;
this.ReturnType = method.ReturnType.GetGenericArguments()[0];
var bodyCast = Expression.Convert(body, typeof(object));
return Expression.Lambda<Func<object, object[], object>>(bodyCast, instance, parameters).Compile();
}
else if (method.ReturnType == typeof(void))
{
this.HasReturn = false;
this.TaskType = TaskType.None;
var action = Expression.Lambda<Action<object, object[]>>(body, instance, parameters).Compile();
return (_instance, _parameters) =>
{
action.Invoke(_instance, _parameters);
return null;
};
}
else
{
this.HasReturn = true;
this.TaskType = TaskType.None;
this.ReturnType = method.ReturnType;
var bodyCast = Expression.Convert(body, typeof(object));
return Expression.Lambda<Func<object, object[], object>>(bodyCast, instance, parameters).Compile();
}
}
}
}
\ No newline at end of file
......@@ -24,6 +24,12 @@ namespace TouchSocket.Core.Run
{
private static readonly ConcurrentDictionary<object, Timer> timers = new ConcurrentDictionary<object, Timer>();
#if DEBUG
/// <summary>
/// Timers
/// </summary>
public static ConcurrentDictionary<object, Timer> Timers => timers;
#endif
/// <summary>
/// 延迟执行
/// </summary>
......
......@@ -32,7 +32,7 @@ namespace TouchSocket.Core.Run
private int executedCount;
private TimeSpan interval;
private readonly TimeSpan interval;
private readonly int loopCount;
......
......@@ -71,6 +71,17 @@ namespace TouchSocket.Core.Run
this.m_waitQueue.Clear();
}
/// <summary>
/// 取消全部
/// </summary>
public void CancelAll()
{
foreach (var item in this.m_waitDic.Values)
{
item.Cancel();
}
}
/// <summary>
/// 获取一个可等待对象
/// </summary>
......
//------------------------------------------------------------------------------
// 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
// CSDN博客:https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频:https://space.bilibili.com/94253567
// Gitee源代码仓库:https://gitee.com/RRQM_Home
// Github源代码仓库:https://github.com/RRQM
// API首页:https://www.yuque.com/rrqm/touchsocket/index
// 交流QQ群:234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using TouchSocket.Core.ByteManager;
namespace TouchSocket.Core.Run
{
/// <summary>
/// WaitResultPackageBase
/// </summary>
public class WaitResultPackageBase : WaitResult
{
/// <summary>
/// 打包.
/// <para>重写的话,基类方法必须先执行</para>
/// </summary>
/// <param name="byteBlock"></param>
public virtual void Package(ByteBlock byteBlock)
{
byteBlock.Write(this.Sign);
byteBlock.Write(this.Status);
byteBlock.Write(this.Message);
}
/// <summary>
/// 解包
/// <para>重写的话,基类方法必须先执行</para>
/// </summary>
/// <param name="byteBlock"></param>
public virtual void Unpackage(ByteBlock byteBlock)
{
this.Sign = byteBlock.ReadInt64();
this.Status = (byte)byteBlock.ReadByte();
this.Message = byteBlock.ReadString();
}
}
}
......@@ -36,8 +36,8 @@ namespace TouchSocket.Http
/// <param name="passWord"></param>
public HttpProxy(IPHost host, string userName, string passWord)
{
Host = host;
Credential = new NetworkCredential(userName, passWord, $"{host.IP}:{host.Port}");
this.Host = host;
this.Credential = new NetworkCredential(userName, passWord, $"{host.IP}:{host.Port}");
}
/// <summary>
......
......@@ -29,7 +29,7 @@ namespace TouchSocket.Http
/// <param name="nonceCount">暂时不知道是什么</param>
public AuthenticationChallenge(string value, NetworkCredential credential, uint nonceCount = 0)
{
Parse(value, credential);
this.Parse(value, credential);
this.NonceCount = nonceCount;
}
......@@ -55,8 +55,8 @@ namespace TouchSocket.Http
/// <exception cref="Exception"></exception>
public override string ToString()
{
if (Type == AuthenticationType.Basic)
return ToBasicString();
if (this.Type == AuthenticationType.Basic)
return this.ToBasicString();
else
throw new Exception("该凭证类型不支持");
}
......@@ -68,7 +68,7 @@ namespace TouchSocket.Http
throw new Exception("该凭证类型不支持");
var schm = chal[0].ToLower();
this.Parameters = ParseParameters(chal[1]);
this.Parameters = this.ParseParameters(chal[1]);
if (this.Parameters.ContainsKey("username") == false)
this.Parameters.Add("username", credential.Username);
......@@ -91,7 +91,7 @@ namespace TouchSocket.Http
private Dictionary<string, string> ParseParameters(string value)
{
var res = new Dictionary<string, string>();
IEnumerable<string> values = SplitHeaderValue(value, ',');
IEnumerable<string> values = this.SplitHeaderValue(value, ',');
foreach (var param in values)
{
var i = param.IndexOf('=');
......@@ -153,7 +153,7 @@ namespace TouchSocket.Http
private string ToBasicString()
{
var userPass = $"{Parameters["username"]}:{Parameters["password"]}";
var userPass = $"{this.Parameters["username"]}:{this.Parameters["password"]}";
var cred = Convert.ToBase64String(Encoding.UTF8.GetBytes(userPass));
return "Basic " + cred;
}
......
......@@ -10,11 +10,6 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Http
{
......
......@@ -11,10 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Http
{
......@@ -39,10 +35,10 @@ namespace TouchSocket.Http
if (username.Length == 0)
throw new ArgumentException("An empty string.", "username");
Username = username;
Password = password;
Domain = domain;
Roles = roles;
this.Username = username;
this.Password = password;
this.Domain = domain;
this.Roles = roles;
}
/// <summary>
/// 凭证用户名
......
......@@ -14,8 +14,8 @@ using System;
using System.Threading;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Resources;
using TouchSocket.Core.Run;
using TouchSocket.Resources;
using TouchSocket.Sockets;
namespace TouchSocket.Http
......
......@@ -10,11 +10,6 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Http.WebSockets;
namespace TouchSocket.Core.Plugins
......
......@@ -11,9 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Rpc
......@@ -30,7 +27,7 @@ namespace TouchSocket.Rpc
/// <param name="invokeResult"></param>
public virtual void Executed(ICallContext callContext, ref InvokeResult invokeResult)
{
}
/// <summary>
......@@ -52,7 +49,7 @@ namespace TouchSocket.Rpc
/// <param name="exception"></param>
public virtual void ExecutException(ICallContext callContext, ref InvokeResult invokeResult, Exception exception)
{
}
/// <summary>
......@@ -74,7 +71,7 @@ namespace TouchSocket.Rpc
/// <param name="invokeResult"></param>
public virtual void Executing(ICallContext callContext, ref InvokeResult invokeResult)
{
}
/// <summary>
......
......@@ -67,9 +67,9 @@ namespace TouchSocket.Rpc
if (!container.IsRegistered(typeof(IRpcServerFactory)))
{
this.Container.RegisterSingleton<IRpcServerFactory,RpcServerFactory>();
this.Container.RegisterSingleton<IRpcServerFactory, RpcServerFactory>();
}
SearchAttribute();
}
......@@ -264,7 +264,7 @@ namespace TouchSocket.Rpc
/// </summary>
public MethodInstance[] GetAllMethods()
{
List<MethodInstance> methods = new List<MethodInstance>();
List<MethodInstance> methods = new List<MethodInstance>();
foreach (var item in this.m_serverTypes.Values)
{
methods.AddRange(item);
......@@ -334,7 +334,7 @@ namespace TouchSocket.Rpc
foreach (var attrbute in attrbuteType)
{
foreach (var item in m_serverTypes.Keys)
foreach (var item in this.m_serverTypes.Keys)
{
ServerCellCode serverCellCode = CodeGenerator.Generator(item, attrbute);
codes.Add(serverCellCode);
......@@ -533,8 +533,8 @@ namespace TouchSocket.Rpc
{
if (newType.FullName == type.FullName)
{
this.m_serverTypes.TryRemove(newType,out var list);
methodInstances=list.ToArray();
this.m_serverTypes.TryRemove(newType, out var list);
methodInstances = list.ToArray();
return true;
}
}
......@@ -575,9 +575,9 @@ namespace TouchSocket.Rpc
item.IsSingleton = true;
item.ServerFactory = new RpcServerFactory(this.Container);
}
this.m_serverTypes.TryAdd(serverFromType,new List<MethodInstance>(methodInstances));
this.m_serverTypes.TryAdd(serverFromType, new List<MethodInstance>(methodInstances));
this.Container.RegisterSingleton(serverFromType, rpcServer);
foreach (var parser in this)
{
parser.OnRegisterServer(methodInstances);
......@@ -613,13 +613,13 @@ namespace TouchSocket.Rpc
bool singleton;
if (typeof(ITransientRpcServer).IsAssignableFrom(serverFromType))
{
singleton = true;
this.Container.RegisterSingleton(serverFromType, serverToType);
singleton = false;
this.Container.RegisterTransient(serverFromType, serverToType);
}
else
{
singleton = false;
this.Container.RegisterTransient(serverFromType, serverToType);
singleton = true;
this.Container.RegisterSingleton(serverFromType, serverToType);
}
MethodInstance[] methodInstances = CodeGenerator.GetMethodInstances(serverFromType);
......@@ -629,8 +629,8 @@ namespace TouchSocket.Rpc
item.ServerFactory = new RpcServerFactory(this.Container);
}
this.m_serverTypes.TryAdd(serverFromType,new List<MethodInstance>(methodInstances));
this.m_serverTypes.TryAdd(serverFromType, new List<MethodInstance>(methodInstances));
foreach (var parser in this)
{
parser.OnRegisterServer(methodInstances);
......
......@@ -10,11 +10,6 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Rpc
{
......
......@@ -133,7 +133,7 @@ namespace TouchSocket.Rpc
{
throw new ArgumentNullException(nameof(rpcParser.RpcStore), $"RpcStore为空,这一般是该解析器没有完成初始化配置所导致的。");
}
rpcParser.RpcStore.RegisterServer<T>();
rpcParser.RpcStore.RegisterServer<T>();
}
/// <summary>
......@@ -148,7 +148,7 @@ namespace TouchSocket.Rpc
{
throw new ArgumentNullException(nameof(rpcParser.RpcStore), $"RpcStore为空,这一般是该解析器没有完成初始化配置所导致的。");
}
rpcParser.RpcStore.RegisterServer(fromType);
rpcParser.RpcStore.RegisterServer(fromType);
}
/// <summary>
......
......@@ -13,8 +13,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Rpc
{
......
......@@ -32,12 +32,12 @@ namespace TouchSocket.Rpc
/// </list>
/// </para>
/// </summary>
object Caller { get;}
object Caller { get; }
/// <summary>
/// 本次调用的<see cref="MethodInstance"/>
/// </summary>
MethodInstance MethodInstance { get;}
MethodInstance MethodInstance { get; }
/// <summary>
/// 可取消的调用令箭
......
......@@ -11,9 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using TouchSocket.Rpc.TouchRpc;
using TouchSocket.Sockets;
namespace TouchSocket.Rpc
{
/// <summary>
......
......@@ -10,11 +10,6 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Rpc
{
......@@ -29,6 +24,6 @@ namespace TouchSocket.Rpc
/// <param name="callContext"></param>
/// <param name="ps"></param>
/// <returns></returns>
public IRpcServer Create(ICallContext callContext,object[] ps);
public IRpcServer Create(ICallContext callContext, object[] ps);
}
}
......@@ -11,15 +11,12 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using TouchSocket.Rpc.TouchRpc;
using TouchSocket.Sockets;
namespace TouchSocket.Rpc
{
/// <summary>
/// Rpc服务接口
/// </summary>
public interface ITransientRpcServer:IRpcServer
public interface ITransientRpcServer : IRpcServer
{
/// <summary>
/// 调用上下文
......
......@@ -10,20 +10,14 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Http;
using TouchSocket.Sockets;
namespace TouchSocket.Rpc.JsonRpc
{
/// <summary>
/// IJsonRpcCallContext
/// </summary>
public interface IJsonRpcCallContext:ICallContext
public interface IJsonRpcCallContext : ICallContext
{
/// <summary>
/// Json字符串
......
......@@ -306,8 +306,8 @@ namespace TouchSocket.Rpc.JsonRpc
if (rpcServer is ITransientRpcServer transientRpcServer)
{
transientRpcServer.CallContext = callContext;
}
}
invokeResult = this.m_rpcStore.Execute(rpcServer, callContext.JsonRpcPackage.parameters, callContext);
}
......
......@@ -26,11 +26,16 @@ namespace TouchSocket.Rpc.TouchRpc
{
private readonly IInternalRpc m_client;
private int m_cacheCapacity;
private volatile bool m_canFree;
private ByteBlock m_currentByteBlock;
private IntelligentDataQueue<ChannelData> m_dataQueue;
private int m_id;
private string m_lastOperationMes;
private AutoResetEvent m_moveWaitHandle;
private ChannelStatus m_status;
private string m_targetClientID;
......@@ -541,7 +546,7 @@ namespace TouchSocket.Rpc.TouchRpc
{
data.byteBlock.Pos = 6;
this.m_lastOperationMes = data.byteBlock.ReadString();
data.byteBlock.Dispose();
data.byteBlock.SetHolding(false);
this.RequestComplete();
return;
......@@ -550,15 +555,14 @@ namespace TouchSocket.Rpc.TouchRpc
{
data.byteBlock.Pos = 6;
this.m_lastOperationMes = data.byteBlock.ReadString();
data.byteBlock.Dispose();
data.byteBlock.SetHolding(false);
this.RequestCancel();
return;
}
else if (data.type == TouchRpcUtility.P_104_DisposeOrder)
{
data.byteBlock.Pos = 6;
this.m_lastOperationMes = data.byteBlock.ReadString();
data.byteBlock.Dispose();
data.byteBlock.SetHolding(false);
this.RequestDispose();
return;
}
......@@ -566,7 +570,7 @@ namespace TouchSocket.Rpc.TouchRpc
{
data.byteBlock.Pos = 6;
this.m_canFree = data.byteBlock.ReadBoolean();
data.byteBlock.Dispose();
data.byteBlock.SetHolding(false);
return;
}
}
......
......@@ -14,8 +14,8 @@ using System;
using System.Collections.Concurrent;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Resources;
using TouchSocket.Core.Run;
using TouchSocket.Resources;
using TouchSocket.Sockets;
namespace TouchSocket.Rpc.TouchRpc
......@@ -40,12 +40,15 @@ namespace TouchSocket.Rpc.TouchRpc
/// <returns></returns>
public Channel CreateChannel()
{
while (true)
lock (this.SyncRoot)
{
int id = new Random().Next(int.MinValue, int.MaxValue);
if (!this.ChannelExisted(id))
while (true)
{
return this.CreateChannel(id);
int id = new object().GetHashCode();
if (!this.ChannelExisted(id))
{
return this.CreateChannel(id);
}
}
}
}
......
......@@ -77,7 +77,7 @@ namespace TouchSocket.Rpc.TouchRpc
WaitData<IWaitResult> waitData = this.WaitHandlePool.GetWaitData(waitFileInfo);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitFileInfo, SerializationType.Json);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitFileInfo);
try
{
this.SocketSend(TouchRpcUtility.P_500_PullFile_Request, byteBlock);
......@@ -182,7 +182,7 @@ namespace TouchSocket.Rpc.TouchRpc
WaitData<IWaitResult> waitData = this.WaitHandlePool.GetWaitData(waitFileInfo);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitFileInfo, SerializationType.Json);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitFileInfo);
try
{
this.SocketSend(TouchRpcUtility.P_503_PullFile2C_Request, byteBlock.Buffer, 0, byteBlock.Len);
......@@ -333,7 +333,7 @@ namespace TouchSocket.Rpc.TouchRpc
WaitData<IWaitResult> waitData = this.WaitHandlePool.GetWaitData(waitFileInfo);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitFileInfo, SerializationType.Json);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitFileInfo);
try
{
this.SocketSend(TouchRpcUtility.P_502_PushFile_Request, byteBlock);
......@@ -465,7 +465,7 @@ namespace TouchSocket.Rpc.TouchRpc
};
WaitData<IWaitResult> waitData = this.WaitHandlePool.GetWaitData(waitFileInfo);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitFileInfo, SerializationType.Json);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitFileInfo);
try
{
this.SocketSend(TouchRpcUtility.P_507_PushFile2C_Request, byteBlock.Buffer, 0, byteBlock.Len);
......@@ -586,7 +586,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
waitTransfer.Status = 1;
this.SendJsonObject(responseOrder, waitTransfer);
this.SendFastObject(responseOrder, waitTransfer);
long position = waitTransfer.Position;
reader.Position = position;
fileOperator.SetFileCompletedLength(position);
......@@ -656,7 +656,7 @@ namespace TouchSocket.Rpc.TouchRpc
waitTransfer.Status = 3;
}
this.SendJsonObject(responseOrder, waitTransfer);
this.SendFastObject(responseOrder, waitTransfer);
this.OnFileTransfered?.Invoke(this, e);
}, null);
......@@ -699,7 +699,7 @@ namespace TouchSocket.Rpc.TouchRpc
};
WaitData<IWaitResult> waitData = this.WaitHandlePool.GetWaitData(waitTransfer);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitTransfer, SerializationType.Json);
ByteBlock byteBlock = new ByteBlock().WriteObject(waitTransfer);
try
{
if (clientID == null)
......@@ -964,7 +964,7 @@ namespace TouchSocket.Rpc.TouchRpc
{
if (this.ResponseType == ResponseType.None || this.ResponseType == ResponseType.Pull)
{
this.SendJsonObject(responseOrder, new WaitTransfer() { Sign = waitRemoteFileInfo.Sign, Status = 6 });
this.SendFastObject(responseOrder, new WaitTransfer() { Sign = waitRemoteFileInfo.Sign, Status = 6 });
return;
}
FileRequest fileRequest = waitRemoteFileInfo.FileRequest;
......@@ -1011,7 +1011,7 @@ namespace TouchSocket.Rpc.TouchRpc
waitTransfer.ChannelID = channel.ID;
waitTransfer.Position = fileInfo.Position;
waitTransfer.Status = 1;
this.SendJsonObject(responseOrder, waitTransfer);
this.SendFastObject(responseOrder, waitTransfer);
fileOperator.SetFileCompletedLength(fileInfo.Position);
while (channel.MoveNext())
......@@ -1058,7 +1058,7 @@ namespace TouchSocket.Rpc.TouchRpc
}
this.OnFileTransfered?.Invoke(this, new FileTransferStatusEventArgs(args.TransferType, args.FileRequest, args.Metadata, fileOperator.Result, args.FileInfo));
this.SendJsonObject(TouchRpcUtility.P_509_PushFileAck_Request, new WaitResult()
this.SendFastObject(TouchRpcUtility.P_509_PushFileAck_Request, new WaitResult()
{
Sign = args.GetHashCode(),
Status = (byte)(fileOperator.Result.ResultCode == ResultCode.Success ? 1 : 0),
......@@ -1082,17 +1082,10 @@ namespace TouchSocket.Rpc.TouchRpc
this.OnFileTransfered?.Invoke(this, new FileTransferStatusEventArgs(args.TransferType, args.FileRequest, args.Metadata, fileOperator.Result, args.FileInfo));
}
this.SendJsonObject(responseOrder, waitTransfer);
this.SendFastObject(responseOrder, waitTransfer);
}, null);
}
private void SendJsonObject(short protocol, object obj)
{
using ByteBlock byteBlock = new ByteBlock();
byteBlock.WriteObject(obj, SerializationType.Json);
this.SocketSend(protocol, byteBlock);
}
private void SendFastObject(short protocol, object obj)
{
using ByteBlock byteBlock = new ByteBlock();
......
......@@ -18,8 +18,8 @@ using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.Log;
using TouchSocket.Resources;
using TouchSocket.Core.Run;
using TouchSocket.Resources;
using TouchSocket.Sockets;
namespace TouchSocket.Rpc.TouchRpc
......@@ -1201,7 +1201,7 @@ namespace TouchSocket.Rpc.TouchRpc
if (invokeResult.Status == InvokeStatus.Ready)
{
IRpcServer rpcServer = methodInstance.ServerFactory.Create(callContext,ps);
IRpcServer rpcServer = methodInstance.ServerFactory.Create(callContext, ps);
if (rpcServer is ITransientRpcServer transientRpcServer)
{
transientRpcServer.CallContext = callContext;
......
......@@ -19,7 +19,6 @@ using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.Log;
using TouchSocket.Core.Run;
using TouchSocket.Core.Serialization;
using TouchSocket.Resources;
namespace TouchSocket.Rpc.TouchRpc
......@@ -42,7 +41,7 @@ namespace TouchSocket.Rpc.TouchRpc
waitStream.Size = size;
waitStream.StreamType = stream.GetType().FullName;
ByteBlock byteBlock = BytePool.GetByteBlock(TouchRpcUtility.TransferPackage)
.WriteObject(waitStream, SerializationType.Json);
.WriteObject(waitStream);
try
{
this.SocketSend(TouchRpcUtility.P_400_SendStreamToSocketClient_Request, byteBlock);
......
......@@ -12,7 +12,6 @@
//------------------------------------------------------------------------------
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using TouchSocket.Core;
......@@ -126,6 +125,11 @@ namespace TouchSocket.Rpc.TouchRpc
/// </summary>
public ILog Logger { get => this.m_logger; set => this.m_logger = value; }
/// <summary>
/// 获取可用于同步对<see cref="RpcActor"/>的访问的对象。
/// </summary>
public object SyncRoot { get; } = new object();
/// <summary>
/// <inheritdoc/>
/// </summary>
......@@ -173,6 +177,7 @@ namespace TouchSocket.Rpc.TouchRpc
rpcCallContext.TryCancel();
}
}
this.WaitHandlePool.CancelAll();
this.OnClose?.Invoke(this, message);
}
}
......@@ -262,6 +267,7 @@ namespace TouchSocket.Rpc.TouchRpc
switch (protocol)
{
#region 0-99
case TouchRpcUtility.P_0_Handshake_Request:
{
try
......@@ -383,8 +389,11 @@ namespace TouchSocket.Rpc.TouchRpc
{
break;
}
#endregion 0-99
#region 100-199
case TouchRpcUtility.P_100_CreateChannel_Request:
{
try
......@@ -542,8 +551,11 @@ namespace TouchSocket.Rpc.TouchRpc
break;
}
#endregion 100-199
#region 200-299
case TouchRpcUtility.P_200_Invoke_Request:/*函数调用*/
{
try
......@@ -617,8 +629,11 @@ namespace TouchSocket.Rpc.TouchRpc
}
break;
}
#endregion 200-299
#region 300-399
case TouchRpcUtility.P_300_GetAllEvents_Request:
{
break;
......@@ -651,14 +666,17 @@ namespace TouchSocket.Rpc.TouchRpc
{
break;
}
#endregion 300-399
#region 400-499
case TouchRpcUtility.P_400_SendStreamToSocketClient_Request://StreamStatusToThis
{
try
{
byteBlock.Pos = 2;
this.P_8_RequestStreamToThis(byteBlock.ReadObject<WaitStream>(SerializationType.Json));
this.P_8_RequestStreamToThis(byteBlock.ReadObject<WaitStream>());
}
catch (Exception ex)
{
......@@ -685,7 +703,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
this.P_9_RequestStreamToThis(byteBlock.ReadObject<WaitStream>(SerializationType.Json));
this.P_9_RequestStreamToThis(byteBlock.ReadObject<WaitStream>());
}
catch (Exception ex)
{
......@@ -694,17 +712,19 @@ namespace TouchSocket.Rpc.TouchRpc
break;
}
#endregion 400-499
#region 500-599
case TouchRpcUtility.P_500_PullFile_Request:
{
try
{
byteBlock.Pos = 2;
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>(SerializationType.Json);
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>();
EasyAction.TaskRun(waitFileInfo, (w) =>
{
this.SendJsonObject(TouchRpcUtility.P_1500_PullFile_Response, this.RequestPullFile(w));
this.SendFastObject(TouchRpcUtility.P_1500_PullFile_Response, this.RequestPullFile(w));
});
}
catch (System.Exception ex)
......@@ -718,7 +738,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitFileInfo waitFile = byteBlock.ReadObject<WaitFileInfo>(SerializationType.Json);
WaitFileInfo waitFile = byteBlock.ReadObject<WaitFileInfo>();
this.WaitHandlePool.SetRun(waitFile);
}
catch (Exception ex)
......@@ -733,7 +753,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>(SerializationType.Json);
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>();
this.BeginPullFile(TouchRpcUtility.P_1501_BeginPullFile_Response, waitTransfer);
}
catch (System.Exception ex)
......@@ -747,7 +767,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>(SerializationType.Json);
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>();
this.WaitHandlePool.SetRun(waitTransfer);
}
catch (Exception ex)
......@@ -761,7 +781,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>(SerializationType.Json);
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>();
this.RequestPushFile(TouchRpcUtility.P_1502_PushFile_Response, waitFileInfo);
}
catch (System.Exception ex)
......@@ -775,7 +795,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>(SerializationType.Json);
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>();
this.WaitHandlePool.SetRun(waitTransfer);
}
catch (Exception ex)
......@@ -790,19 +810,19 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>(SerializationType.Json);
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>();
using (ByteBlock block = new ByteBlock())
{
if (this.OnFindRpcActor.Invoke(waitFileInfo.ClientID) is RpcActor rpcActor)
{
waitFileInfo.ClientID = this.ID;
rpcActor.SocketSend(TouchRpcUtility.P_504_PullFileFC_Request, block.WriteObject(waitFileInfo, SerializationType.Json));
rpcActor.SocketSend(TouchRpcUtility.P_504_PullFileFC_Request, block.WriteObject(waitFileInfo));
}
else
{
waitFileInfo.Status = 7;
this.SocketSend(TouchRpcUtility.P_1503_PullFile2C_Response, block.WriteObject(waitFileInfo, SerializationType.Json));
this.SocketSend(TouchRpcUtility.P_1503_PullFile2C_Response, block.WriteObject(waitFileInfo));
}
}
}
......@@ -815,7 +835,7 @@ namespace TouchSocket.Rpc.TouchRpc
case TouchRpcUtility.P_1503_PullFile2C_Response:
{
byteBlock.Pos = 2;
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>(SerializationType.Json);
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>();
this.WaitHandlePool.SetRun(waitFileInfo);
break;
}
......@@ -824,11 +844,11 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>(SerializationType.Json);
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>();
EasyAction.TaskRun(waitFileInfo, (w) =>
{
this.SendJsonObject(TouchRpcUtility.P_1504_PullFileFC_Response, this.RequestPullFile(w));
this.SendFastObject(TouchRpcUtility.P_1504_PullFileFC_Response, this.RequestPullFile(w));
});
}
catch (System.Exception ex)
......@@ -842,14 +862,14 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>(SerializationType.Json);
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>();
using (ByteBlock block = new ByteBlock())
{
if (this.OnFindRpcActor?.Invoke(waitFileInfo.ClientID) is RpcActor rpcActor)
{
waitFileInfo.ClientID = this.ID;
rpcActor.SocketSend(TouchRpcUtility.P_1503_PullFile2C_Response, block.WriteObject(waitFileInfo, SerializationType.Json));
rpcActor.SocketSend(TouchRpcUtility.P_1503_PullFile2C_Response, block.WriteObject(waitFileInfo));
}
}
}
......@@ -864,13 +884,13 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>(SerializationType.Json);
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>();
using (ByteBlock block = new ByteBlock())
{
if (this.OnFindRpcActor?.Invoke(waitTransfer.ClientID) is RpcActor rpcActor)
{
waitTransfer.ClientID = this.ID;
rpcActor.SocketSend(TouchRpcUtility.P_506_BeginPullFileFC_Request, block.WriteObject(waitTransfer, SerializationType.Json));
rpcActor.SocketSend(TouchRpcUtility.P_506_BeginPullFileFC_Request, block.WriteObject(waitTransfer));
}
}
}
......@@ -885,7 +905,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>(SerializationType.Json);
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>();
this.WaitHandlePool.SetRun(waitTransfer);
}
catch (System.Exception ex)
......@@ -899,7 +919,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>(SerializationType.Json);
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>();
this.BeginPullFile(TouchRpcUtility.P_1506_BeginPullFileFC_Response, waitTransfer);
}
catch (System.Exception ex)
......@@ -913,12 +933,12 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>(SerializationType.Json);
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>();
using (ByteBlock block = new ByteBlock())
{
if (this.OnFindRpcActor?.Invoke(waitTransfer.ClientID) is RpcActor rpcActor)
{
rpcActor.SocketSend(TouchRpcUtility.P_1505_BeginPullFile2C_Response, block.WriteObject(waitTransfer, SerializationType.Json));
rpcActor.SocketSend(TouchRpcUtility.P_1505_BeginPullFile2C_Response, block.WriteObject(waitTransfer));
}
}
}
......@@ -933,18 +953,18 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>(SerializationType.Json);
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>();
using (ByteBlock block = new ByteBlock())
{
if (this.OnFindRpcActor?.Invoke(waitFileInfo.ClientID) is RpcActor rpcActor)
{
waitFileInfo.ClientID = this.ID;
rpcActor.SocketSend(TouchRpcUtility.P_508_PushFileFC_Request, block.WriteObject(waitFileInfo, SerializationType.Json));
rpcActor.SocketSend(TouchRpcUtility.P_508_PushFileFC_Request, block.WriteObject(waitFileInfo));
}
else
{
this.SocketSend(TouchRpcUtility.P_1507_PushFile2C_Response, block.WriteObject(new WaitTransfer() { Sign = waitFileInfo.Sign, Status = 7 }, SerializationType.Json));
this.SocketSend(TouchRpcUtility.P_1507_PushFile2C_Response, block.WriteObject(new WaitTransfer() { Sign = waitFileInfo.Sign, Status = 7 }));
}
}
}
......@@ -959,7 +979,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>(SerializationType.Json);
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>();
this.WaitHandlePool.SetRun(waitTransfer);
}
catch (System.Exception ex)
......@@ -973,7 +993,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>(SerializationType.Json);
WaitFileInfo waitFileInfo = byteBlock.ReadObject<WaitFileInfo>();
EasyAction.TaskRun(waitFileInfo, (w) =>
{
this.RequestPushFile(TouchRpcUtility.P_1508_PushFileFC_Response, w);
......@@ -990,14 +1010,14 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>(SerializationType.Json);
WaitTransfer waitTransfer = byteBlock.ReadObject<WaitTransfer>();
if (this.OnFindRpcActor?.Invoke(waitTransfer.ClientID) is RpcActor rpcActor)
{
using (ByteBlock block = new ByteBlock())
{
waitTransfer.ClientID = this.ID;
rpcActor.SocketSend(TouchRpcUtility.P_1507_PushFile2C_Response, block.WriteObject(waitTransfer, SerializationType.Json));
rpcActor.SocketSend(TouchRpcUtility.P_1507_PushFile2C_Response, block.WriteObject(waitTransfer));
}
}
}
......@@ -1012,7 +1032,7 @@ namespace TouchSocket.Rpc.TouchRpc
try
{
byteBlock.Pos = 2;
WaitResult waitResult = byteBlock.ReadObject<WaitResult>(SerializationType.Json);
WaitResult waitResult = byteBlock.ReadObject<WaitResult>();
this.m_eventArgs.TryAdd((int)waitResult.Sign, waitResult);
EasyAction.DelayRun(10000, waitResult, (a) =>
......@@ -1034,10 +1054,12 @@ namespace TouchSocket.Rpc.TouchRpc
{
break;
}
#endregion 500-599
default:
{
if (protocol<0)
if (protocol < 0)
{
return;
}
......
......@@ -11,11 +11,12 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using TouchSocket.Core;
using TouchSocket.Sockets;
namespace TouchSocket.Rpc.TouchRpc
{
/// <summary>
/// RRQMRpcUtility
/// TouchRpcUtility
/// </summary>
public class TouchRpcUtility
{
......@@ -25,9 +26,9 @@ namespace TouchSocket.Rpc.TouchRpc
public const string TouchRpc = "TOUCHRPC";
/// <summary>
/// 集群验证令箭
/// TouchRpc
/// </summary>
public const string TouchRpcCluster = "C4C9D0F8-4004-4804-BBDA-129F8BC33659";
public static Protocol TouchRpcProtocol { get; private set; } = new Protocol(TouchRpc);
/// <summary>
/// 传输分包
......
......@@ -512,7 +512,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="requestInfo"></param>
protected override void HandleReceivedData(ByteBlock byteBlock, IRequestInfo requestInfo)
{
if (this.Protocol == TouchRpcExtensions.TouchRpc)
if (this.Protocol == TouchRpcUtility.TouchRpcProtocol && byteBlock != null)
{
this.m_rpcActor.InputReceivedData(byteBlock);
return;
......@@ -760,14 +760,13 @@ namespace TouchSocket.Rpc.TouchRpc
#endregion 内部委托绑定
#region 事件触发
/// <summary>
/// 当文件传输结束之后。并不意味着完成传输,请通过<see cref="FileTransferStatusEventArgs.Result"/>属性值进行判断。
/// </summary>
/// <param name="e"></param>
protected virtual void OnFileTransfered(FileTransferStatusEventArgs e)
{
}
/// <summary>
......@@ -776,7 +775,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnFileTransfering(FileOperationEventArgs e)
{
}
/// <summary>
......
......@@ -15,9 +15,9 @@ using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.Config;
using TouchSocket.Resources;
using TouchSocket.Core.Serialization;
using TouchSocket.Http;
using TouchSocket.Resources;
using TouchSocket.Sockets;
namespace TouchSocket.Rpc.TouchRpc
......@@ -130,7 +130,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnFileTransfered(TClient client, FileTransferStatusEventArgs e)
{
}
/// <summary>
......@@ -140,7 +140,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnFileTransfering(TClient client, FileOperationEventArgs e)
{
}
/// <summary>
......@@ -150,7 +150,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnHandshaked(TClient client, VerifyOptionEventArgs e)
{
}
/// <summary>
......@@ -160,7 +160,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e">参数</param>
protected virtual void OnHandshaking(TClient client, VerifyOptionEventArgs e)
{
}
/// <summary>
......@@ -171,7 +171,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="byteBlock"></param>
protected virtual void OnReceived(TClient client, short protocol, ByteBlock byteBlock)
{
}
/// <summary>
......@@ -181,7 +181,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnStreamTransfered(TClient client, StreamStatusEventArgs e)
{
}
/// <summary>
......@@ -191,7 +191,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnStreamTransfering(TClient client, StreamOperationEventArgs e)
{
}
private void PrivateOnRpcActorInit(HttpTouchRpcSocketClient client)
......
......@@ -27,35 +27,28 @@ namespace TouchSocket.Rpc.TouchRpc
/// </summary>
public class HttpTouchRpcSocketClient : HttpSocketClient, IHttpRpcClientBase
{
internal RpcActor m_rpcActor;
internal Action<HttpTouchRpcSocketClient> internalOnRpcActorInit;
private void InitRpcActor()
{
this.SwitchProtocolToTouchRpc();
this.internalOnRpcActorInit?.Invoke(this);
this.m_rpcActor.ID = this.ID;
}
internal RpcActor m_rpcActor;
/// <summary>
/// <inheritdoc/>
/// </summary>
public Func<IRpcClient, bool> TryCanInvoke { get; set; }
public bool IsHandshaked => this.m_rpcActor == null ? false : this.m_rpcActor.IsHandshaked;
/// <summary>
/// <inheritdoc/>
/// </summary>
public bool IsHandshaked => this.m_rpcActor == null ? false : this.m_rpcActor.IsHandshaked;
public ResponseType ResponseType { get => this.m_rpcActor.ResponseType; set => this.m_rpcActor.ResponseType = value; }
/// <summary>
/// <inheritdoc/>
/// </summary>
public ResponseType ResponseType { get => this.m_rpcActor.ResponseType; set => this.m_rpcActor.ResponseType = value; }
public string RootPath { get => this.m_rpcActor.RootPath; set => this.m_rpcActor.RootPath = value; }
/// <summary>
/// <inheritdoc/>
/// </summary>
public string RootPath { get => this.m_rpcActor.RootPath; set => this.m_rpcActor.RootPath = value; }
public RpcActor RpcActor => this.m_rpcActor;
/// <summary>
/// <inheritdoc/>
......@@ -63,14 +56,14 @@ namespace TouchSocket.Rpc.TouchRpc
public SerializationSelector SerializationSelector => this.m_rpcActor.SerializationSelector;
/// <summary>
/// 连接令箭
/// <inheritdoc/>
/// </summary>
public string VerifyToken => this.Config.GetValue<string>(TouchRpcConfigExtensions.VerifyTokenProperty);
public Func<IRpcClient, bool> TryCanInvoke { get; set; }
/// <summary>
/// <inheritdoc/>
/// 连接令箭
/// </summary>
public RpcActor RpcActor => this.m_rpcActor;
public string VerifyToken => this.Config.GetValue<string>(TouchRpcConfigExtensions.VerifyTokenProperty);
/// <summary>
/// <inheritdoc/>
......@@ -201,41 +194,41 @@ namespace TouchSocket.Rpc.TouchRpc
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="targetID"></param>
/// <param name="method"></param>
/// <param name="invokeOption"></param>
/// <param name="parameters"></param>
/// <returns></returns>
public Task InvokeAsync(string method, IInvokeOption invokeOption, params object[] parameters)
/// <param name="types"></param>
public void Invoke(string targetID, string method, IInvokeOption invokeOption, ref object[] parameters, Type[] types)
{
return this.m_rpcActor.InvokeAsync(method, invokeOption, parameters);
this.m_rpcActor.Invoke(targetID, method, invokeOption, ref parameters, types);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="targetID"></param>
/// <param name="method"></param>
/// <param name="invokeOption"></param>
/// <param name="parameters"></param>
/// <param name="types"></param>
public void Invoke(string targetID, string method, IInvokeOption invokeOption, ref object[] parameters, Type[] types)
/// <returns></returns>
public T Invoke<T>(string targetID, string method, IInvokeOption invokeOption, ref object[] parameters, Type[] types)
{
this.m_rpcActor.Invoke(targetID, method, invokeOption, ref parameters, types);
return this.m_rpcActor.Invoke<T>(targetID, method, invokeOption, ref parameters, types);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="targetID"></param>
/// <param name="method"></param>
/// <param name="invokeOption"></param>
/// <param name="parameters"></param>
/// <param name="types"></param>
/// <returns></returns>
public T Invoke<T>(string targetID, string method, IInvokeOption invokeOption, ref object[] parameters, Type[] types)
public Task InvokeAsync(string method, IInvokeOption invokeOption, params object[] parameters)
{
return this.m_rpcActor.Invoke<T>(targetID, method, invokeOption, ref parameters, types);
return this.m_rpcActor.InvokeAsync(method, invokeOption, parameters);
}
/// <summary>
......@@ -404,115 +397,161 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="cancellationToken"></param>
public void ResetID(string newID, CancellationToken cancellationToken = default)
{
if (this.Protocol == TouchRpcExtensions.TouchRpc)
if (this.Protocol == TouchRpcUtility.TouchRpcProtocol)
{
this.m_rpcActor.ResetID(newID, cancellationToken);
}
base.ResetID(newID);
}
#region 发送
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
/// <param name="buffer"></param>
public void Send(short protocol, byte[] buffer)
/// <param name="stream"></param>
/// <param name="streamOperator"></param>
/// <param name="metadata"></param>
/// <returns></returns>
public Result SendStream(Stream stream, StreamOperator streamOperator, Metadata metadata = null)
{
this.m_rpcActor.Send(protocol, buffer);
return this.m_rpcActor.SendStream(stream, streamOperator, metadata);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public void Send(short protocol, byte[] buffer, int offset, int length)
/// <param name="stream"></param>
/// <param name="streamOperator"></param>
/// <param name="metadata"></param>
/// <returns></returns>
public Task<Result> SendStreamAsync(Stream stream, StreamOperator streamOperator, Metadata metadata = null)
{
this.m_rpcActor.Send(protocol, buffer, offset, length);
return this.m_rpcActor.SendStreamAsync(stream, streamOperator, metadata);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
/// <param name="dataByteBlock"></param>
public void Send(short protocol, ByteBlock dataByteBlock)
/// <param name="id"></param>
/// <param name="channel"></param>
/// <returns></returns>
public bool TrySubscribeChannel(int id, out Channel channel)
{
this.m_rpcActor.Send(protocol, dataByteBlock);
return this.m_rpcActor.TrySubscribeChannel(id, out channel);
}
internal void RpcActorSend(bool isAsync, ArraySegment<byte>[] transferBytes)
{
if (isAsync)
{
base.SendAsync(transferBytes);
}
else
{
base.Send(transferBytes);
}
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
public void Send(short protocol)
/// <param name="disposing"></param>
protected override void Dispose(bool disposing)
{
this.m_rpcActor.Send(protocol);
this.m_rpcActor.SafeDispose();
base.Dispose(disposing);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
/// <param name="buffer"></param>
public void SendAsync(short protocol, byte[] buffer)
/// <param name="byteBlock"></param>
/// <param name="requestInfo"></param>
protected override void HandleReceivedData(ByteBlock byteBlock, IRequestInfo requestInfo)
{
this.m_rpcActor.SendAsync(protocol, buffer);
if (this.Protocol == TouchRpcUtility.TouchRpcProtocol && byteBlock != null)
{
this.m_rpcActor.InputReceivedData(byteBlock);
}
else
{
base.HandleReceivedData(byteBlock, requestInfo);
}
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public void SendAsync(short protocol, byte[] buffer, int offset, int length)
/// <param name="e"></param>
protected override void OnDisconnected(ClientDisconnectedEventArgs e)
{
this.m_rpcActor.SendAsync(protocol, buffer, offset, length);
this.m_rpcActor?.Close(e.Message);
base.OnDisconnected(e);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
/// <param name="dataByteBlock"></param>
public void SendAsync(short protocol, ByteBlock dataByteBlock)
/// <param name="request"></param>
protected override void OnReceivedHttpRequest(HttpRequest request)
{
this.m_rpcActor.SendAsync(protocol, dataByteBlock);
if (request.Method == TouchRpcUtility.TouchRpc)
{
request.SafeDispose();
this.InitRpcActor();
this.DefaultSend(new HttpResponse().SetStatus().BuildAsBytes());
return;
}
base.OnReceivedHttpRequest(request);
}
private void InitRpcActor()
{
this.SwitchProtocolToTouchRpc();
this.internalOnRpcActorInit?.Invoke(this);
this.m_rpcActor.ID = this.ID;
}
#region 发送
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
public void SendAsync(short protocol)
/// <param name="buffer"></param>
public void Send(short protocol, byte[] buffer)
{
this.m_rpcActor.SendAsync(protocol);
this.m_rpcActor.Send(protocol, buffer);
}
/// <summary>
/// 不允许直接发送
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public override void Send(byte[] buffer, int offset, int length)
public void Send(short protocol, byte[] buffer, int offset, int length)
{
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
this.m_rpcActor.Send(protocol, buffer, offset, length);
}
/// <summary>
/// 不允许直接发送
/// <inheritdoc/>
/// </summary>
/// <param name="transferBytes"></param>
public override void Send(IList<ArraySegment<byte>> transferBytes)
/// <param name="protocol"></param>
/// <param name="dataByteBlock"></param>
public void Send(short protocol, ByteBlock dataByteBlock)
{
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
this.m_rpcActor.Send(protocol, dataByteBlock);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="protocol"></param>
public void Send(short protocol)
{
this.m_rpcActor.Send(protocol);
}
/// <summary>
......@@ -521,7 +560,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public override void SendAsync(byte[] buffer, int offset, int length)
public override void Send(byte[] buffer, int offset, int length)
{
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
}
......@@ -530,111 +569,72 @@ namespace TouchSocket.Rpc.TouchRpc
/// 不允许直接发送
/// </summary>
/// <param name="transferBytes"></param>
public override void SendAsync(IList<ArraySegment<byte>> transferBytes)
public override void Send(IList<ArraySegment<byte>> transferBytes)
{
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
}
#endregion 发送
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="stream"></param>
/// <param name="streamOperator"></param>
/// <param name="metadata"></param>
/// <returns></returns>
public Result SendStream(Stream stream, StreamOperator streamOperator, Metadata metadata = null)
{
return this.m_rpcActor.SendStream(stream, streamOperator, metadata);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="stream"></param>
/// <param name="streamOperator"></param>
/// <param name="metadata"></param>
/// <returns></returns>
public Task<Result> SendStreamAsync(Stream stream, StreamOperator streamOperator, Metadata metadata = null)
/// <param name="protocol"></param>
/// <param name="buffer"></param>
public void SendAsync(short protocol, byte[] buffer)
{
return this.m_rpcActor.SendStreamAsync(stream, streamOperator, metadata);
this.m_rpcActor.SendAsync(protocol, buffer);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="id"></param>
/// <param name="channel"></param>
/// <returns></returns>
public bool TrySubscribeChannel(int id, out Channel channel)
/// <param name="protocol"></param>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public void SendAsync(short protocol, byte[] buffer, int offset, int length)
{
return this.m_rpcActor.TrySubscribeChannel(id, out channel);
this.m_rpcActor.SendAsync(protocol, buffer, offset, length);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="disposing"></param>
protected override void Dispose(bool disposing)
/// <param name="protocol"></param>
/// <param name="dataByteBlock"></param>
public void SendAsync(short protocol, ByteBlock dataByteBlock)
{
this.m_rpcActor.SafeDispose();
base.Dispose(disposing);
this.m_rpcActor.SendAsync(protocol, dataByteBlock);
}
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="byteBlock"></param>
/// <param name="requestInfo"></param>
protected override void HandleReceivedData(ByteBlock byteBlock, IRequestInfo requestInfo)
/// <param name="protocol"></param>
public void SendAsync(short protocol)
{
if (this.Protocol == TouchRpcExtensions.TouchRpc)
{
this.m_rpcActor.InputReceivedData(byteBlock);
}
else
{
base.HandleReceivedData(byteBlock, requestInfo);
}
this.m_rpcActor.SendAsync(protocol);
}
/// <summary>
/// <inheritdoc/>
/// 不允许直接发送
/// </summary>
/// <param name="request"></param>
protected override void OnReceivedHttpRequest(HttpRequest request)
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public override void SendAsync(byte[] buffer, int offset, int length)
{
if (request.Method == TouchRpcUtility.TouchRpc)
{
request.SafeDispose();
this.InitRpcActor();
this.DefaultSend(new HttpResponse().SetStatus().BuildAsBytes());
return;
}
base.OnReceivedHttpRequest(request);
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
}
/// <summary>
/// <inheritdoc/>
/// 不允许直接发送
/// </summary>
/// <param name="e"></param>
protected override void OnDisconnected(ClientDisconnectedEventArgs e)
/// <param name="transferBytes"></param>
public override void SendAsync(IList<ArraySegment<byte>> transferBytes)
{
this.m_rpcActor?.Close(e.Message);
base.OnDisconnected(e);
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
}
internal void RpcActorSend(bool isAsync, ArraySegment<byte>[] transferBytes)
{
if (isAsync)
{
base.SendAsync(transferBytes);
}
else
{
base.Send(transferBytes);
}
}
#endregion 发送
}
}
\ No newline at end of file
......@@ -798,13 +798,14 @@ namespace TouchSocket.Rpc.TouchRpc
#endregion RPC解析器
#region 事件触发
/// <summary>
/// 当文件传输结束之后。并不意味着完成传输,请通过<see cref="FileTransferStatusEventArgs.Result"/>属性值进行判断。
/// </summary>
/// <param name="e"></param>
protected virtual void OnFileTransfered(FileTransferStatusEventArgs e)
{
}
/// <summary>
......@@ -813,7 +814,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnFileTransfering(FileOperationEventArgs e)
{
}
/// <summary>
......@@ -851,7 +852,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="byteBlock"></param>
protected virtual void OnReceived(short protocol, ByteBlock byteBlock)
{
}
/// <summary>
......@@ -860,7 +861,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnStreamTransfered(StreamStatusEventArgs e)
{
}
/// <summary>
......@@ -869,7 +870,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnStreamTransfering(StreamOperationEventArgs e)
{
}
#endregion 事件触发
}
......
......@@ -15,8 +15,8 @@ using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.Config;
using TouchSocket.Resources;
using TouchSocket.Core.Serialization;
using TouchSocket.Resources;
using TouchSocket.Sockets;
namespace TouchSocket.Rpc.TouchRpc
......@@ -129,7 +129,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnFileTransfered(TClient client, FileTransferStatusEventArgs e)
{
}
/// <summary>
......@@ -139,7 +139,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnFileTransfering(TClient client, FileOperationEventArgs e)
{
}
/// <summary>
......@@ -149,7 +149,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnHandshaked(TClient client, VerifyOptionEventArgs e)
{
}
/// <summary>
......@@ -159,7 +159,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e">参数</param>
protected virtual void OnHandshaking(TClient client, VerifyOptionEventArgs e)
{
}
/// <summary>
......@@ -170,7 +170,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="byteBlock"></param>
protected virtual void OnReceived(TClient client, short protocol, ByteBlock byteBlock)
{
}
/// <summary>
......@@ -180,7 +180,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnStreamTransfered(TClient client, StreamStatusEventArgs e)
{
}
/// <summary>
......@@ -190,7 +190,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="e"></param>
protected virtual void OnStreamTransfering(TClient client, StreamOperationEventArgs e)
{
}
#endregion 事件
......
......@@ -32,17 +32,17 @@ namespace TouchSocket.Rpc.TouchRpc
/// <summary>
/// <inheritdoc/>
/// </summary>
public Func<IRpcClient, bool> TryCanInvoke { get; set; }
public bool IsHandshaked => this.m_rpcActor == null ? false : this.m_rpcActor.IsHandshaked;
/// <summary>
/// <inheritdoc/>
/// </summary>
public bool IsHandshaked => this.m_rpcActor == null ? false : this.m_rpcActor.IsHandshaked;
public ResponseType ResponseType { get => this.m_rpcActor.ResponseType; set => this.m_rpcActor.ResponseType = value; }
/// <summary>
/// <inheritdoc/>
/// </summary>
public ResponseType ResponseType { get => this.m_rpcActor.ResponseType; set => this.m_rpcActor.ResponseType = value; }
public string RootPath { get => this.m_rpcActor.RootPath; set => this.m_rpcActor.RootPath = value; }
/// <summary>
/// <inheritdoc/>
......@@ -52,12 +52,12 @@ namespace TouchSocket.Rpc.TouchRpc
/// <summary>
/// <inheritdoc/>
/// </summary>
public string RootPath { get => this.m_rpcActor.RootPath; set => this.m_rpcActor.RootPath = value; }
public SerializationSelector SerializationSelector => this.m_rpcActor.SerializationSelector;
/// <summary>
/// <inheritdoc/>
/// </summary>
public SerializationSelector SerializationSelector => this.m_rpcActor.SerializationSelector;
public Func<IRpcClient, bool> TryCanInvoke { get; set; }
/// <summary>
/// 验证超时时间,默认为3000ms
......@@ -448,6 +448,26 @@ namespace TouchSocket.Rpc.TouchRpc
this.m_rpcActor.Send(protocol);
}
/// <summary>
/// 不允许直接发送
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public override void Send(byte[] buffer, int offset, int length)
{
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
}
/// <summary>
/// 不允许直接发送
/// </summary>
/// <param name="transferBytes"></param>
public override void Send(IList<ArraySegment<byte>> transferBytes)
{
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
}
/// <summary>
/// <inheritdoc/>
/// </summary>
......@@ -489,26 +509,6 @@ namespace TouchSocket.Rpc.TouchRpc
this.m_rpcActor.SendAsync(protocol);
}
/// <summary>
/// 不允许直接发送
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public override void Send(byte[] buffer, int offset, int length)
{
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
}
/// <summary>
/// 不允许直接发送
/// </summary>
/// <param name="transferBytes"></param>
public override void Send(IList<ArraySegment<byte>> transferBytes)
{
throw new Exception("不允许直接发送,请指定任意大于0的协议,然后发送。");
}
/// <summary>
/// 不允许直接发送
/// </summary>
......@@ -566,6 +566,18 @@ namespace TouchSocket.Rpc.TouchRpc
return this.m_rpcActor.TrySubscribeChannel(id, out channel);
}
internal void RpcActorSend(bool isAsync, ArraySegment<byte>[] transferBytes)
{
if (isAsync)
{
base.SendAsync(transferBytes);
}
else
{
base.Send(transferBytes);
}
}
/// <summary>
/// <inheritdoc/>
/// </summary>
......@@ -623,17 +635,5 @@ namespace TouchSocket.Rpc.TouchRpc
this.m_rpcActor.Close(e.Message);
base.OnDisconnected(e);
}
internal void RpcActorSend(bool isAsync, ArraySegment<byte>[] transferBytes)
{
if (isAsync)
{
base.SendAsync(transferBytes);
}
else
{
base.Send(transferBytes);
}
}
}
}
\ No newline at end of file
......@@ -454,7 +454,6 @@ namespace TouchSocket.Rpc.TouchRpc
throw new NotImplementedException();
}
void IIDRpcActor.Invoke(string targetID, string method, IInvokeOption invokeOption, ref object[] parameters, Type[] types)
{
throw new NotImplementedException();
......
......@@ -10,10 +10,6 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TouchSocket.Core;
......
......@@ -352,19 +352,14 @@ namespace TouchSocket.Rpc.TouchRpc
loopAction.RunAsync();
}
/// <summary>
/// TouchRpc
/// </summary>
public static Protocol TouchRpc { get; private set; } = new Protocol("TouchRpc");
/// <summary>
/// 转化Protocol协议标识为TouchRpc
/// </summary>
/// <param name="client"></param>
public static void SwitchProtocolToTouchRpc(this ITcpClientBase client)
{
client.Protocol = TouchRpc;
client.SetDataHandlingAdapter(new FixedHeaderPackageAdapter());
client.Protocol = TouchRpcUtility.TouchRpcProtocol;
}
/// <summary>
......
......@@ -19,7 +19,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <summary>
/// 文件
/// </summary>
static class FileTool
internal static class FileTool
{
/// <summary>
/// 获取文件信息
......
......@@ -11,9 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.IO;
namespace TouchSocket.Rpc.TouchRpc
{
/// <summary>
......
......@@ -11,11 +11,7 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TouchSocket.Rpc.TouchRpc
{
......
......@@ -16,7 +16,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <summary>
/// 文件信息
/// </summary>
public class TouchRpcFileInfo: RemoteFileInfo
public class TouchRpcFileInfo : RemoteFileInfo
{
/// <summary>
/// 流位置
......
......@@ -74,7 +74,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <returns></returns>
Task<Result> PushFileAsync(FileRequest fileRequest, FileOperator fileOperator, Metadata metadata = null);
/// <summary>
/// 判断使用该ID的Channel是否存在。
/// </summary>
......
......@@ -10,12 +10,7 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TouchSocket.Core.Serialization;
namespace TouchSocket.Rpc.TouchRpc
......@@ -34,7 +29,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <summary>
/// TouchRpcContext
/// </summary>
TouchRpcPackage TouchRpcPackage { get;}
TouchRpcPackage TouchRpcPackage { get; }
/// <summary>
/// 序列化类型
......
......@@ -13,7 +13,6 @@
using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Sockets;
using TouchSocket.Sockets.Plugins;
namespace TouchSocket.Rpc.TouchRpc.Plugins
{
......@@ -227,6 +226,7 @@ namespace TouchSocket.Rpc.TouchRpc.Plugins
{
return this.OnReceivedProtocolDataAsync((TClient)client, e);
}
void ITouchRpcPlugin.OnStreamTransfered(ITouchRpc client, StreamStatusEventArgs e)
{
this.OnStreamTransfered((TClient)client, e);
......
......@@ -13,9 +13,9 @@
using System.Collections.Generic;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Resources;
using TouchSocket.Core.Run;
using TouchSocket.Core.Serialization;
using TouchSocket.Resources;
using TouchSocket.Sockets;
namespace TouchSocket.Rpc.TouchRpc
......
......@@ -11,10 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Http;
namespace TouchSocket.Rpc.WebApi
......
......@@ -10,11 +10,6 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Http;
namespace TouchSocket.Rpc.WebApi
......@@ -22,11 +17,11 @@ namespace TouchSocket.Rpc.WebApi
/// <summary>
/// IWebApiCallContext
/// </summary>
public interface IWebApiCallContext:ICallContext
public interface IWebApiCallContext : ICallContext
{
/// <summary>
/// Http上下文
/// </summary>
HttpContext HttpContext { get;}
HttpContext HttpContext { get; }
}
}
......@@ -120,7 +120,7 @@ namespace TouchSocket.Rpc.WebApi
}
if (invokeResult.Status == InvokeStatus.Ready)
{
IRpcServer rpcServer = methodInstance.ServerFactory.Create(callContext,ps);
IRpcServer rpcServer = methodInstance.ServerFactory.Create(callContext, ps);
if (rpcServer is ITransientRpcServer transientRpcServer)
{
transientRpcServer.CallContext = callContext;
......@@ -249,7 +249,7 @@ namespace TouchSocket.Rpc.WebApi
}
if (invokeResult.Status == InvokeStatus.Ready)
{
IRpcServer rpcServer = methodInstance.ServerFactory.Create(callContext,ps);
IRpcServer rpcServer = methodInstance.ServerFactory.Create(callContext, ps);
if (rpcServer is ITransientRpcServer transientRpcServer)
{
transientRpcServer.CallContext = callContext;
......
......@@ -10,12 +10,7 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TouchSocket.Http;
namespace TouchSocket.Rpc.XmlRpc
......
......@@ -10,11 +10,6 @@
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Http;
namespace TouchSocket.Rpc.XmlRpc
......@@ -22,7 +17,7 @@ namespace TouchSocket.Rpc.XmlRpc
/// <summary>
/// IXmlRpcCallContext
/// </summary>
public interface IXmlRpcCallContext:ICallContext
public interface IXmlRpcCallContext : ICallContext
{
/// <summary>
/// XmlRpc的调用字符串。
......
......@@ -11,7 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Xml;
using TouchSocket.Core.ByteManager;
......@@ -120,7 +119,7 @@ namespace TouchSocket.Rpc.XmlRpc
object[] ps = null;
InvokeResult invokeResult = new InvokeResult();
XmlRpcCallContext callContext=null;
XmlRpcCallContext callContext = null;
if (this.m_actionMap.TryGetMethodInstance(actionKey, out MethodInstance methodInstance))
{
......@@ -144,7 +143,7 @@ namespace TouchSocket.Rpc.XmlRpc
foreach (XmlNode paramNode in paramsNode.ChildNodes)
{
XmlNode valueNode = paramNode.FirstChild.FirstChild;
ps[index]=(XmlDataTool.GetValue(valueNode, methodInstance.ParameterTypes[index]));
ps[index] = (XmlDataTool.GetValue(valueNode, methodInstance.ParameterTypes[index]));
index++;
}
}
......@@ -180,7 +179,7 @@ namespace TouchSocket.Rpc.XmlRpc
if (invokeResult.Status == InvokeStatus.Ready)
{
IRpcServer rpcServer = methodInstance.ServerFactory.Create(callContext,ps);
IRpcServer rpcServer = methodInstance.ServerFactory.Create(callContext, ps);
if (rpcServer is ITransientRpcServer transientRpcServer)
{
transientRpcServer.CallContext = callContext;
......
......@@ -11,151 +11,142 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.Collections.Concurrent;
namespace TouchSocket.Sockets
{
/// <summary>
/// 异步独立线程发送器
/// 延迟发送器
/// </summary>
internal class AsyncSender : DisposableObject
public sealed class DelaySender : DisposableObject
{
private static int m_cacheLength = 1024 * 1024 * 100;
private readonly IntelligentDataQueue<QueueDataBytes> m_asyncBytes;
private readonly byte[] m_buffer = new byte[1024 * 1024];
private readonly IntelligentDataQueue<QueueDataBytes> m_queueDatas;
private readonly ReaderWriterLockSlim m_lockSlim;
private readonly Action<Exception> m_onError;
private readonly SocketAsyncEventArgs m_sendEventArgs;
private readonly Thread m_sendThread;
private readonly Socket m_socket;
private readonly WaitCallback m_waitCallback_Send;
private readonly EventWaitHandle m_waitHandle;
private volatile bool m_sending;
internal AsyncSender(Socket socket, EndPoint endPoint, Action<Exception> onError)
/// <summary>
/// 延迟发送器
/// </summary>
/// <param name="socket"></param>
/// <param name="queueLength"></param>
/// <param name="onError"></param>
public DelaySender(Socket socket, int queueLength, Action<Exception> onError)
{
this.m_sendEventArgs = new SocketAsyncEventArgs();
this.m_sendEventArgs.Completed += this.SendEventArgs_Completed;
this.m_socket = socket;
this.m_sendEventArgs.RemoteEndPoint = endPoint;
this.m_onError = onError;
this.m_asyncBytes = new IntelligentDataQueue<QueueDataBytes>(1024 * 1024 * 10);
this.m_queueDatas = new IntelligentDataQueue<QueueDataBytes>(queueLength);
this.m_waitHandle = new AutoResetEvent(false);
this.m_sendThread = new Thread(this.BeginSend);
this.m_sendThread.IsBackground = true;
this.m_sendThread.Name = "AsyncSendThread";
this.m_sendThread.Start();
this.m_waitCallback_Send = this.BeginSend;
this.m_lockSlim = new ReaderWriterLockSlim();
}
/// <summary>
/// 缓存发送池尺寸,
/// 默认100*1024*1024字节
/// 延迟包最大尺寸,默认1024*512字节。
/// </summary>
public static int CacheLength
public int DelayLength { get; set; } = 1024 * 512;
/// <summary>
/// 是否处于发送状态
/// </summary>
public bool Sending
{
get => m_cacheLength;
set => m_cacheLength = value;
get
{
using (new ReadLock(this.m_lockSlim))
{
return this.m_sending;
}
}
private set
{
using (new WriteLock(this.m_lockSlim))
{
this.m_sending = value;
}
}
}
internal void AsyncSend(byte[] buffer, int offset, int length)
/// <summary>
/// 发送
/// </summary>
public void Send(QueueDataBytes dataBytes)
{
QueueDataBytes asyncByte = new QueueDataBytes(buffer, offset, length);
this.m_asyncBytes.Enqueue(asyncByte);
if (!this.m_sending)
//this.m_socket.AbsoluteSend(dataBytes.Buffer, dataBytes.Offset, dataBytes.Length);
//return;
this.m_queueDatas.Enqueue(dataBytes);
if (!this.Sending)
{
this.m_sending = true;
this.Sending = true;
this.m_waitHandle.Set();
ThreadPool.QueueUserWorkItem(this.m_waitCallback_Send);
}
}
/// <summary>
/// 释放
/// </summary>
/// <param name="disposing"></param>
protected override void Dispose(bool disposing)
{
this.m_queueDatas.Clear();
this.m_waitHandle.Set();
this.m_waitHandle.SafeDispose();
this.m_sendEventArgs.SafeDispose();
base.Dispose(disposing);
}
private void BeginSend()
private void BeginSend(object o)
{
while (!this.m_disposedValue)
byte[] buffer = BytePool.GetByteCore(this.DelayLength);
while (!this.DisposedValue)
{
try
{
if (this.TryGet(out QueueDataBytes asyncByte))
if (this.TryGet(buffer, out QueueDataBytes asyncByte))
{
this.m_sendEventArgs.SetBuffer(asyncByte.Buffer, asyncByte.Offset, asyncByte.Length);
if (!this.m_socket.SendAsync(this.m_sendEventArgs))
{
// 同步发送时处理发送完成事件
this.ProcessSend(this.m_sendEventArgs);
}
else
{
this.m_waitHandle.WaitOne();
}
this.m_socket.AbsoluteSend(asyncByte.Buffer, asyncByte.Offset, asyncByte.Length);
}
else
{
this.m_sending = false;
this.m_waitHandle.WaitOne();
break;
}
}
catch (Exception ex)
{
this.m_onError?.Invoke(ex);
break;
}
}
BytePool.Recycle(buffer);
this.Sending = false;
}
/// <summary>
/// 发送完成时处理函数
/// </summary>
/// <param name="e">与发送完成操作相关联的SocketAsyncEventArg对象</param>
private void ProcessSend(SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success)
{
this.m_onError?.Invoke(new Exception(e.SocketError.ToString()));
}
}
private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs e)
{
if (e.LastOperation == SocketAsyncOperation.Send)
{
this.ProcessSend(e);
if (!this.m_disposedValue)
{
this.m_waitHandle.Set();
}
}
}
private bool TryGet(out QueueDataBytes asyncByteDe)
private bool TryGet(byte[] buffer, out QueueDataBytes asyncByteDe)
{
int len = 0;
int surLen = this.m_buffer.Length;
int surLen = buffer.Length;
while (true)
{
if (this.m_asyncBytes.TryPeek(out QueueDataBytes asyncB))
if (this.m_queueDatas.TryPeek(out QueueDataBytes asyncB))
{
if (surLen > asyncB.Length)
{
if (this.m_asyncBytes.TryDequeue(out QueueDataBytes asyncByte))
if (this.m_queueDatas.TryDequeue(out QueueDataBytes asyncByte))
{
Array.Copy(asyncByte.Buffer, asyncByte.Offset, this.m_buffer, len, asyncByte.Length);
Array.Copy(asyncByte.Buffer, asyncByte.Offset, buffer, len, asyncByte.Length);
len += asyncByte.Length;
surLen -= asyncByte.Length;
}
}
else if (asyncB.Length > this.m_buffer.Length)
else if (asyncB.Length > buffer.Length)
{
if (len > 0)
{
......@@ -185,7 +176,7 @@ namespace TouchSocket.Sockets
}
}
}
asyncByteDe = new QueueDataBytes(this.m_buffer, 0, len);
asyncByteDe = new QueueDataBytes(buffer, 0, len);
return true;
}
}
......
......@@ -178,9 +178,9 @@ namespace TouchSocket.Sockets
this.IP = ip;
this.Port = portNum;
}
catch(Exception ex)
catch (Exception ex)
{
throw new Exception($"IPHost初始化失败,信息:{ex.Message}",ex);
throw new Exception($"IPHost初始化失败,信息:{ex.Message}", ex);
}
}
......
//------------------------------------------------------------------------------
// 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
// CSDN博客:https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频:https://space.bilibili.com/94253567
// Gitee源代码仓库:https://gitee.com/RRQM_Home
// Github源代码仓库:https://github.com/RRQM
// API首页:https://www.yuque.com/rrqm/touchsocket/index
// 交流QQ群:234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
namespace TouchSocket.Sockets
{
/// <summary>
/// DelaySenderOption
/// </summary>
public class DelaySenderOption
{
/// <summary>
/// 延迟队列最大尺寸,默认1024*1024*10字节。
/// </summary>
public int QueueLength { get; set; } = 1024 * 1024 * 10;
/// <summary>
/// 延迟包最大尺寸,默认1024*512字节。
/// </summary>
public int DelayLength { get; set; } = 1024 * 512;
}
}
......@@ -96,5 +96,10 @@ namespace TouchSocket.Sockets
return regex1.IsMatch(input);
}
}
/// <summary>
/// 大数据边界
/// </summary>
public const int BigDataBoundary = 1024 * 512;
}
}
\ No newline at end of file
......@@ -19,6 +19,7 @@ using System.Net.Sockets;
using System.Threading;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.Collections.Concurrent;
using TouchSocket.Core.Config;
using TouchSocket.Core.Dependency;
using TouchSocket.Core.Log;
......@@ -39,23 +40,15 @@ namespace TouchSocket.Sockets
public SocketClient()
{
this.Protocol = Protocol.TCP;
this.m_sendLocker = new object();
}
#region 变量
internal TouchSocketConfig m_config;
internal string m_id;
internal long m_lastTick;
internal ReceiveType m_receiveType;
internal TcpServiceBase m_service;
internal bool m_usePlugin;
/// <summary>
/// 发送锁对象
/// </summary>
protected readonly object m_sendLocker;
private DataHandlingAdapter m_adapter;
private Socket m_mainSocket;
private int m_maxPackageSize;
......@@ -63,6 +56,8 @@ namespace TouchSocket.Sockets
private Stream m_workStream;
private string serviceIP;
private int servicePort;
private bool m_useDelaySender;
private DelaySender m_delaySender;
#endregion 变量
......@@ -86,7 +81,7 @@ namespace TouchSocket.Sockets
/// <summary>
/// <inheritdoc/>
/// </summary>
public TouchSocketConfig Config => this.m_config;
public TouchSocketConfig Config { get; internal set; }
/// <summary>
/// <inheritdoc/>
......@@ -303,6 +298,17 @@ namespace TouchSocket.Sockets
internal void InternalConnected(TouchSocketEventArgs e)
{
this.m_online = true;
if (this.Config.GetValue<DelaySenderOption>(TouchSocketConfigExtension.DelaySenderProperty) is DelaySenderOption senderOption)
{
this.m_useDelaySender = true;
this.m_delaySender.SafeDispose();
this.m_delaySender = new DelaySender(this.m_mainSocket, senderOption.QueueLength, this.OnDelaySenderError)
{
DelayLength = senderOption.DelayLength
};
}
if (this.m_usePlugin && this.PluginsManager.Raise<ITcpPlugin>(nameof(ITcpPlugin.OnConnected), this, e))
{
return;
......@@ -323,7 +329,29 @@ namespace TouchSocket.Sockets
internal void SetSocket(Socket mainSocket)
{
this.m_mainSocket = mainSocket ?? throw new ArgumentNullException(nameof(mainSocket));
this.OnSocketInitialized(mainSocket);
this.IP = mainSocket.RemoteEndPoint.GetIP();
this.Port = mainSocket.RemoteEndPoint.GetPort();
this.serviceIP = mainSocket.LocalEndPoint.GetIP();
this.servicePort = mainSocket.LocalEndPoint.GetPort();
}
/// <summary>
/// 在延迟发生错误
/// </summary>
/// <param name="ex"></param>
protected virtual void OnDelaySenderError(Exception ex)
{
this.Logger.Log(LogType.Error, this, "发送错误", ex);
}
/// <summary>
/// 当初始化完成时
/// </summary>
protected virtual void OnInitialized()
{
}
internal void InternalInitialized()
{
this.OnInitialized();
}
/// <summary>
......@@ -398,19 +426,6 @@ namespace TouchSocket.Sockets
this.Disconnected?.Invoke(this, e);
}
/// <summary>
/// 初始化设置Socket。
/// <para>父函数实现了获取IP,端口等信息的操作</para>
/// </summary>
/// <param name="mainSocket"></param>
protected virtual void OnSocketInitialized(Socket mainSocket)
{
this.IP = mainSocket.RemoteEndPoint.GetIP();
this.Port = mainSocket.RemoteEndPoint.GetPort();
this.serviceIP = mainSocket.LocalEndPoint.GetIP();
this.servicePort = mainSocket.LocalEndPoint.GetPort();
}
/// <summary>
/// 设置适配器,该方法不会检验<see cref="CanSetDataHandlingAdapter"/>的值。
/// </summary>
......@@ -425,7 +440,7 @@ namespace TouchSocket.Sockets
adapter.OnLoaded(this);
adapter.ReceivedCallBack = this.PrivateHandleReceivedData;
adapter.SendCallBack = this.SocketSend;
if (this.m_config != null)
if (this.Config != null)
{
this.m_maxPackageSize = Math.Max(adapter.MaxPackageSize, this.Config.GetValue<int>(TouchSocketConfigExtension.MaxPackageSizeProperty));
adapter.MaxPackageSize = this.m_maxPackageSize;
......@@ -452,12 +467,31 @@ namespace TouchSocket.Sockets
}
if (this.HandleSendingData(buffer, offset, length))
{
lock (this.m_sendLocker)
if (this.UseSsl)
{
if (this.UseSsl)
if (isAsync)
{
this.m_workStream.WriteAsync(buffer, offset, length);
}
else
{
this.m_workStream.Write(buffer, offset, length);
}
}
else
{
if (this.m_useDelaySender && length < TouchSocketUtility.BigDataBoundary)
{
if (isAsync)
{
this.m_delaySender.Send(new QueueDataBytes(buffer, offset, length));
}
else
{
this.m_delaySender.Send(QueueDataBytes.CreateNew(buffer, offset, length));
}
}
else
{
if (isAsync)
......@@ -466,16 +500,7 @@ namespace TouchSocket.Sockets
}
else
{
while (length > 0)
{
int r = this.m_mainSocket.Send(buffer, offset, length, SocketFlags.None);
if (r == 0 && length > 0)
{
throw new Exception("发送数据不完全");
}
offset += r;
length -= r;
}
this.m_mainSocket.AbsoluteSend(buffer, offset, length);
}
}
}
......@@ -511,6 +536,7 @@ namespace TouchSocket.Sockets
if (this.m_online)
{
this.m_online = false;
this.m_delaySender.SafeDispose();
this.m_adapter.SafeDispose();
this.m_mainSocket.SafeDispose();
this.m_service?.SocketClients.TryRemove(this.m_id, out _);
......@@ -526,7 +552,7 @@ namespace TouchSocket.Sockets
try
{
int r = this.m_workStream.EndRead(result);
if (r==0)
if (r == 0)
{
this.BreakOut("远程终端主动关闭", false);
}
......
......@@ -19,6 +19,7 @@ using System.Runtime.InteropServices;
using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.Collections.Concurrent;
using TouchSocket.Core.Config;
using TouchSocket.Core.Data.Security;
using TouchSocket.Core.Dependency;
......@@ -61,30 +62,22 @@ namespace TouchSocket.Sockets
/// </summary>
public TcpClientBase()
{
this.sendLocker = new object();
this.Protocol = Protocol.TCP;
}
#region 变量
private AsyncSender m_asyncSender;
private DelaySender m_delaySender;
private TouchSocketConfig m_config;
private DataHandlingAdapter m_adapter;
private Socket m_mainSocket;
private bool m_online;
private ReceiveType m_receiveType;
private bool m_separateThreadSend;
private bool m_useDelaySender;
private bool m_usePlugin;
private bool m_useSsl;
private Stream m_workStream;
private int m_maxPackageSize;
private IPHost m_remoteIPHost;
/// <summary>
/// 发送锁对象
/// </summary>
protected readonly object sendLocker;
#endregion 变量
#region 事件
......@@ -264,11 +257,6 @@ namespace TouchSocket.Sockets
/// </summary>
public ReceiveType ReceiveType => this.m_receiveType;
/// <summary>
/// 在异步发送时,使用独立线程发送
/// </summary>
public bool SeparateThreadSend => this.m_separateThreadSend;
/// <summary>
/// 是否已启用插件
/// </summary>
......@@ -277,7 +265,7 @@ namespace TouchSocket.Sockets
/// <summary>
/// <inheritdoc/>
/// </summary>
public bool UseSsl => this.m_useSsl;
public bool UseSsl { get; private set; }
/// <summary>
/// <inheritdoc/>
......@@ -323,7 +311,7 @@ namespace TouchSocket.Sockets
if (this.m_online)
{
this.m_mainSocket.SafeDispose();
this.m_asyncSender.SafeDispose();
this.m_delaySender.SafeDispose();
this.m_workStream.SafeDispose();
this.m_adapter.SafeDispose();
this.m_online = false;
......@@ -403,11 +391,16 @@ namespace TouchSocket.Sockets
this.m_mainSocket.EndConnect(result);
this.LoadSocketAndReadIpPort();
if (this.m_separateThreadSend)
if (this.m_config.GetValue<DelaySenderOption>(TouchSocketConfigExtension.DelaySenderProperty) is DelaySenderOption senderOption)
{
this.m_asyncSender.SafeDispose();
this.m_asyncSender = new AsyncSender(this.m_mainSocket, this.m_mainSocket.RemoteEndPoint, this.OnSeparateThreadSendError);
this.m_useDelaySender = true;
this.m_delaySender.SafeDispose();
this.m_delaySender = new DelaySender(this.m_mainSocket, senderOption.QueueLength, this.OnDelaySenderError)
{
DelayLength = senderOption.DelayLength
};
}
this.BeginReceive();
this.m_online = true;
this.PrivateOnConnected(new MsgEventArgs("连接成功"));
......@@ -551,27 +544,22 @@ namespace TouchSocket.Sockets
this.m_remoteIPHost = config.GetValue<IPHost>(TouchSocketConfigExtension.RemoteIPHostProperty);
this.m_maxPackageSize = config.GetValue<int>(TouchSocketConfigExtension.MaxPackageSizeProperty);
this.BufferLength = config.GetValue<int>(TouchSocketConfigExtension.BufferLengthProperty);
this.m_separateThreadSend = config.GetValue<bool>(TouchSocketConfigExtension.SeparateThreadSendProperty);
this.m_receiveType = config.GetValue<ReceiveType>(TouchSocketConfigExtension.ReceiveTypeProperty);
this.m_usePlugin = config.IsUsePlugin;
this.Logger = this.Container.Resolve<ILog>();
if (config.GetValue(TouchSocketConfigExtension.SslOptionProperty) != null)
{
if (this.m_separateThreadSend)
{
throw new Exception("Ssl配置下,不允许独立线程发送。");
}
this.m_useSsl = true;
this.UseSsl = true;
}
}
/// <summary>
/// 在独立发送线程中发生错误
/// 在延迟发生错误
/// </summary>
/// <param name="ex"></param>
protected virtual void OnSeparateThreadSendError(System.Exception ex)
protected virtual void OnDelaySenderError(Exception ex)
{
this.Logger.Log(LogType.Error, this, "独立线程发送错误", ex);
this.Logger.Log(LogType.Error, this, "发送错误", ex);
}
/// <summary>
......@@ -602,7 +590,7 @@ namespace TouchSocket.Sockets
this.m_workStream.Dispose();
}
if (this.m_useSsl)
if (this.UseSsl)
{
ClientSslOption sslOption = this.m_config.GetValue<ClientSslOption>(TouchSocketConfigExtension.SslOptionProperty);
SslStream sslStream = (sslOption.CertificateValidationCallback != null) ? new SslStream(new NetworkStream(this.m_mainSocket, false), false, sslOption.CertificateValidationCallback) : new SslStream(new NetworkStream(this.m_mainSocket, false), false);
......@@ -1046,37 +1034,40 @@ namespace TouchSocket.Sockets
}
if (this.HandleSendingData(buffer, offset, length))
{
lock (this.sendLocker)
if (this.UseSsl)
{
if (this.m_useSsl)
if (isAsync)
{
this.m_workStream.WriteAsync(buffer, offset, length);
}
else
{
this.m_workStream.Write(buffer, offset, length);
}
}
else
{
if (this.m_useDelaySender && length < TouchSocketUtility.BigDataBoundary)
{
if (isAsync)
{
this.m_delaySender.Send(new QueueDataBytes(buffer, offset, length));
}
else
{
this.m_delaySender.Send(QueueDataBytes.CreateNew(buffer, offset, length));
}
}
else
{
if (isAsync)
{
if (this.m_separateThreadSend)
{
this.m_asyncSender.AsyncSend(buffer, offset, length);
}
else
{
this.m_mainSocket.BeginSend(buffer, offset, length, SocketFlags.None, null, null);
}
this.m_mainSocket.BeginSend(buffer, offset, length, SocketFlags.None, null, null);
}
else
{
while (length > 0)
{
int r = this.MainSocket.Send(buffer, offset, length, SocketFlags.None);
if (r == 0 && length > 0)
{
throw new Exception("发送数据不完全");
}
offset += r;
length -= r;
}
this.m_mainSocket.AbsoluteSend(buffer, offset, length);
}
}
}
......
......@@ -84,7 +84,7 @@ namespace TouchSocket.Sockets
/// <summary>
/// <inheritdoc/>
/// </summary>
public override Func<string> GetDefaultNewID => m_getDefaultNewID;
public override Func<string> GetDefaultNewID => this.m_getDefaultNewID;
/// <summary>
/// <inheritdoc/>
......@@ -155,7 +155,7 @@ namespace TouchSocket.Sockets
/// </summary>
/// <param name="socketClient"></param>
/// <param name="e"></param>
protected override sealed void OnClientConnected(ISocketClient socketClient, TouchSocketEventArgs e)
protected sealed override void OnClientConnected(ISocketClient socketClient, TouchSocketEventArgs e)
{
this.OnConnected((TClient)socketClient, e);
}
......@@ -165,7 +165,7 @@ namespace TouchSocket.Sockets
/// </summary>
/// <param name="socketClient"></param>
/// <param name="e"></param>
protected override sealed void OnClientConnecting(ISocketClient socketClient, ClientOperationEventArgs e)
protected sealed override void OnClientConnecting(ISocketClient socketClient, ClientOperationEventArgs e)
{
this.OnConnecting((TClient)socketClient, e);
}
......@@ -175,7 +175,7 @@ namespace TouchSocket.Sockets
/// </summary>
/// <param name="socketClient"></param>
/// <param name="e"></param>
protected override sealed void OnClientDisconnected(ISocketClient socketClient, ClientDisconnectedEventArgs e)
protected sealed override void OnClientDisconnected(ISocketClient socketClient, ClientDisconnectedEventArgs e)
{
this.OnDisconnected((TClient)socketClient, e);
}
......@@ -186,7 +186,7 @@ namespace TouchSocket.Sockets
/// <param name="socketClient"></param>
/// <param name="byteBlock"></param>
/// <param name="requestInfo"></param>
protected override sealed void OnClientReceivedData(ISocketClient socketClient, ByteBlock byteBlock, IRequestInfo requestInfo)
protected sealed override void OnClientReceivedData(ISocketClient socketClient, ByteBlock byteBlock, IRequestInfo requestInfo)
{
this.OnReceived((TClient)socketClient, byteBlock, requestInfo);
}
......@@ -242,7 +242,7 @@ namespace TouchSocket.Sockets
}
}
this.OnIDChanged(socketClient,e);
this.OnIDChanged(socketClient, e);
}
/// <summary>
......@@ -316,7 +316,7 @@ namespace TouchSocket.Sockets
socketClient.m_id = newID;
if (this.m_socketClients.TryAdd(socketClient))
{
this.PrivateOnIDChanged(socketClient, new IDChangedEventArgs(oldID,newID));
this.PrivateOnIDChanged(socketClient, new IDChangedEventArgs(oldID, newID));
return;
}
else
......@@ -679,7 +679,7 @@ namespace TouchSocket.Sockets
{
client.m_usePlugin = this.m_usePlugin;
client.m_lastTick = DateTime.Now.Ticks;
client.m_config = this.m_config;
client.Config = this.m_config;
client.m_service = this;
client.Logger = this.Container.Resolve<ILog>();
client.ClearType = this.m_clearType;
......@@ -702,6 +702,7 @@ namespace TouchSocket.Sockets
TClient client = this.GetClientInstence();
this.SetClientConfiguration(client);
client.SetSocket(socket);
client.InternalInitialized();
ClientOperationEventArgs args = new ClientOperationEventArgs
{
......
......@@ -197,12 +197,6 @@ namespace TouchSocket.Sockets
public static readonly DependencyProperty RemoteIPHostProperty =
DependencyProperty.Register("RemoteIPHost", typeof(IPHost), typeof(TouchSocketConfigExtension), null);
/// <summary>
/// 在异步发送时,使用独立线程发送,所需类型<see cref="bool"/>
/// </summary>
public static readonly DependencyProperty SeparateThreadSendProperty =
DependencyProperty.Register("SeparateThreadSend", typeof(bool), typeof(TouchSocketConfigExtension), false);
/// <summary>
/// Ssl配置,为Null时则不启用
/// 所需类型<see cref="TouchSocket.Sockets.SslOption"/>
......@@ -210,6 +204,30 @@ namespace TouchSocket.Sockets
public static readonly DependencyProperty SslOptionProperty =
DependencyProperty.Register("SslOption", typeof(SslOption), typeof(TouchSocketConfigExtension), null);
/// <summary>
/// 是否使用延迟合并发送。默认null。不开启
/// 所需类型<see cref="DelaySenderOption"/>
/// </summary>
public static readonly DependencyProperty DelaySenderProperty =
DependencyProperty.Register("DelaySender", typeof(DelaySenderOption), typeof(TouchSocketConfigExtension), null);
/// <summary>
/// 使用默认配置延迟合并发送。
/// 所需类型<see cref="DelaySenderOption"/>
/// </summary>
/// <param name="config"></param>
/// <param name="option"></param>
/// <returns></returns>
public static TouchSocketConfig UseDelaySender(this TouchSocketConfig config, DelaySenderOption option = default)
{
if (option == default)
{
option = new DelaySenderOption();
}
config.SetValue(DelaySenderProperty, option);
return config;
}
/// <summary>
/// 固定端口绑定。
/// <para>在<see cref="UdpSessionBase"/>中表示本地监听地址</para>
......@@ -329,18 +347,6 @@ namespace TouchSocket.Sockets
config.SetValue(NoDelayProperty, true);
return config;
}
/// <summary>
/// 在异步发送时,使用独立线程发送。
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public static TouchSocketConfig UseSeparateThreadSend(this TouchSocketConfig config)
{
config.SetValue(SeparateThreadSendProperty, true);
return config;
}
#endregion TcpClient
#region TcpService
......
......@@ -12,14 +12,13 @@
//------------------------------------------------------------------------------
using System;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.IO;
namespace TouchSocket.Sockets
{
/// <summary>
/// 大数据用户自定义固定包头解析器,使用该适配器时,接收方收到的数据中,<see cref="ByteBlock"/>将为null,同时<see cref="IRequestInfo"/>将实现为TFixedHeaderRequestInfo。
/// </summary>
public abstract class CustomBigFixedHeaderDataHandlingAdapter<TFixedHeaderRequestInfo> : CustomDataHandlingAdapter<TFixedHeaderRequestInfo>
public abstract class CustomBigFixedHeaderDataHandlingAdapter<TFixedHeaderRequestInfo> : CustomDataHandlingAdapter<TFixedHeaderRequestInfo>
where TFixedHeaderRequestInfo : class, IBigFixedHeaderRequestInfo
{
/// <summary>
......
......@@ -21,7 +21,7 @@ namespace TouchSocket.Sockets
/// <summary>
/// 数据处理适配器
/// </summary>
public abstract class DataHandlingAdapter:DisposableObject
public abstract class DataHandlingAdapter : DisposableObject
{
private ITcpClientBase m_client;
......
......@@ -11,10 +11,6 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.IO;
......@@ -37,8 +33,9 @@ namespace TouchSocket.Sockets
public PipelineDataHandlingAdapter()
{
}
byte[] m_buffer;
Task m_task;
private byte[] m_buffer;
private Task m_task;
/// <summary>
/// <inheritdoc/>
/// </summary>
......@@ -49,12 +46,12 @@ namespace TouchSocket.Sockets
{
this.m_task?.GetAwaiter().GetResult();
this.m_pipeline = new InternalPipeline(this.Client);
m_task = Task.Run(() =>
this.m_task = Task.Run(() =>
{
try
{
this.GoReceived(default, this.m_pipeline);
if (this.m_pipeline.CanReadLen>0)
if (this.m_pipeline.CanReadLen > 0)
{
this.m_buffer = new byte[this.m_pipeline.CanReadLen];
this.m_pipeline.Read(this.m_buffer, 0, this.m_buffer.Length);
......@@ -139,7 +136,7 @@ namespace TouchSocket.Sockets
internal void InternalInput(byte[] buffer, int offset, int length)
{
Input(buffer, offset, length);
this.Input(buffer, offset, length);
}
protected override void Dispose(bool disposing)
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
//------------------------------------------------------------------------------
// 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
// CSDN博客:https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频:https://space.bilibili.com/94253567
// Gitee源代码仓库:https://gitee.com/RRQM_Home
// Github源代码仓库:https://github.com/RRQM
// API首页:https://www.yuque.com/rrqm/touchsocket/index
// 交流QQ群:234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using TouchSocket.Core;
namespace TouchSocket.Sockets
......@@ -10,7 +17,7 @@ namespace TouchSocket.Sockets
/// <summary>
/// IDChangedEventArgs
/// </summary>
public class IDChangedEventArgs:TouchSocketEventArgs
public class IDChangedEventArgs : TouchSocketEventArgs
{
/// <summary>
/// IDChangedEventArgs
......@@ -26,7 +33,7 @@ namespace TouchSocket.Sockets
/// <summary>
/// 旧ID
/// </summary>
public string OldID { get;private set; }
public string OldID { get; private set; }
/// <summary>
/// 新ID
......
//------------------------------------------------------------------------------
// 此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
// CSDN博客:https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频:https://space.bilibili.com/94253567
// Gitee源代码仓库:https://gitee.com/RRQM_Home
// Github源代码仓库:https://github.com/RRQM
// API首页:https://www.yuque.com/rrqm/touchsocket/index
// 交流QQ群:234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Net.Sockets;
namespace TouchSocket.Sockets
{
/// <summary>
/// SocketExtension
/// </summary>
public static class SocketExtension
{
/// <summary>
/// 会使用同步锁,保证所有数据上缓存区。
/// </summary>
/// <param name="socket"></param>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public static void AbsoluteSend(this Socket socket, byte[] buffer, int offset, int length)
{
lock (socket)
{
while (length > 0)
{
int r = socket.Send(buffer, offset, length, SocketFlags.None);
if (r == 0 && length > 0)
{
throw new Exception("发送数据不完全");
}
offset += r;
length -= r;
}
}
}
}
}
......@@ -12,7 +12,6 @@
//------------------------------------------------------------------------------
using System;
using System.Threading;
using TouchSocket.Core.Dependency;
using TouchSocket.Core.Log;
using TouchSocket.Sockets;
using TouchSocket.Sockets.Plugins;
......
......@@ -17,8 +17,8 @@ using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Core.ByteManager;
using TouchSocket.Core.Dependency;
using TouchSocket.Resources;
using TouchSocket.Core.Run;
using TouchSocket.Resources;
namespace TouchSocket.Sockets
{
......@@ -521,8 +521,7 @@ namespace TouchSocket.Sockets
{
responsedData = new ResponsedData(null, requestInfo);
}
return !this.m_waitData.Set(responsedData);
return !this.m_waitData.Set(responsedData);
}
}
}
\ No newline at end of file
......@@ -30,7 +30,7 @@ namespace TouchSocket.Sockets
/// <exception cref="NotConnectedException">客户端没有连接</exception>
/// <exception cref="OverlengthException">发送数据超长</exception>
/// <exception cref="Exception">其他异常</exception>
void Send(string id,IRequestInfo requestInfo);
void Send(string id, IRequestInfo requestInfo);
/// <summary>
/// 异步发送数据。
......@@ -43,6 +43,6 @@ namespace TouchSocket.Sockets
/// <exception cref="NotConnectedException">客户端没有连接</exception>
/// <exception cref="OverlengthException">发送数据超长</exception>
/// <exception cref="Exception">其他异常</exception>
void SendAsync(string id,IRequestInfo requestInfo);
void SendAsync(string id, IRequestInfo requestInfo);
}
}
......@@ -21,7 +21,7 @@ namespace TouchSocket.Sockets
/// <summary>
/// TCP客户端终端接口
/// </summary>
public interface ITcpClient : ITcpClientBase, IClientSender,IPluginObject
public interface ITcpClient : ITcpClientBase, IClientSender, IPluginObject
{
/// <summary>
/// 成功连接到服务器
......@@ -38,11 +38,6 @@ namespace TouchSocket.Sockets
/// </summary>
IPHost RemoteIPHost { get; }
/// <summary>
/// 独立线程发送
/// </summary>
bool SeparateThreadSend { get; }
/// <summary>
/// 连接服务器
/// </summary>
......
......@@ -11,10 +11,7 @@
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Threading;
using System.Threading.Tasks;
using TouchSocket.Core.Dependency;
using TouchSocket.Core.Log;
using TouchSocket.Core.Plugins;
namespace TouchSocket.Sockets.Plugins
......@@ -31,7 +28,7 @@ namespace TouchSocket.Sockets.Plugins
/// 初始化一个重连插件
/// </summary>
/// <param name="tryCon">无论如何,只要返回True,则结束本轮尝试</param>
public ReconnectionPlugin(Func<TClient,bool> tryCon)
public ReconnectionPlugin(Func<TClient, bool> tryCon)
{
this.Order = int.MinValue;
this.m_tryCon = tryCon;
......@@ -65,7 +62,7 @@ namespace TouchSocket.Sockets.Plugins
}
catch
{
}
}
});
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册