提交 da9aaf8e 编写于 作者: C CyrusNajmabadi

Simplify how we batch write to our persistence service.

上级 13a3bb52
......@@ -30,11 +30,12 @@ private abstract class Accessor<TKey, TWriteQueueKey, TDatabaseId>
new MultiDictionary<TWriteQueueKey, Action<SqlConnection>>();
/// <summary>
/// Keep track of how many threads are trying to write out this particular queue. All threads
/// trying to write out the queue will wait until all the writes are done.
/// The task responsible for writing out all the batched actions we have for a particular
/// queue. When new reads come in for that queue they can 'await' this write-task completing
/// so that all reads for the queue observe any previously completed writes.
/// </summary>
private readonly Dictionary<TWriteQueueKey, CountdownEvent> _writeQueueKeyToCountdown =
new Dictionary<TWriteQueueKey, CountdownEvent>();
private readonly Dictionary<TWriteQueueKey, Task> _writeQueueKeyToWriteTask =
new Dictionary<TWriteQueueKey, Task>();
public Accessor(SQLitePersistentStorage storage)
{
......@@ -119,7 +120,7 @@ public async Task<Stream> ReadStreamAsync(TKey key, CancellationToken cancellati
private Task FlushPendingWritesAsync(SqlConnection connection, TKey key, CancellationToken cancellationToken)
=> Storage.FlushSpecificWritesAsync(
connection, _writeQueueKeyToWrites, _writeQueueKeyToCountdown, GetWriteQueueKey(key), cancellationToken);
connection, _writeQueueKeyToWrites, _writeQueueKeyToWriteTask, GetWriteQueueKey(key), cancellationToken);
private Task AddWriteTaskAsync(TKey key, Action<SqlConnection> action, CancellationToken cancellationToken)
=> Storage.AddWriteTaskAsync(_writeQueueKeyToWrites, GetWriteQueueKey(key), action, cancellationToken);
......
......@@ -2,7 +2,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.SQLite.Interop;
......@@ -52,14 +51,14 @@ internal partial class SQLitePersistentStorage
private async Task FlushSpecificWritesAsync<TKey>(
SqlConnection connection,
MultiDictionary<TKey, Action<SqlConnection>> keyToWriteActions,
Dictionary<TKey, CountdownEvent> keyToCountdown,
Dictionary<TKey, Task> keyToWriteTask,
TKey key, CancellationToken cancellationToken)
{
var writesToProcess = ArrayBuilder<Action<SqlConnection>>.GetInstance();
try
{
await FlushSpecificWritesAsync(
connection, keyToWriteActions, keyToCountdown, key, writesToProcess, cancellationToken).ConfigureAwait(false);
connection, keyToWriteActions, keyToWriteTask, key, writesToProcess, cancellationToken).ConfigureAwait(false);
}
finally
{
......@@ -69,19 +68,28 @@ internal partial class SQLitePersistentStorage
private async Task FlushSpecificWritesAsync<TKey>(
SqlConnection connection, MultiDictionary<TKey, Action<SqlConnection>> keyToWriteActions,
Dictionary<TKey, CountdownEvent> keyToCountdown, TKey key,
Dictionary<TKey, Task> keyToWriteTask, TKey key,
ArrayBuilder<Action<SqlConnection>> writesToProcess,
CancellationToken cancellationToken)
{
// Many threads many be trying to flush a specific queue. If some other thread
// beats us to writing this queue, we want to still wait until it is down. To
// accomplish that, we use a countdown that effectively states how many current
// writers there are, and which only lets us past once all the concurrent writers
// say they are done.
CountdownEvent countdown;
// Note: by blocking on _writeQueueGate we are guaranteed to see all the writes
// performed by FlushAllPendingWrites.
// Get the task that is responsible for doing the writes for this queue.
// This task will complete when all previously enqueued writes for this queue
// complete, and all the currently enqueued writes for this queue complete as well.
var writeTask = await GetWriteTask(
connection, keyToWriteActions, keyToWriteTask,
key, writesToProcess, cancellationToken).ConfigureAwait(false);
await writeTask.ConfigureAwait(false);
}
private async Task<Task> GetWriteTask<TKey>(
SqlConnection connection, MultiDictionary<TKey, Action<SqlConnection>> keyToWriteActions,
Dictionary<TKey, Task> keyToWriteTask, TKey key,
ArrayBuilder<Action<SqlConnection>> writesToProcess,
CancellationToken cancellationToken)
{
// Have to acqure the semaphore. We're going to mutate the shared 'keyToWriteActions' and
// 'keyToWriteTask' collections.
using (await _writeQueueGate.DisposableWaitAsync(cancellationToken).ConfigureAwait(false))
{
// Get the writes we need to process.
......@@ -90,76 +98,32 @@ internal partial class SQLitePersistentStorage
// and clear them from the queues so we don't process things multiple times.
keyToWriteActions.Remove(key);
// We may have acquired _writeQueueGate between the time that an existing thread
// completes the "Wait" below and grabs this lock. If that's the case, let go
// of the countdown associated with this key as it is no longer usable.
RemoveCountdownIfComplete(keyToCountdown, key);
// Find the existing task responsible for writing to this queue.
var existingWriteTask = keyToWriteTask.TryGetValue(key, out var task)
? task
: SpecializedTasks.EmptyTask;
// See if there's an existing countdown keeping track of the number of writers
// writing this queue.
if (!keyToCountdown.TryGetValue(key, out countdown))
if (writesToProcess.Count == 0)
{
// We're the first writer for this queue. Set the count to one, and keep
// it around so future concurrent writers will see it.
countdown = new CountdownEvent(initialCount: 1);
keyToCountdown.Add(key, countdown);
// We have no writes of our own. But there may be an existing task that
// is writing out this queue. Return this so our caller can wait for
// all existing writes to complete.
return existingWriteTask;
}
else
{
// If there is, increment the count to indicate that we're writing as well.
countdown.AddCount();
}
Debug.Assert(countdown.CurrentCount >= 1);
}
// Now actually process any writes we found for this queue.
ProcessWriteQueue(connection, writesToProcess);
// Mark that we're done writing out this queue, and wait until all other writers
// for this queue are done. Note: this needs to happen in the lock so that
// changes to the countdown value are observed consistently across all threads.
bool lastSignal;
using (await _writeQueueGate.DisposableWaitAsync(cancellationToken).ConfigureAwait(false))
{
lastSignal = countdown.Signal();
}
// Don't proceed until all concurrent writers of this queue complete.
countdown.Wait();
// If we're the thread that finally got the countdown to zero, then dispose of this
// count down and remove it from the dictionary (if it hasn't already been replaced
// by the next request).
if (lastSignal)
{
Debug.Assert(countdown.CurrentCount == 0);
// We have our own writes to process. Enqueue the task to write
// these out after the existing write-task for this queue completes.
var nextTask = existingWriteTask.ContinueWith(
_ => ProcessWriteQueue(connection, writesToProcess),
cancellationToken,
TaskContinuationOptions.RunContinuationsAsynchronously,
TaskScheduler.Default);
// Safe to call outside of lock. Countdown is only given out to a set of threads
// that have incremented it. And we can only get here once all the threads have
// been allowed to get past the 'Wait' point. Only one of those threads will
// have lastSignal set to true, so we'll only dispose this once.
countdown.Dispose();
// Store this for the next flush call to see.
keyToWriteTask[key] = nextTask;
using (await _writeQueueGate.DisposableWaitAsync(cancellationToken).ConfigureAwait(false))
{
// Remove the countdown if it's still in the dictionary. It may not be if
// another thread came in after this batch of threads completed, and it
// removed the completed countdown already.
RemoveCountdownIfComplete(keyToCountdown, key);
}
}
}
private void RemoveCountdownIfComplete<TKey>(
Dictionary<TKey, CountdownEvent> keyToCountdown, TKey key)
{
Debug.Assert(_writeQueueGate.CurrentCount == 0);
if (keyToCountdown.TryGetValue(key, out var tempCountDown) &&
tempCountDown.CurrentCount == 0)
{
keyToCountdown.Remove(key);
// And return this to our caller so it can 'await' all these writes completing.
return nextTask;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册