提交 9e5d28e4 编写于 作者: M Marc Gravell

Command recycling and disposing during pipelined async multi-exec; enable...

Command recycling and disposing during pipelined async multi-exec; enable pipeline (via sync-over-async) for sync API
上级 10c51f42
...@@ -32,5 +32,5 @@ ...@@ -32,5 +32,5 @@
// //
// You can specify all the values or you can default the Build and Revision Numbers // You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below: // by using the '*' as shown below:
[assembly: AssemblyVersion("1.24.0.0")] [assembly: AssemblyVersion("1.25.0.0")]
[assembly: AssemblyFileVersion("1.24.0.0")] [assembly: AssemblyFileVersion("1.25.0.0")]
...@@ -884,6 +884,13 @@ private static int ExecuteImpl(this IDbConnection cnn, ref CommandDefinition com ...@@ -884,6 +884,13 @@ private static int ExecuteImpl(this IDbConnection cnn, ref CommandDefinition com
CacheInfo info = null; CacheInfo info = null;
if (multiExec != null && !(multiExec is string)) if (multiExec != null && !(multiExec is string))
{ {
#if ASYNC
if((command.Flags & CommandFlags.Pipelined) != 0)
{
// this includes all the code for concurrent/overlapped query
return ExecuteMultiImplAsync(cnn, command, multiExec).Result;
}
#endif
bool isFirst = true; bool isFirst = true;
int total = 0; int total = 0;
bool wasClosed = cnn.State == ConnectionState.Closed; bool wasClosed = cnn.State == ConnectionState.Closed;
......
...@@ -72,6 +72,16 @@ public static Task<int> ExecuteAsync(this IDbConnection cnn, CommandDefinition c ...@@ -72,6 +72,16 @@ public static Task<int> ExecuteAsync(this IDbConnection cnn, CommandDefinition c
} }
} }
private struct AsyncExecState
{
public readonly DbCommand Command;
public readonly Task<int> Task;
public AsyncExecState(DbCommand command, Task<int> task)
{
this.Command = command;
this.Task = task;
}
}
private static async Task<int> ExecuteMultiImplAsync(IDbConnection cnn, CommandDefinition command, IEnumerable multiExec) private static async Task<int> ExecuteMultiImplAsync(IDbConnection cnn, CommandDefinition command, IEnumerable multiExec)
{ {
bool isFirst = true; bool isFirst = true;
...@@ -82,30 +92,55 @@ private static async Task<int> ExecuteMultiImplAsync(IDbConnection cnn, CommandD ...@@ -82,30 +92,55 @@ private static async Task<int> ExecuteMultiImplAsync(IDbConnection cnn, CommandD
if (wasClosed) await ((DbConnection)cnn).OpenAsync().ConfigureAwait(false); if (wasClosed) await ((DbConnection)cnn).OpenAsync().ConfigureAwait(false);
CacheInfo info = null; CacheInfo info = null;
string masterSql = null;
if ((command.Flags & CommandFlags.Pipelined) != 0) if ((command.Flags & CommandFlags.Pipelined) != 0)
{ {
const int MAX_PENDING = 100; const int MAX_PENDING = 100;
var pending = new Queue<Task<int>>(MAX_PENDING); var pending = new Queue<AsyncExecState>(MAX_PENDING);
DbCommand cmd = null;
try
{
foreach (var obj in multiExec) foreach (var obj in multiExec)
{ {
var cmd = (DbCommand)command.SetupCommand(cnn, null);
if (isFirst) if (isFirst)
{ {
isFirst = false; isFirst = false;
cmd = (DbCommand)command.SetupCommand(cnn, null);
masterSql = cmd.CommandText;
var identity = new Identity(command.CommandText, cmd.CommandType, cnn, null, obj.GetType(), null); var identity = new Identity(command.CommandText, cmd.CommandType, cnn, null, obj.GetType(), null);
info = GetCacheInfo(identity, obj); info = GetCacheInfo(identity, obj);
} else if(pending.Count >= MAX_PENDING)
{
var recycled = pending.Dequeue();
total += await recycled.Task.ConfigureAwait(false);
cmd = recycled.Command;
cmd.CommandText = masterSql; // because we do magic replaces on "in" etc
cmd.Parameters.Clear(); // current code is Add-tastic
}
else
{
cmd = (DbCommand)command.SetupCommand(cnn, null);
} }
info.ParamReader(cmd, obj); info.ParamReader(cmd, obj);
var task = cmd.ExecuteNonQueryAsync(command.CancellationToken); var task = cmd.ExecuteNonQueryAsync(command.CancellationToken);
pending.Enqueue(task); pending.Enqueue(new AsyncExecState(cmd, task));
if(pending.Count == MAX_PENDING) cmd = null; // note the using in the finally: this avoids a double-dispose
{
total += await pending.Dequeue().ConfigureAwait(false);
} }
while (pending.Count != 0)
{
var pair = pending.Dequeue();
using (pair.Command) { } // dispose commands
total += await pair.Task.ConfigureAwait(false);
} }
while(pending.Count != 0) } finally
{ {
total += await pending.Dequeue().ConfigureAwait(false); // this only has interesting work to do if there are failures
using (cmd) { } // dispose commands
while (pending.Count != 0)
{ // dispose tasks even in failure
using (pending.Dequeue().Command) { } // dispose commands
}
} }
return total; return total;
} }
...@@ -113,7 +148,6 @@ private static async Task<int> ExecuteMultiImplAsync(IDbConnection cnn, CommandD ...@@ -113,7 +148,6 @@ private static async Task<int> ExecuteMultiImplAsync(IDbConnection cnn, CommandD
{ {
using (var cmd = (DbCommand)command.SetupCommand(cnn, null)) using (var cmd = (DbCommand)command.SetupCommand(cnn, null))
{ {
string masterSql = null;
foreach (var obj in multiExec) foreach (var obj in multiExec)
{ {
if (isFirst) if (isFirst)
......
...@@ -31,5 +31,5 @@ ...@@ -31,5 +31,5 @@
// //
// You can specify all the values or you can default the Build and Revision Numbers // You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below: // by using the '*' as shown below:
[assembly: AssemblyVersion("1.24.0.0")] [assembly: AssemblyVersion("1.25.0.0")]
[assembly: AssemblyFileVersion("1.24.0.0")] [assembly: AssemblyFileVersion("1.25.0.0")]
...@@ -31,5 +31,5 @@ ...@@ -31,5 +31,5 @@
// //
// You can specify all the values or you can default the Build and Revision Numbers // You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below: // by using the '*' as shown below:
[assembly: AssemblyVersion("1.24.0.0")] [assembly: AssemblyVersion("1.25.0.0")]
[assembly: AssemblyFileVersion("1.24.0.0")] [assembly: AssemblyFileVersion("1.25.0.0")]
...@@ -31,5 +31,5 @@ ...@@ -31,5 +31,5 @@
// //
// You can specify all the values or you can default the Build and Revision Numbers // You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below: // by using the '*' as shown below:
[assembly: AssemblyVersion("1.24.0.0")] [assembly: AssemblyVersion("1.25.0.0")]
[assembly: AssemblyFileVersion("1.24.0.0")] [assembly: AssemblyFileVersion("1.25.0.0")]
...@@ -31,5 +31,5 @@ ...@@ -31,5 +31,5 @@
// //
// You can specify all the values or you can default the Build and Revision Numbers // You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below: // by using the '*' as shown below:
[assembly: AssemblyVersion("1.24.0.0")] [assembly: AssemblyVersion("1.25.0.0")]
[assembly: AssemblyFileVersion("1.24.0.0")] [assembly: AssemblyFileVersion("1.25.0.0")]
...@@ -31,5 +31,5 @@ ...@@ -31,5 +31,5 @@
// //
// You can specify all the values or you can default the Build and Revision Numbers // You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below: // by using the '*' as shown below:
[assembly: AssemblyVersion("1.24.0.0")] [assembly: AssemblyVersion("1.25.0.0")]
[assembly: AssemblyFileVersion("1.24.0.0")] [assembly: AssemblyFileVersion("1.25.0.0")]
...@@ -209,10 +209,10 @@ public void LiteralIn() ...@@ -209,10 +209,10 @@ public void LiteralIn()
} }
public void RunSequentialVersusParallel() public void RunSequentialVersusParallelAsync()
{ {
var ids = Enumerable.Range(1, 2000).Select(id => new { id }).ToArray(); var ids = Enumerable.Range(1, 20000).Select(id => new { id }).ToArray();
using (var connection = Program.GetOpenConnection(true)) using (var connection = Program.GetOpenConnection(true))
{ {
connection.ExecuteAsync(new CommandDefinition("select @id", ids.Take(5), flags: CommandFlags.None)).Wait(); connection.ExecuteAsync(new CommandDefinition("select @id", ids.Take(5), flags: CommandFlags.None)).Wait();
...@@ -228,6 +228,26 @@ public void RunSequentialVersusParallel() ...@@ -228,6 +228,26 @@ public void RunSequentialVersusParallel()
System.Console.WriteLine("Pipeline: {0}ms", watch.ElapsedMilliseconds); System.Console.WriteLine("Pipeline: {0}ms", watch.ElapsedMilliseconds);
} }
} }
public void RunSequentialVersusParallelSync()
{
var ids = Enumerable.Range(1, 20000).Select(id => new { id }).ToArray();
using (var connection = Program.GetOpenConnection(true))
{
connection.Execute(new CommandDefinition("select @id", ids.Take(5), flags: CommandFlags.None));
var watch = Stopwatch.StartNew();
connection.Execute(new CommandDefinition("select @id", ids, flags: CommandFlags.None));
watch.Stop();
System.Console.WriteLine("No pipeline: {0}ms", watch.ElapsedMilliseconds);
watch = Stopwatch.StartNew();
connection.Execute(new CommandDefinition("select @id", ids, flags: CommandFlags.Pipelined));
watch.Stop();
System.Console.WriteLine("Pipeline: {0}ms", watch.ElapsedMilliseconds);
}
}
class Product class Product
{ {
public int Id { get; set; } public int Id { get; set; }
......
...@@ -31,5 +31,5 @@ ...@@ -31,5 +31,5 @@
// //
// You can specify all the values or you can default the Build and Revision Numbers // You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below: // by using the '*' as shown below:
[assembly: AssemblyVersion("1.24.0.0")] [assembly: AssemblyVersion("1.25.0.0")]
[assembly: AssemblyFileVersion("1.24.0.0")] [assembly: AssemblyFileVersion("1.25.0.0")]
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
<package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd"> <package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
<metadata schemaVersion="2"> <metadata schemaVersion="2">
<id>Dapper</id> <id>Dapper</id>
<version>1.24</version> <version>1.25</version>
<title>Dapper dot net</title> <title>Dapper dot net</title>
<authors>Sam Saffron,Marc Gravell</authors> <authors>Sam Saffron,Marc Gravell</authors>
<owners>Sam Saffron,Marc Gravell</owners> <owners>Sam Saffron,Marc Gravell</owners>
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
<frameworkAssembly assemblyName="Microsoft.CSharp" targetFramework=".NETFramework4.0-Client, .NETFramework4.0" /> <frameworkAssembly assemblyName="Microsoft.CSharp" targetFramework=".NETFramework4.0-Client, .NETFramework4.0" />
</frameworkAssemblies> </frameworkAssemblies>
<releaseNotes> <releaseNotes>
* 1.25 - Command recycling and disposing during pipelined async multi-exec; enable pipeline (via sync-over-async) for sync API
* 1.24 - Implement pipelined async multi-exec, when flag is specified (only - requires MARS etc) * 1.24 - Implement pipelined async multi-exec, when flag is specified (only - requires MARS etc)
* 1.23 - Improved support for optimize hints (@foo unknown) with list expansions * 1.23 - Improved support for optimize hints (@foo unknown) with list expansions
* 1.22 - Literal support now extends to enumerable types (for "in" etc usage); move to command-flags model for "buffered" etc * 1.22 - Literal support now extends to enumerable types (for "in" etc usage); move to command-flags model for "buffered" etc
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册