未验证 提交 b76fac9e 编写于 作者: S Sam Harwell 提交者: GitHub

Merge pull request #36129 from sharwell/writes-no-capture

Avoid captures in SqlConnection.FlushSpecificWritesAsync
......@@ -67,6 +67,7 @@ internal partial class SQLitePersistentStorage
}
}
[PerformanceSensitive("https://github.com/dotnet/roslyn/issues/36114", AllowCaptures = false)]
private async Task FlushSpecificWritesAsync<TKey>(
MultiDictionary<TKey, Action<SqlConnection>> keyToWriteActions,
Dictionary<TKey, Task> keyToWriteTask,
......@@ -77,7 +78,7 @@ internal partial class SQLitePersistentStorage
// Get's the task representing the current writes being performed by another
// thread for this queue+key, and a TaskCompletionSource we can use to let
// other threads know about our own progress writing any new writes in this queue.
var (previousWritesTask, taskCompletionSource) = await GetWriteTaskAsync().ConfigureAwait(false);
var (previousWritesTask, taskCompletionSource) = await GetWriteTaskAsync(keyToWriteActions, keyToWriteTask, key, writesToProcess, cancellationToken).ConfigureAwait(false);
try
{
// Wait for all previous writes to be flushed.
......@@ -116,55 +117,57 @@ internal partial class SQLitePersistentStorage
// to proceed.
taskCompletionSource?.TrySetResult(0);
}
}
return;
// Local functions
//[PerformanceSensitive("https://github.com/dotnet/roslyn/issues/36114", OftenCompletesSynchronously = true)]
async ValueTask<(Task previousTask, TaskCompletionSource<int> taskCompletionSource)> GetWriteTaskAsync()
[PerformanceSensitive("https://github.com/dotnet/roslyn/issues/36114", OftenCompletesSynchronously = true)]
private async ValueTask<(Task previousTask, TaskCompletionSource<int> taskCompletionSource)> GetWriteTaskAsync<TKey>(
MultiDictionary<TKey, Action<SqlConnection>> keyToWriteActions,
Dictionary<TKey, Task> keyToWriteTask,
TKey key,
ArrayBuilder<Action<SqlConnection>> writesToProcess,
CancellationToken cancellationToken)
{
// Have to acquire the semaphore. We're going to mutate the shared 'keyToWriteActions'
// and 'keyToWriteTask' collections.
//
// Note: by blocking on _writeQueueGate we are guaranteed to see all the writes
// performed by FlushAllPendingWritesAsync.
using (await _writeQueueGate.DisposableWaitAsync(cancellationToken).ConfigureAwait(false))
{
// Have to acquire the semaphore. We're going to mutate the shared 'keyToWriteActions'
// and 'keyToWriteTask' collections.
//
// Note: by blocking on _writeQueueGate we are guaranteed to see all the writes
// performed by FlushAllPendingWritesAsync.
using (await _writeQueueGate.DisposableWaitAsync(cancellationToken).ConfigureAwait(false))
// Get the writes we need to process.
// Note: explicitly foreach so we operate on the struct enumerator for
// MultiDictionary.ValueSet.
var actions = keyToWriteActions[key];
writesToProcess.EnsureCapacity(writesToProcess.Count + actions.Count);
foreach (var action in actions)
{
// Get the writes we need to process.
// Note: explicitly foreach so we operate on the struct enumerator for
// MultiDictionary.ValueSet.
var actions = keyToWriteActions[key];
writesToProcess.EnsureCapacity(writesToProcess.Count + actions.Count);
foreach (var action in actions)
{
writesToProcess.Add(action);
}
writesToProcess.Add(action);
}
// and clear them from the queues so we don't process things multiple times.
keyToWriteActions.Remove(key);
// and clear them from the queues so we don't process things multiple times.
keyToWriteActions.Remove(key);
// Find the existing task responsible for writing to this queue.
var existingWriteTask = keyToWriteTask.TryGetValue(key, out var task)
? task
: Task.CompletedTask;
// Find the existing task responsible for writing to this queue.
var existingWriteTask = keyToWriteTask.TryGetValue(key, out var task)
? task
: Task.CompletedTask;
if (writesToProcess.Count == 0)
{
// We have no writes of our own. But there may be an existing task that
// is writing out this queue. Return this so our caller can wait for
// all existing writes to complete.
return (previousTask: existingWriteTask, taskCompletionSource: null);
}
if (writesToProcess.Count == 0)
{
// We have no writes of our own. But there may be an existing task that
// is writing out this queue. Return this so our caller can wait for
// all existing writes to complete.
return (previousTask: existingWriteTask, taskCompletionSource: null);
}
// Create a TCS that represents our own work writing out "writesToProcess".
// Store it in keyToWriteTask so that if other threads come along, they'll
// wait for us to complete before doing their own reads/writes on this queue.
var localCompletionSource = new TaskCompletionSource<int>();
// Create a TCS that represents our own work writing out "writesToProcess".
// Store it in keyToWriteTask so that if other threads come along, they'll
// wait for us to complete before doing their own reads/writes on this queue.
var localCompletionSource = new TaskCompletionSource<int>();
keyToWriteTask[key] = localCompletionSource.Task;
keyToWriteTask[key] = localCompletionSource.Task;
return (previousTask: existingWriteTask, taskCompletionSource: localCompletionSource);
}
return (previousTask: existingWriteTask, taskCompletionSource: localCompletionSource);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册