提交 4a38f1f1 编写于 作者: 若汝棋茗

修复TouchRpc在文件传输时内存泄漏问题

上级 44568916
......@@ -87,7 +87,7 @@ namespace TouchSocket.Rpc.TouchRpc
{
return EasyTask.Run(() =>
{
Cancel(operationMes);
this.Cancel(operationMes);
});
}
......@@ -103,9 +103,9 @@ namespace TouchSocket.Rpc.TouchRpc
public Task CompleteAsync(string operationMes = null)
{
return EasyTask.Run(() =>
{
Complete(operationMes);
});
{
this.Complete(operationMes);
});
}
/// <summary>
......@@ -129,7 +129,7 @@ namespace TouchSocket.Rpc.TouchRpc
{
return EasyTask.Run(() =>
{
HoldOn(operationMes);
this.HoldOn(operationMes);
});
}
......@@ -147,7 +147,7 @@ namespace TouchSocket.Rpc.TouchRpc
{
return EasyTask.Run(() =>
{
return MoveNext();
return this.MoveNext();
});
}
......@@ -158,13 +158,13 @@ namespace TouchSocket.Rpc.TouchRpc
public Task<byte[]> ReadAsync()
{
return EasyTask.Run(() =>
{
if (MoveNext())
{
return GetCurrent();
}
return null;
});
{
if (this.MoveNext())
{
return this.GetCurrent();
}
return null;
});
}
/// <summary>
......@@ -176,11 +176,11 @@ namespace TouchSocket.Rpc.TouchRpc
/// <returns></returns>
public bool TryWrite(byte[] data, int offset, int length)
{
if (CanWrite)
if (this.CanWrite)
{
try
{
Write(data, offset, length);
this.Write(data, offset, length);
return true;
}
catch
......@@ -197,7 +197,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <returns></returns>
public bool TryWrite(byte[] data)
{
return TryWrite(data, 0, data.Length);
return this.TryWrite(data, 0, data.Length);
}
/// <summary>
......@@ -209,11 +209,11 @@ namespace TouchSocket.Rpc.TouchRpc
/// <returns></returns>
public async Task<bool> TryWriteAsync(byte[] data, int offset, int length)
{
if (CanWrite)
if (this.CanWrite)
{
try
{
await WriteAsync(data, offset, length);
await this.WriteAsync(data, offset, length);
return true;
}
catch
......@@ -230,7 +230,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <returns></returns>
public Task<bool> TryWriteAsync(byte[] data)
{
return TryWriteAsync(data, 0, data.Length);
return this.TryWriteAsync(data, 0, data.Length);
}
/// <summary>
......@@ -247,7 +247,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="data"></param>
public void Write(byte[] data)
{
Write(data, 0, data.Length);
this.Write(data, 0, data.Length);
}
/// <summary>
......@@ -259,9 +259,9 @@ namespace TouchSocket.Rpc.TouchRpc
public Task WriteAsync(byte[] data, int offset, int length)
{
return Task.Run(() =>
{
this.Write(data, offset, length);
});
{
this.Write(data, offset, length);
});
}
/// <summary>
......@@ -270,7 +270,7 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="data"></param>
public void WriteAsync(byte[] data)
{
WriteAsync(data, 0, data.Length);
this.WriteAsync(data, 0, data.Length);
}
}
......@@ -292,21 +292,21 @@ namespace TouchSocket.Rpc.TouchRpc
public InternalChannel(RpcActor client, string targetId)
{
m_actor = client;
m_lastOperationTime = DateTime.Now;
m_targetId = targetId;
m_status = ChannelStatus.Default;
m_cacheCapacity = 1024 * 1024 * 5;
m_dataQueue = new IntelligentDataQueue<ChannelPackage>(m_cacheCapacity)
this.m_actor = client;
this.m_lastOperationTime = DateTime.Now;
this.m_targetId = targetId;
this.m_status = ChannelStatus.Default;
this.m_cacheCapacity = 1024 * 1024 * 5;
this.m_dataQueue = new IntelligentDataQueue<ChannelPackage>(this.m_cacheCapacity)
{
OverflowWait = false,
OnQueueChanged = OnQueueChanged
};
m_moveWaitHandle = new AutoResetEvent(false);
m_canFree = true;
m_timer = new Timer((o) =>
this.m_moveWaitHandle = new AutoResetEvent(false);
this.m_canFree = true;
this.m_timer = new Timer((o) =>
{
if (DateTime.Now - m_lastOperationTime > TimeSpan.FromTicks(Timeout.Ticks * 3))
if (DateTime.Now - this.m_lastOperationTime > TimeSpan.FromTicks(this.Timeout.Ticks * 3))
{
this.SafeDispose();
}
......@@ -318,28 +318,28 @@ namespace TouchSocket.Rpc.TouchRpc
/// </summary>
~InternalChannel()
{
Dispose(false);
this.Dispose(false);
}
/// <summary>
/// 是否具有数据可读
/// </summary>
public override int Available => m_dataQueue.Count;
public override int Available => this.m_dataQueue.Count;
/// <summary>
/// 缓存容量
/// </summary>
public override int CacheCapacity
{
get => m_cacheCapacity;
get => this.m_cacheCapacity;
set
{
if (value < 0)
{
value = 1024;
}
m_cacheCapacity = value;
m_dataQueue.MaxSize = value;
this.m_cacheCapacity = value;
this.m_dataQueue.MaxSize = value;
}
}
......@@ -350,11 +350,11 @@ namespace TouchSocket.Rpc.TouchRpc
{
get
{
if (Available > 0)
if (this.Available > 0)
{
return true;
}
if ((byte)m_status >= 4)
if ((byte)this.m_status >= 4)
{
return false;
}
......@@ -365,35 +365,35 @@ namespace TouchSocket.Rpc.TouchRpc
/// <summary>
/// 能否写入
/// </summary>
public override bool CanWrite => (byte)m_status <= 3;
public override bool CanWrite => (byte)this.m_status <= 3;
/// <summary>
/// ID
/// </summary>
public override int ID => m_id;
public override int ID => this.m_id;
/// <summary>
/// 最后一次操作时显示消息
/// </summary>
public override string LastOperationMes
{
get => m_lastOperationMes;
get => this.m_lastOperationMes;
}
/// <summary>
/// 状态
/// </summary>
public override ChannelStatus Status => m_status;
public override ChannelStatus Status => this.m_status;
/// <summary>
/// 目的ID地址。
/// </summary>
public override string TargetId => m_targetId;
public override string TargetId => this.m_targetId;
/// <summary>
/// 是否被使用
/// </summary>
public override bool Using => m_using;
public override bool Using => this.m_using;
#region 操作
......@@ -402,14 +402,14 @@ namespace TouchSocket.Rpc.TouchRpc
/// </summary>
public override void Cancel(string operationMes = null)
{
if ((byte)m_status > 3)
if ((byte)this.m_status > 3)
{
return;
}
try
{
this.RequestCancel(true);
ChannelPackage channelPackage = new ChannelPackage()
var channelPackage = new ChannelPackage()
{
ChannelId = this.m_id,
RunNow = true,
......@@ -420,7 +420,7 @@ namespace TouchSocket.Rpc.TouchRpc
Route = this.m_targetId.HasValue()
};
this.m_actor.SendChannelPackage(channelPackage);
m_lastOperationTime = DateTime.Now;
this.m_lastOperationTime = DateTime.Now;
}
catch
{
......@@ -432,13 +432,13 @@ namespace TouchSocket.Rpc.TouchRpc
/// </summary>
public override void Complete(string operationMes = null)
{
if ((byte)m_status > 3)
if ((byte)this.m_status > 3)
{
return;
}
this.RequestComplete(true);
ChannelPackage channelPackage = new ChannelPackage()
var channelPackage = new ChannelPackage()
{
ChannelId = this.m_id,
RunNow = true,
......@@ -449,7 +449,7 @@ namespace TouchSocket.Rpc.TouchRpc
Route = this.m_targetId.HasValue()
};
this.m_actor.SendChannelPackage(channelPackage);
m_lastOperationTime = DateTime.Now;
this.m_lastOperationTime = DateTime.Now;
}
/// <summary>
......@@ -459,11 +459,11 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="operationMes"></param>
public override void HoldOn(string operationMes = null)
{
if ((byte)m_status > 3)
if ((byte)this.m_status > 3)
{
return;
}
ChannelPackage channelPackage = new ChannelPackage()
var channelPackage = new ChannelPackage()
{
ChannelId = this.m_id,
RunNow = true,
......@@ -474,7 +474,7 @@ namespace TouchSocket.Rpc.TouchRpc
Route = this.m_targetId.HasValue()
};
this.m_actor.SendChannelPackage(channelPackage);
m_lastOperationTime = DateTime.Now;
this.m_lastOperationTime = DateTime.Now;
}
/// <summary>
......@@ -483,16 +483,20 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="disposing"></param>
protected override void Dispose(bool disposing)
{
if ((byte)m_status > 3)
{
return;
}
m_timer.SafeDispose();
try
{
this.m_timer.SafeDispose();
this.RequestDispose(true);
ChannelPackage channelPackage = new ChannelPackage()
if ((byte)this.m_status > 3)
{
return;
}
var channelPackage = new ChannelPackage()
{
ChannelId = this.m_id,
RunNow = true,
......@@ -502,7 +506,7 @@ namespace TouchSocket.Rpc.TouchRpc
Route = this.m_targetId.HasValue()
};
this.m_actor.SendChannelPackage(channelPackage);
m_lastOperationTime = DateTime.Now;
this.m_lastOperationTime = DateTime.Now;
}
catch
{
......@@ -530,29 +534,29 @@ namespace TouchSocket.Rpc.TouchRpc
{
return false;
}
if (m_dataQueue.TryDequeue(out ChannelPackage channelPackage))
if (this.m_dataQueue.TryDequeue(out ChannelPackage channelPackage))
{
switch (channelPackage.DataType)
{
case ChannelDataType.DataOrder:
{
m_currentData = channelPackage.Data.Array;
this.m_currentData = channelPackage.Data.Array;
return true;
}
case ChannelDataType.CompleteOrder:
RequestComplete(true);
this.RequestComplete(true);
return false;
case ChannelDataType.CancelOrder:
RequestCancel(true);
this.RequestCancel(true);
return false;
case ChannelDataType.DisposeOrder:
RequestDispose(true);
this.RequestDispose(true);
return false;
case ChannelDataType.HoldOnOrder:
m_status = ChannelStatus.HoldOn;
this.m_status = ChannelStatus.HoldOn;
return false;
case ChannelDataType.QueueRun:
......@@ -568,14 +572,14 @@ namespace TouchSocket.Rpc.TouchRpc
}
}
m_moveWaitHandle.Reset();
if (m_moveWaitHandle.WaitOne(Timeout))
this.m_moveWaitHandle.Reset();
if (this.m_moveWaitHandle.WaitOne(this.Timeout))
{
return MoveNext();
return this.MoveNext();
}
else
{
m_status = ChannelStatus.Overtime;
this.m_status = ChannelStatus.Overtime;
return false;
}
}
......@@ -588,16 +592,16 @@ namespace TouchSocket.Rpc.TouchRpc
/// <param name="length"></param>
public override void Write(byte[] data, int offset, int length)
{
if ((byte)m_status > 3)
if ((byte)this.m_status > 3)
{
throw new Exception($"通道已{m_status}");
throw new Exception($"通道已{this.m_status}");
}
if (!SpinWait.SpinUntil(() => { return m_canFree; }, Timeout))
if (!SpinWait.SpinUntil(() => { return this.m_canFree; }, this.Timeout))
{
throw new TimeoutException();
}
ChannelPackage channelPackage = new ChannelPackage()
var channelPackage = new ChannelPackage()
{
ChannelId = this.m_id,
DataType = ChannelDataType.DataOrder,
......@@ -607,33 +611,33 @@ namespace TouchSocket.Rpc.TouchRpc
Route = this.m_targetId.HasValue()
};
this.m_actor.SendChannelPackage(channelPackage);
m_lastOperationTime = DateTime.Now;
this.m_lastOperationTime = DateTime.Now;
}
internal void ReceivedData(ChannelPackage channelPackage)
{
m_lastOperationTime = DateTime.Now;
this.m_lastOperationTime = DateTime.Now;
if (channelPackage.RunNow)
{
switch (channelPackage.DataType)
{
case ChannelDataType.CompleteOrder:
this.m_lastOperationMes = channelPackage.Message;
RequestComplete(false);
this.RequestComplete(false);
break;
case ChannelDataType.CancelOrder:
this.m_lastOperationMes = channelPackage.Message;
RequestCancel(false);
this.RequestCancel(false);
break;
case ChannelDataType.DisposeOrder:
RequestDispose(false);
this.RequestDispose(false);
break;
case ChannelDataType.HoldOnOrder:
this.m_lastOperationMes = channelPackage.Message;
m_status = ChannelStatus.HoldOn;
this.m_status = ChannelStatus.HoldOn;
break;
case ChannelDataType.QueueRun:
......@@ -648,18 +652,18 @@ namespace TouchSocket.Rpc.TouchRpc
return;
}
}
m_dataQueue.Enqueue(channelPackage);
m_moveWaitHandle.Set();
this.m_dataQueue.Enqueue(channelPackage);
this.m_moveWaitHandle.Set();
}
internal void SetID(int id)
{
m_id = id;
this.m_id = id;
}
internal void SetUsing()
{
m_using = true;
this.m_using = true;
}
private void Clear()
......@@ -668,10 +672,12 @@ namespace TouchSocket.Rpc.TouchRpc
{
lock (this)
{
m_moveWaitHandle.Set();
m_moveWaitHandle.SafeDispose();
m_actor.RemoveChannel(m_id);
m_dataQueue.Clear();
this.m_dataQueue.Clear();
if (this.m_actor.RemoveChannel(this.m_id))
{
this.m_moveWaitHandle.Set();
this.m_moveWaitHandle.SafeDispose();
}
}
}
catch
......@@ -681,14 +687,14 @@ namespace TouchSocket.Rpc.TouchRpc
private void OnQueueChanged(bool free)
{
if ((byte)m_status > 3)
if ((byte)this.m_status > 3)
{
return;
}
try
{
ChannelPackage channelPackage = new ChannelPackage()
var channelPackage = new ChannelPackage()
{
ChannelId = this.m_id,
RunNow = true,
......@@ -698,7 +704,7 @@ namespace TouchSocket.Rpc.TouchRpc
Route = this.m_targetId.HasValue()
};
this.m_actor.SendChannelPackage(channelPackage);
m_lastOperationTime = DateTime.Now;
this.m_lastOperationTime = DateTime.Now;
}
catch
{
......@@ -707,33 +713,33 @@ namespace TouchSocket.Rpc.TouchRpc
private void RequestCancel(bool clear)
{
m_status = ChannelStatus.Cancel;
this.m_status = ChannelStatus.Cancel;
if (clear)
{
Clear();
this.Clear();
}
}
private void RequestComplete(bool clear)
{
m_status = ChannelStatus.Completed;
this.m_status = ChannelStatus.Completed;
if (clear)
{
Clear();
this.Clear();
}
}
internal void RequestDispose(bool clear)
{
if ((byte)m_status > 3)
if (clear)
{
return;
this.Clear();
}
m_status = ChannelStatus.Disposed;
if (clear)
if ((byte)this.m_status > 3)
{
Clear();
return;
}
this.m_status = ChannelStatus.Disposed;
}
}
}
\ No newline at end of file
......@@ -168,9 +168,9 @@ namespace TouchSocket.Rpc.TouchRpc
return this.PrivateCreateChannel(targetId, true);
}
internal void RemoveChannel(int id)
internal bool RemoveChannel(int id)
{
m_userChannels.TryRemove(id, out _);
return m_userChannels.TryRemove(id, out _);
}
/// <summary>
......
......@@ -4,7 +4,7 @@
<ApplicationIcon>logo.ico</ApplicationIcon>
<SignAssembly>True</SignAssembly>
<AssemblyOriginatorKeyFile>D:\MyStore\13_Doc\Keys\TouchSocket.snk</AssemblyOriginatorKeyFile>
<Version>1.3.0</Version>
<Version>1.3.1</Version>
<LangVersion>8.0</LangVersion>
<Company>若汝棋茗</Company>
<Copyright>Copyright © 2023 若汝棋茗</Copyright>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册