提交 6a46022b 编写于 作者: C CyrusNajmabadi

use a local function.

上级 da9aaf8e
......@@ -68,62 +68,58 @@ internal partial class SQLitePersistentStorage
private async Task FlushSpecificWritesAsync<TKey>(
SqlConnection connection, MultiDictionary<TKey, Action<SqlConnection>> keyToWriteActions,
Dictionary<TKey, Task> keyToWriteTask, TKey key,
Dictionary<TKey, Task> keyToWriteTask, TKey key,
ArrayBuilder<Action<SqlConnection>> writesToProcess,
CancellationToken cancellationToken)
{
// Get the task that is responsible for doing the writes for this queue.
// This task will complete when all previously enqueued writes for this queue
// complete, and all the currently enqueued writes for this queue complete as well.
var writeTask = await GetWriteTask(
connection, keyToWriteActions, keyToWriteTask,
key, writesToProcess, cancellationToken).ConfigureAwait(false);
var writeTask = await GetWriteTask().ConfigureAwait(false);
await writeTask.ConfigureAwait(false);
}
private async Task<Task> GetWriteTask<TKey>(
SqlConnection connection, MultiDictionary<TKey, Action<SqlConnection>> keyToWriteActions,
Dictionary<TKey, Task> keyToWriteTask, TKey key,
ArrayBuilder<Action<SqlConnection>> writesToProcess,
CancellationToken cancellationToken)
{
// Have to acqure the semaphore. We're going to mutate the shared 'keyToWriteActions' and
// 'keyToWriteTask' collections.
using (await _writeQueueGate.DisposableWaitAsync(cancellationToken).ConfigureAwait(false))
return;
// Local functions
async Task<Task> GetWriteTask()
{
// Get the writes we need to process.
writesToProcess.AddRange(keyToWriteActions[key]);
// Have to acquire the semaphore. We're going to mutate the shared 'keyToWriteActions'
// and 'keyToWriteTask' collections.
using (await _writeQueueGate.DisposableWaitAsync(cancellationToken).ConfigureAwait(false))
{
// Get the writes we need to process.
writesToProcess.AddRange(keyToWriteActions[key]);
// and clear them from the queues so we don't process things multiple times.
keyToWriteActions.Remove(key);
// and clear them from the queues so we don't process things multiple times.
keyToWriteActions.Remove(key);
// Find the existing task responsible for writing to this queue.
var existingWriteTask = keyToWriteTask.TryGetValue(key, out var task)
? task
: SpecializedTasks.EmptyTask;
// Find the existing task responsible for writing to this queue.
var existingWriteTask = keyToWriteTask.TryGetValue(key, out var task)
? task
: SpecializedTasks.EmptyTask;
if (writesToProcess.Count == 0)
{
// We have no writes of our own. But there may be an existing task that
// is writing out this queue. Return this so our caller can wait for
// all existing writes to complete.
return existingWriteTask;
}
if (writesToProcess.Count == 0)
{
// We have no writes of our own. But there may be an existing task that
// is writing out this queue. Return this so our caller can wait for
// all existing writes to complete.
return existingWriteTask;
}
// We have our own writes to process. Enqueue the task to write
// these out after the existing write-task for this queue completes.
var nextTask = existingWriteTask.ContinueWith(
_ => ProcessWriteQueue(connection, writesToProcess),
cancellationToken,
TaskContinuationOptions.RunContinuationsAsynchronously,
TaskScheduler.Default);
// We have our own writes to process. Enqueue the task to write
// these out after the existing write-task for this queue completes.
var nextTask = existingWriteTask.ContinueWith(
_ => ProcessWriteQueue(connection, writesToProcess),
cancellationToken,
TaskContinuationOptions.RunContinuationsAsynchronously,
TaskScheduler.Default);
// Store this for the next flush call to see.
keyToWriteTask[key] = nextTask;
// Store this for the next flush call to see.
keyToWriteTask[key] = nextTask;
// And return this to our caller so it can 'await' all these writes completing.
return nextTask;
// And return this to our caller so it can 'await' all these writes completing.
return nextTask;
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册