From 9e5d28e42ac04b4c52f37d17471a90c7ffc673d7 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Wed, 14 May 2014 17:29:40 +0100 Subject: [PATCH] Command recycling and disposing during pipelined async multi-exec; enable pipeline (via sync-over-async) for sync API --- Dapper NET40/Properties/AssemblyInfo.cs | 4 +- Dapper NET40/SqlMapper.cs | 7 ++ Dapper NET45/SqlMapperAsync.cs | 66 ++++++++++++++----- .../Properties/AssemblyInfo.cs | 4 +- Dapper.Contrib/Properties/AssemblyInfo.cs | 4 +- Dapper.SqlBuilder/Properties/AssemblyInfo.cs | 4 +- DapperTests NET35/Properties/AssemblyInfo.cs | 4 +- DapperTests NET45/Properties/AssemblyInfo.cs | 4 +- DapperTests NET45/Tests.cs | 24 ++++++- Tests/Properties/AssemblyInfo.cs | 4 +- dapper.nuspec | 3 +- 11 files changed, 95 insertions(+), 33 deletions(-) diff --git a/Dapper NET40/Properties/AssemblyInfo.cs b/Dapper NET40/Properties/AssemblyInfo.cs index 4eb8e93..cab8613 100644 --- a/Dapper NET40/Properties/AssemblyInfo.cs +++ b/Dapper NET40/Properties/AssemblyInfo.cs @@ -32,5 +32,5 @@ // // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: -[assembly: AssemblyVersion("1.24.0.0")] -[assembly: AssemblyFileVersion("1.24.0.0")] +[assembly: AssemblyVersion("1.25.0.0")] +[assembly: AssemblyFileVersion("1.25.0.0")] diff --git a/Dapper NET40/SqlMapper.cs b/Dapper NET40/SqlMapper.cs index 9f44016..a7cc3d2 100644 --- a/Dapper NET40/SqlMapper.cs +++ b/Dapper NET40/SqlMapper.cs @@ -884,6 +884,13 @@ private static int ExecuteImpl(this IDbConnection cnn, ref CommandDefinition com CacheInfo info = null; if (multiExec != null && !(multiExec is string)) { +#if ASYNC + if((command.Flags & CommandFlags.Pipelined) != 0) + { + // this includes all the code for concurrent/overlapped query + return ExecuteMultiImplAsync(cnn, command, multiExec).Result; + } +#endif bool isFirst = true; int total = 0; bool wasClosed = cnn.State == ConnectionState.Closed; diff --git a/Dapper NET45/SqlMapperAsync.cs b/Dapper NET45/SqlMapperAsync.cs index 40e33f5..7641485 100644 --- a/Dapper NET45/SqlMapperAsync.cs +++ b/Dapper NET45/SqlMapperAsync.cs @@ -72,6 +72,16 @@ public static Task ExecuteAsync(this IDbConnection cnn, CommandDefinition c } } + private struct AsyncExecState + { + public readonly DbCommand Command; + public readonly Task Task; + public AsyncExecState(DbCommand command, Task task) + { + this.Command = command; + this.Task = task; + } + } private static async Task ExecuteMultiImplAsync(IDbConnection cnn, CommandDefinition command, IEnumerable multiExec) { bool isFirst = true; @@ -82,30 +92,55 @@ private static async Task ExecuteMultiImplAsync(IDbConnection cnn, CommandD if (wasClosed) await ((DbConnection)cnn).OpenAsync().ConfigureAwait(false); CacheInfo info = null; + string masterSql = null; if ((command.Flags & CommandFlags.Pipelined) != 0) { const int MAX_PENDING = 100; - var pending = new Queue>(MAX_PENDING); - foreach (var obj in multiExec) + var pending = new Queue(MAX_PENDING); + DbCommand cmd = null; + try { - var cmd = (DbCommand)command.SetupCommand(cnn, null); - if (isFirst) + foreach (var obj in multiExec) { - isFirst = false; - var identity = new Identity(command.CommandText, cmd.CommandType, cnn, null, obj.GetType(), null); - info = GetCacheInfo(identity, obj); + if (isFirst) + { + isFirst = false; + cmd = (DbCommand)command.SetupCommand(cnn, null); + masterSql = cmd.CommandText; + var identity = new Identity(command.CommandText, cmd.CommandType, cnn, null, obj.GetType(), null); + info = GetCacheInfo(identity, obj); + } else if(pending.Count >= MAX_PENDING) + { + var recycled = pending.Dequeue(); + total += await recycled.Task.ConfigureAwait(false); + cmd = recycled.Command; + cmd.CommandText = masterSql; // because we do magic replaces on "in" etc + cmd.Parameters.Clear(); // current code is Add-tastic + } + else + { + cmd = (DbCommand)command.SetupCommand(cnn, null); + } + info.ParamReader(cmd, obj); + + var task = cmd.ExecuteNonQueryAsync(command.CancellationToken); + pending.Enqueue(new AsyncExecState(cmd, task)); + cmd = null; // note the using in the finally: this avoids a double-dispose } - info.ParamReader(cmd, obj); - var task = cmd.ExecuteNonQueryAsync(command.CancellationToken); - pending.Enqueue(task); - if(pending.Count == MAX_PENDING) + while (pending.Count != 0) { - total += await pending.Dequeue().ConfigureAwait(false); + var pair = pending.Dequeue(); + using (pair.Command) { } // dispose commands + total += await pair.Task.ConfigureAwait(false); } - } - while(pending.Count != 0) + } finally { - total += await pending.Dequeue().ConfigureAwait(false); + // this only has interesting work to do if there are failures + using (cmd) { } // dispose commands + while (pending.Count != 0) + { // dispose tasks even in failure + using (pending.Dequeue().Command) { } // dispose commands + } } return total; } @@ -113,7 +148,6 @@ private static async Task ExecuteMultiImplAsync(IDbConnection cnn, CommandD { using (var cmd = (DbCommand)command.SetupCommand(cnn, null)) { - string masterSql = null; foreach (var obj in multiExec) { if (isFirst) diff --git a/Dapper.Contrib.Tests/Properties/AssemblyInfo.cs b/Dapper.Contrib.Tests/Properties/AssemblyInfo.cs index 8581359..81bb279 100644 --- a/Dapper.Contrib.Tests/Properties/AssemblyInfo.cs +++ b/Dapper.Contrib.Tests/Properties/AssemblyInfo.cs @@ -31,5 +31,5 @@ // // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: -[assembly: AssemblyVersion("1.24.0.0")] -[assembly: AssemblyFileVersion("1.24.0.0")] +[assembly: AssemblyVersion("1.25.0.0")] +[assembly: AssemblyFileVersion("1.25.0.0")] diff --git a/Dapper.Contrib/Properties/AssemblyInfo.cs b/Dapper.Contrib/Properties/AssemblyInfo.cs index 01fc3be..6e8036d 100644 --- a/Dapper.Contrib/Properties/AssemblyInfo.cs +++ b/Dapper.Contrib/Properties/AssemblyInfo.cs @@ -31,5 +31,5 @@ // // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: -[assembly: AssemblyVersion("1.24.0.0")] -[assembly: AssemblyFileVersion("1.24.0.0")] +[assembly: AssemblyVersion("1.25.0.0")] +[assembly: AssemblyFileVersion("1.25.0.0")] diff --git a/Dapper.SqlBuilder/Properties/AssemblyInfo.cs b/Dapper.SqlBuilder/Properties/AssemblyInfo.cs index ddfe0ec..a1d2af6 100644 --- a/Dapper.SqlBuilder/Properties/AssemblyInfo.cs +++ b/Dapper.SqlBuilder/Properties/AssemblyInfo.cs @@ -31,5 +31,5 @@ // // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: -[assembly: AssemblyVersion("1.24.0.0")] -[assembly: AssemblyFileVersion("1.24.0.0")] +[assembly: AssemblyVersion("1.25.0.0")] +[assembly: AssemblyFileVersion("1.25.0.0")] diff --git a/DapperTests NET35/Properties/AssemblyInfo.cs b/DapperTests NET35/Properties/AssemblyInfo.cs index ef0e604..64280d1 100644 --- a/DapperTests NET35/Properties/AssemblyInfo.cs +++ b/DapperTests NET35/Properties/AssemblyInfo.cs @@ -31,5 +31,5 @@ // // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: -[assembly: AssemblyVersion("1.24.0.0")] -[assembly: AssemblyFileVersion("1.24.0.0")] +[assembly: AssemblyVersion("1.25.0.0")] +[assembly: AssemblyFileVersion("1.25.0.0")] diff --git a/DapperTests NET45/Properties/AssemblyInfo.cs b/DapperTests NET45/Properties/AssemblyInfo.cs index 01f440e..0748e0e 100644 --- a/DapperTests NET45/Properties/AssemblyInfo.cs +++ b/DapperTests NET45/Properties/AssemblyInfo.cs @@ -31,5 +31,5 @@ // // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: -[assembly: AssemblyVersion("1.24.0.0")] -[assembly: AssemblyFileVersion("1.24.0.0")] +[assembly: AssemblyVersion("1.25.0.0")] +[assembly: AssemblyFileVersion("1.25.0.0")] diff --git a/DapperTests NET45/Tests.cs b/DapperTests NET45/Tests.cs index 9fe2039..526e883 100644 --- a/DapperTests NET45/Tests.cs +++ b/DapperTests NET45/Tests.cs @@ -209,10 +209,10 @@ public void LiteralIn() } - public void RunSequentialVersusParallel() + public void RunSequentialVersusParallelAsync() { - var ids = Enumerable.Range(1, 2000).Select(id => new { id }).ToArray(); + var ids = Enumerable.Range(1, 20000).Select(id => new { id }).ToArray(); using (var connection = Program.GetOpenConnection(true)) { connection.ExecuteAsync(new CommandDefinition("select @id", ids.Take(5), flags: CommandFlags.None)).Wait(); @@ -228,6 +228,26 @@ public void RunSequentialVersusParallel() System.Console.WriteLine("Pipeline: {0}ms", watch.ElapsedMilliseconds); } } + + public void RunSequentialVersusParallelSync() + { + + var ids = Enumerable.Range(1, 20000).Select(id => new { id }).ToArray(); + using (var connection = Program.GetOpenConnection(true)) + { + connection.Execute(new CommandDefinition("select @id", ids.Take(5), flags: CommandFlags.None)); + + var watch = Stopwatch.StartNew(); + connection.Execute(new CommandDefinition("select @id", ids, flags: CommandFlags.None)); + watch.Stop(); + System.Console.WriteLine("No pipeline: {0}ms", watch.ElapsedMilliseconds); + + watch = Stopwatch.StartNew(); + connection.Execute(new CommandDefinition("select @id", ids, flags: CommandFlags.Pipelined)); + watch.Stop(); + System.Console.WriteLine("Pipeline: {0}ms", watch.ElapsedMilliseconds); + } + } class Product { public int Id { get; set; } diff --git a/Tests/Properties/AssemblyInfo.cs b/Tests/Properties/AssemblyInfo.cs index 35d8379..5be0710 100644 --- a/Tests/Properties/AssemblyInfo.cs +++ b/Tests/Properties/AssemblyInfo.cs @@ -31,5 +31,5 @@ // // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: -[assembly: AssemblyVersion("1.24.0.0")] -[assembly: AssemblyFileVersion("1.24.0.0")] +[assembly: AssemblyVersion("1.25.0.0")] +[assembly: AssemblyFileVersion("1.25.0.0")] diff --git a/dapper.nuspec b/dapper.nuspec index c00f2a7..1473959 100644 --- a/dapper.nuspec +++ b/dapper.nuspec @@ -2,7 +2,7 @@ Dapper - 1.24 + 1.25 Dapper dot net Sam Saffron,Marc Gravell Sam Saffron,Marc Gravell @@ -19,6 +19,7 @@ + * 1.25 - Command recycling and disposing during pipelined async multi-exec; enable pipeline (via sync-over-async) for sync API * 1.24 - Implement pipelined async multi-exec, when flag is specified (only - requires MARS etc) * 1.23 - Improved support for optimize hints (@foo unknown) with list expansions * 1.22 - Literal support now extends to enumerable types (for "in" etc usage); move to command-flags model for "buffered" etc -- GitLab