提交 153db6e5 编写于 作者: S Shay Rojansky

Simplify update pipeline implementation

And adapt to the new upstream (no more RequiresResultPropagation)
上级 147fb679
......@@ -49,49 +49,40 @@ protected override void Consume(RelationalDataReader reader)
try
{
while (true)
bool? onResultSet = null;
for (; commandIndex < ModificationCommands.Count; commandIndex++)
{
// Find the next propagating command, if any
int nextPropagating;
for (nextPropagating = commandIndex;
nextPropagating < ModificationCommands.Count &&
!ResultSetMappings[nextPropagating].HasFlag(ResultSetMapping.HasResultRow);
nextPropagating++)
// Note that in the PG provider, we never transmit rows affected via the result set - it's always transmitted separately via
// the PG wire protocol and exposed on the reader (see below).
// As a result, if there's a result set we know that it contains values to be propagated back into the entity instance.
if (ResultSetMappings[commandIndex].HasFlag(ResultSetMapping.HasResultRow))
{
}
var modificationCommand = ModificationCommands[commandIndex];
// Go over all non-propagating commands before the next propagating one,
// make sure they executed
for (; commandIndex < nextPropagating; commandIndex++)
{
#pragma warning disable 618
if (npgsqlReader.Statements[commandIndex].Rows == 0)
if (!reader.Read())
{
ThrowAggregateUpdateConcurrencyException(reader, commandIndex, 1, 0);
}
#pragma warning restore 618
}
if (nextPropagating == ModificationCommands.Count)
{
Debug.Assert(!npgsqlReader.NextResult(), "Expected less resultsets");
break;
}
// Propagate to results from the reader to the ModificationCommand
modificationCommand.PropagateResults(reader);
var modificationCommand = ModificationCommands[commandIndex];
onResultSet = npgsqlReader.NextResult();
}
if (!reader.Read())
// TODO: when EF Core adds support for DbBatch (https://github.com/dotnet/efcore/issues/18990), we can start using that
// standardized API for fetching the rows affected by an individual command in a batch.
#pragma warning disable 618
if (npgsqlReader.Statements[commandIndex].Rows == 0)
{
ThrowAggregateUpdateConcurrencyException(reader, commandIndex, 1, 0);
}
#pragma warning restore 618
}
modificationCommand.PropagateResults(reader);
npgsqlReader.NextResult();
commandIndex++;
if (onResultSet == true)
{
Dependencies.UpdateLogger.UnexpectedTrailingResultSetWhenSaving();
}
}
catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException)
......@@ -121,51 +112,42 @@ protected override async Task ConsumeAsync(RelationalDataReader reader, Cancella
try
{
while (true)
bool? onResultSet = null;
for (; commandIndex < ModificationCommands.Count; commandIndex++)
{
// Find the next propagating command, if any
int nextPropagating;
for (nextPropagating = commandIndex;
nextPropagating < ModificationCommands.Count &&
!ResultSetMappings[nextPropagating].HasFlag(ResultSetMapping.HasResultRow);
nextPropagating++)
// Note that in the PG provider, we never transmit rows affected via the result set - it's transmitted via the PG wire
// protocol and exposed on the reader (see above).
// As a result, if there's a result set we know that it contains values to be propagated back into the entity instance.
if (ResultSetMappings[commandIndex].HasFlag(ResultSetMapping.HasResultRow))
{
}
var modificationCommand = ModificationCommands[commandIndex];
// Go over all non-propagating commands before the next propagating one,
// make sure they executed
for (; commandIndex < nextPropagating; commandIndex++)
{
#pragma warning disable 618
if (npgsqlReader.Statements[commandIndex].Rows == 0)
if (!(await reader.ReadAsync(cancellationToken).ConfigureAwait(false)))
{
await ThrowAggregateUpdateConcurrencyExceptionAsync(reader, commandIndex, 1, 0, cancellationToken)
.ConfigureAwait(false);
}
#pragma warning restore 618
}
if (nextPropagating == ModificationCommands.Count)
{
Debug.Assert(!(await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false)), "Expected less resultsets");
break;
}
// Extract result from the command and propagate it
modificationCommand.PropagateResults(reader);
var modificationCommand = ModificationCommands[commandIndex];
onResultSet = await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false);
}
if (!(await reader.ReadAsync(cancellationToken).ConfigureAwait(false)))
// TODO: when EF Core adds support for DbBatch (https://github.com/dotnet/efcore/issues/18990), we can start using that
// standardized API for fetching the rows affected by an individual command in a batch.
#pragma warning disable 618
if (npgsqlReader.Statements[commandIndex].Rows == 0)
{
await ThrowAggregateUpdateConcurrencyExceptionAsync(reader, commandIndex, 1, 0, cancellationToken)
.ConfigureAwait(false);
}
#pragma warning restore 618
}
modificationCommand.PropagateResults(reader);
await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false);
commandIndex++;
if (onResultSet == true)
{
Dependencies.UpdateLogger.UnexpectedTrailingResultSetWhenSaving();
}
}
catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException)
......
......@@ -114,7 +114,7 @@ public NpgsqlUpdateSqlGenerator(UpdateSqlGeneratorDependencies dependencies)
AppendUpdateCommand(commandStringBuilder, name, schema, writeOperations, readOperations, conditionOperations);
return ResultSetMapping.LastInResultSet;
return readOperations.Count > 0 ? ResultSetMapping.LastInResultSet : ResultSetMapping.NoResults;
}
/// <summary>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册