NpgsqlModificationCommandBatch.cs 7.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
namespace Npgsql.EntityFrameworkCore.PostgreSQL.Update.Internal;

/// <summary>
/// The Npgsql-specific implementation for <see cref="ModificationCommandBatch" />.
/// </summary>
/// <remarks>
/// The usual ModificationCommandBatch implementation is <see cref="AffectedCountModificationCommandBatch"/>,
/// which selects the number of rows modified via a SQL query.
///
/// PostgreSQL actually has no way of selecting the modified row count.
/// SQL defines GET DIAGNOSTICS which should provide this, but in PostgreSQL it's only available
/// in PL/pgSQL. See http://www.postgresql.org/docs/9.4/static/unsupported-features-sql-standard.html,
/// identifier F121-01.
///
/// Instead, the affected row count can be accessed in the PostgreSQL protocol itself, which seems
/// cleaner and more efficient anyway (no additional query).
/// </remarks>
public class NpgsqlModificationCommandBatch : ReaderModificationCommandBatch
19
{
20
    /// <summary>
21
    /// Constructs an instance of the <see cref="NpgsqlModificationCommandBatch"/> class.
22
    /// </summary>
23 24
    public NpgsqlModificationCommandBatch(
        ModificationCommandBatchFactoryDependencies dependencies,
S
Shay Rojansky 已提交
25
        int maxBatchSize)
26
        : base(dependencies)
S
Shay Rojansky 已提交
27
        => MaxBatchSize = maxBatchSize;
28

S
Shay Rojansky 已提交
29 30 31 32
    /// <summary>
    ///     The maximum number of <see cref="ModificationCommand"/> instances that can be added to a single batch; defaults to 1000.
    /// </summary>
    protected override int MaxBatchSize { get; }
S
Shay Rojansky 已提交
33

34 35 36
    protected override void Consume(RelationalDataReader reader)
    {
        var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
S
Shay Rojansky 已提交
37 38

#pragma warning disable 618
39
        Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
S
Shay Rojansky 已提交
40 41
#pragma warning restore 618

42
        var commandIndex = 0;
43

44 45 46
        try
        {
            while (true)
47
            {
48 49 50 51 52 53
                // Find the next propagating command, if any
                int nextPropagating;
                for (nextPropagating = commandIndex;
                     nextPropagating < ModificationCommands.Count &&
                     !ModificationCommands[nextPropagating].RequiresResultPropagation;
                     nextPropagating++)
54
                {
55
                }
56

57 58 59 60
                // Go over all non-propagating commands before the next propagating one,
                // make sure they executed
                for (; commandIndex < nextPropagating; commandIndex++)
                {
S
Shay Rojansky 已提交
61
#pragma warning disable 618
62
                    if (npgsqlReader.Statements[commandIndex].Rows == 0)
63 64 65
                    {
                        throw new DbUpdateConcurrencyException(
                            RelationalStrings.UpdateConcurrencyException(1, 0),
66 67
                            ModificationCommands[commandIndex].Entries
                        );
68
                    }
69 70 71 72 73 74 75 76 77 78
#pragma warning restore 618
                }

                if (nextPropagating == ModificationCommands.Count)
                {
                    Debug.Assert(!npgsqlReader.NextResult(), "Expected less resultsets");
                    break;
                }

                // Propagate to results from the reader to the ModificationCommand
79

80
                var modificationCommand = ModificationCommands[commandIndex++];
81

82 83 84 85 86
                if (!reader.Read())
                {
                    throw new DbUpdateConcurrencyException(
                        RelationalStrings.UpdateConcurrencyException(1, 0),
                        modificationCommand.Entries);
87
                }
88

89 90 91
                Check.DebugAssert(modificationCommand.RequiresResultPropagation, "RequiresResultPropagation is false");

                modificationCommand.PropagateResults(reader);
92 93

                npgsqlReader.NextResult();
94 95
            }
        }
96
        catch (DbUpdateException)
97
        {
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
            throw;
        }
        catch (Exception ex)
        {
            throw new DbUpdateException(
                RelationalStrings.UpdateStoreException,
                ex,
                ModificationCommands[commandIndex].Entries);
        }
    }

    protected override async Task ConsumeAsync(
        RelationalDataReader reader,
        CancellationToken cancellationToken = default)
    {
        var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
S
Shay Rojansky 已提交
114 115

#pragma warning disable 618
116
        Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
S
Shay Rojansky 已提交
117 118
#pragma warning restore 618

119
        var commandIndex = 0;
120

121 122 123
        try
        {
            while (true)
124
            {
125 126 127 128 129 130
                // Find the next propagating command, if any
                int nextPropagating;
                for (nextPropagating = commandIndex;
                     nextPropagating < ModificationCommands.Count &&
                     !ModificationCommands[nextPropagating].RequiresResultPropagation;
                     nextPropagating++)
131
                {
132
                }
133

134 135 136 137
                // Go over all non-propagating commands before the next propagating one,
                // make sure they executed
                for (; commandIndex < nextPropagating; commandIndex++)
                {
S
Shay Rojansky 已提交
138
#pragma warning disable 618
139
                    if (npgsqlReader.Statements[commandIndex].Rows == 0)
140 141 142
                    {
                        throw new DbUpdateConcurrencyException(
                            RelationalStrings.UpdateConcurrencyException(1, 0),
143
                            ModificationCommands[commandIndex].Entries
144 145
                        );
                    }
146 147
#pragma warning restore 618
                }
148

149 150 151 152 153 154 155
                if (nextPropagating == ModificationCommands.Count)
                {
                    Debug.Assert(!(await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false)), "Expected less resultsets");
                    break;
                }

                // Extract result from the command and propagate it
156

157 158 159 160 161 162 163 164
                var modificationCommand = ModificationCommands[commandIndex++];

                if (!(await reader.ReadAsync(cancellationToken).ConfigureAwait(false)))
                {
                    throw new DbUpdateConcurrencyException(
                        RelationalStrings.UpdateConcurrencyException(1, 0),
                        modificationCommand.Entries
                    );
165
                }
166

167 168 169
                Check.DebugAssert(modificationCommand.RequiresResultPropagation, "RequiresResultPropagation is false");

                modificationCommand.PropagateResults(reader);
170 171

                await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false);
172
            }
173 174 175 176 177 178 179 180 181 182 183
        }
        catch (DbUpdateException)
        {
            throw;
        }
        catch (Exception ex)
        {
            throw new DbUpdateException(
                RelationalStrings.UpdateStoreException,
                ex,
                ModificationCommands[commandIndex].Entries);
184 185
        }
    }
186
}