namespace Npgsql.EntityFrameworkCore.PostgreSQL.Update.Internal;
///
/// The Npgsql-specific implementation for .
///
///
/// The usual ModificationCommandBatch implementation is ,
/// which selects the number of rows modified via a SQL query.
///
/// PostgreSQL actually has no way of selecting the modified row count.
/// SQL defines GET DIAGNOSTICS which should provide this, but in PostgreSQL it's only available
/// in PL/pgSQL. See http://www.postgresql.org/docs/9.4/static/unsupported-features-sql-standard.html,
/// identifier F121-01.
///
/// Instead, the affected row count can be accessed in the PostgreSQL protocol itself, which seems
/// cleaner and more efficient anyway (no additional query).
///
public class NpgsqlModificationCommandBatch : ReaderModificationCommandBatch
{
///
/// Constructs an instance of the class.
///
public NpgsqlModificationCommandBatch(
ModificationCommandBatchFactoryDependencies dependencies,
int maxBatchSize)
: base(dependencies)
=> MaxBatchSize = maxBatchSize;
///
/// The maximum number of instances that can be added to a single batch; defaults to 1000.
///
protected override int MaxBatchSize { get; }
///
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
///
protected override void Consume(RelationalDataReader reader)
{
var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
#pragma warning disable 618
Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
#pragma warning restore 618
var commandIndex = 0;
try
{
while (true)
{
// Find the next propagating command, if any
int nextPropagating;
for (nextPropagating = commandIndex;
nextPropagating < ModificationCommands.Count &&
!ResultSetMappings[nextPropagating].HasFlag(ResultSetMapping.HasResultRow);
nextPropagating++)
{
}
// Go over all non-propagating commands before the next propagating one,
// make sure they executed
for (; commandIndex < nextPropagating; commandIndex++)
{
#pragma warning disable 618
if (npgsqlReader.Statements[commandIndex].Rows == 0)
{
ThrowAggregateUpdateConcurrencyException(reader, commandIndex, 1, 0);
}
#pragma warning restore 618
}
if (nextPropagating == ModificationCommands.Count)
{
Debug.Assert(!npgsqlReader.NextResult(), "Expected less resultsets");
break;
}
// Propagate to results from the reader to the ModificationCommand
var modificationCommand = ModificationCommands[commandIndex];
if (!reader.Read())
{
ThrowAggregateUpdateConcurrencyException(reader, commandIndex, 1, 0);
}
modificationCommand.PropagateResults(reader);
npgsqlReader.NextResult();
commandIndex++;
}
}
catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException)
{
throw new DbUpdateException(
RelationalStrings.UpdateStoreException,
ex,
ModificationCommands[commandIndex].Entries);
}
}
///
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
///
protected override async Task ConsumeAsync(RelationalDataReader reader, CancellationToken cancellationToken = default)
{
var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
#pragma warning disable 618
Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
#pragma warning restore 618
var commandIndex = 0;
try
{
while (true)
{
// Find the next propagating command, if any
int nextPropagating;
for (nextPropagating = commandIndex;
nextPropagating < ModificationCommands.Count &&
!ResultSetMappings[nextPropagating].HasFlag(ResultSetMapping.HasResultRow);
nextPropagating++)
{
}
// Go over all non-propagating commands before the next propagating one,
// make sure they executed
for (; commandIndex < nextPropagating; commandIndex++)
{
#pragma warning disable 618
if (npgsqlReader.Statements[commandIndex].Rows == 0)
{
await ThrowAggregateUpdateConcurrencyExceptionAsync(reader, commandIndex, 1, 0, cancellationToken)
.ConfigureAwait(false);
}
#pragma warning restore 618
}
if (nextPropagating == ModificationCommands.Count)
{
Debug.Assert(!(await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false)), "Expected less resultsets");
break;
}
// Extract result from the command and propagate it
var modificationCommand = ModificationCommands[commandIndex];
if (!(await reader.ReadAsync(cancellationToken).ConfigureAwait(false)))
{
await ThrowAggregateUpdateConcurrencyExceptionAsync(reader, commandIndex, 1, 0, cancellationToken)
.ConfigureAwait(false);
}
modificationCommand.PropagateResults(reader);
await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false);
commandIndex++;
}
}
catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException)
{
throw new DbUpdateException(
RelationalStrings.UpdateStoreException,
ex,
ModificationCommands[commandIndex].Entries);
}
}
private IReadOnlyList AggregateEntries(int endIndex, int commandCount)
{
var entries = new List();
for (var i = endIndex - commandCount; i < endIndex; i++)
{
entries.AddRange(ModificationCommands[i].Entries);
}
return entries;
}
///
/// Throws an exception indicating the command affected an unexpected number of rows.
///
/// The data reader.
/// The ordinal of the command.
/// The expected number of rows affected.
/// The actual number of rows affected.
protected virtual void ThrowAggregateUpdateConcurrencyException(
RelationalDataReader reader,
int commandIndex,
int expectedRowsAffected,
int rowsAffected)
{
var entries = AggregateEntries(commandIndex + 1, expectedRowsAffected);
var exception = new DbUpdateConcurrencyException(
RelationalStrings.UpdateConcurrencyException(expectedRowsAffected, rowsAffected),
entries);
if (!Dependencies.UpdateLogger.OptimisticConcurrencyException(
Dependencies.CurrentContext.Context,
entries,
exception,
(c, ex, e, d) => CreateConcurrencyExceptionEventData(c, reader, ex, e, d)).IsSuppressed)
{
throw exception;
}
}
///
/// Throws an exception indicating the command affected an unexpected number of rows.
///
/// The data reader.
/// The ordinal of the command.
/// The expected number of rows affected.
/// The actual number of rows affected.
/// A to observe while waiting for the task to complete.
/// A task that represents the asynchronous operation.
/// If the is canceled.
protected virtual async Task ThrowAggregateUpdateConcurrencyExceptionAsync(
RelationalDataReader reader,
int commandIndex,
int expectedRowsAffected,
int rowsAffected,
CancellationToken cancellationToken)
{
var entries = AggregateEntries(commandIndex + 1, expectedRowsAffected);
var exception = new DbUpdateConcurrencyException(
RelationalStrings.UpdateConcurrencyException(expectedRowsAffected, rowsAffected),
entries);
if (!(await Dependencies.UpdateLogger.OptimisticConcurrencyExceptionAsync(
Dependencies.CurrentContext.Context,
entries,
exception,
(c, ex, e, d) => CreateConcurrencyExceptionEventData(c, reader, ex, e, d),
cancellationToken: cancellationToken)
.ConfigureAwait(false)).IsSuppressed)
{
throw exception;
}
}
private static RelationalConcurrencyExceptionEventData CreateConcurrencyExceptionEventData(
DbContext context,
RelationalDataReader reader,
DbUpdateConcurrencyException exception,
IReadOnlyList entries,
EventDefinition definition)
=> new(
definition,
(definition1, payload)
=> ((EventDefinition)definition1).GenerateMessage(((ConcurrencyExceptionEventData)payload).Exception),
context,
reader.RelationalConnection.DbConnection,
reader.DbCommand,
reader.DbDataReader,
reader.CommandId,
reader.RelationalConnection.ConnectionId,
entries,
exception);
}