NpgsqlModificationCommandBatch.cs 8.8 KB
Newer Older
1
using System;
2 3 4
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
5
using Microsoft.EntityFrameworkCore;
6
using Microsoft.EntityFrameworkCore.Diagnostics;
7 8
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Storage;
9
using Microsoft.EntityFrameworkCore.Update;
10

11
namespace Npgsql.EntityFrameworkCore.PostgreSQL.Update.Internal
12
{
13 14 15
    /// <summary>
    /// The Npgsql-specific implementation for <see cref="ModificationCommandBatch" />.
    /// </summary>
    /// <remarks>
    /// <para>
    /// The usual ModificationCommandBatch implementation is <see cref="AffectedCountModificationCommandBatch"/>,
    /// which selects the number of rows modified via a SQL query.
    /// </para>
    /// <para>
    /// PostgreSQL actually has no way of selecting the modified row count.
    /// SQL defines GET DIAGNOSTICS which should provide this, but in PostgreSQL it's only available
    /// in PL/pgSQL. See http://www.postgresql.org/docs/9.4/static/unsupported-features-sql-standard.html,
    /// identifier F121-01.
    /// </para>
    /// <para>
    /// Instead, the affected row count can be accessed in the PostgreSQL protocol itself, which seems
    /// cleaner and more efficient anyway (no additional query).
    /// </para>
    /// </remarks>
    public class NpgsqlModificationCommandBatch : ReaderModificationCommandBatch
    {
        /// <summary>
        /// The maximum number of commands per batch when the caller does not specify one.
        /// </summary>
        private const int DefaultBatchSize = 1000;

        private readonly int _maxBatchSize;

        /// <summary>
        /// Running total of column modifications across the commands accepted by <see cref="CanAddCommand"/>.
        /// </summary>
        private int _parameterCount;

        /// <summary>
        /// Constructs an instance of the <see cref="NpgsqlModificationCommandBatch"/> class.
        /// </summary>
        /// <param name="dependencies">The dependencies passed on to the base batch implementation.</param>
        /// <param name="maxBatchSize">
        /// The maximum count of commands to batch, or <see langword="null"/> to use the default of 1000.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxBatchSize"/> has a value that is zero or negative.
        /// </exception>
        public NpgsqlModificationCommandBatch(
            ModificationCommandBatchFactoryDependencies dependencies,
            int? maxBatchSize)
            : base(dependencies)
        {
            if (maxBatchSize.HasValue && maxBatchSize.Value <= 0)
                throw new ArgumentOutOfRangeException(nameof(maxBatchSize), RelationalStrings.InvalidMaxBatchSize(maxBatchSize));

            _maxBatchSize = maxBatchSize ?? DefaultBatchSize;
        }

        /// <summary>
        /// Returns the parameter count accumulated so far by <see cref="CanAddCommand"/>.
        /// </summary>
        protected override int GetParameterCount() => _parameterCount;

        /// <summary>
        /// Decides whether <paramref name="modificationCommand"/> still fits into this batch.
        /// </summary>
        /// <remarks>
        /// This method is not a pure query: when it returns <see langword="true"/> it has already added the
        /// command's column modification count to the running parameter count.
        /// </remarks>
        /// <param name="modificationCommand">The command that is about to be added.</param>
        /// <returns>
        /// <see langword="false"/> if the batch already holds the maximum number of commands, or if adding the
        /// command's column modifications would push the parameter count above <see cref="int.MaxValue"/>;
        /// otherwise <see langword="true"/>.
        /// </returns>
        protected override bool CanAddCommand(IReadOnlyModificationCommand modificationCommand)
        {
            if (ModificationCommands.Count >= _maxBatchSize)
                return false;

            // Do the addition in 64 bits so that an overflow is detected rather than silently wrapping.
            var newParamCount = (long)_parameterCount + modificationCommand.ColumnModifications.Count;
            if (newParamCount > int.MaxValue)
                return false;

            _parameterCount = (int)newParamCount;
            return true;
        }

        /// <summary>
        /// Always reports the command text as valid; no limit on the command text is enforced here.
        /// </summary>
        protected override bool IsCommandTextValid()
            => true;

        /// <summary>
        /// Walks over the executed batch, checking that every command without result propagation affected
        /// at least one row, and propagating the returned row into every command that requires it.
        /// </summary>
        /// <param name="reader">The reader for the executed batch; must wrap an <see cref="NpgsqlDataReader"/>.</param>
        /// <exception cref="DbUpdateConcurrencyException">
        /// A command affected no rows, or a propagating command returned no row.
        /// </exception>
        /// <exception cref="DbUpdateException">Any other failure while consuming the reader.</exception>
        protected override void Consume(RelationalDataReader reader)
        {
            var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
            Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
            var commandIndex = 0;

            try
            {
                while (true)
                {
                    // Find the next propagating command, if any
                    var nextPropagating = FindNextPropagatingCommand(commandIndex);

                    // Go over all non-propagating commands before the next propagating one,
                    // make sure they executed
                    EnsureRowsAffected(npgsqlReader, ref commandIndex, nextPropagating);

                    if (nextPropagating == ModificationCommands.Count)
                    {
                        // NOTE: Debug.Assert is compiled out of non-DEBUG builds together with its arguments,
                        // so this NextResult() call only happens in DEBUG builds.
                        Debug.Assert(!npgsqlReader.NextResult(), "Expected less resultsets");
                        break;
                    }

                    // Propagate the results from the reader to the ModificationCommand
                    var modificationCommand = ModificationCommands[commandIndex++];

                    if (!reader.Read())
                        throw CreateConcurrencyException(modificationCommand);

                    var valueBufferFactory = CreateValueBufferFactory(modificationCommand.ColumnModifications);
                    modificationCommand.PropagateResults(valueBufferFactory.Create(npgsqlReader));

                    npgsqlReader.NextResult();
                }
            }
            catch (Exception ex) when (!(ex is DbUpdateException))
            {
                // DbUpdateException (including the concurrency variant) passes through untouched;
                // everything else is wrapped.
                throw CreateStoreException(ex, commandIndex);
            }
        }

        /// <summary>
        /// Asynchronous counterpart of <see cref="Consume"/>: checks that every command without result
        /// propagation affected at least one row, and propagates the returned row into every command that
        /// requires it.
        /// </summary>
        /// <param name="reader">The reader for the executed batch; must wrap an <see cref="NpgsqlDataReader"/>.</param>
        /// <param name="cancellationToken">Token passed to the asynchronous reader operations.</param>
        /// <returns>A task that completes once the whole batch has been consumed.</returns>
        /// <exception cref="DbUpdateConcurrencyException">
        /// A command affected no rows, or a propagating command returned no row.
        /// </exception>
        /// <exception cref="DbUpdateException">Any other failure while consuming the reader.</exception>
        protected override async Task ConsumeAsync(
            RelationalDataReader reader,
            CancellationToken cancellationToken = default)
        {
            var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
            Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
            var commandIndex = 0;

            try
            {
                while (true)
                {
                    // Find the next propagating command, if any
                    var nextPropagating = FindNextPropagatingCommand(commandIndex);

                    // Go over all non-propagating commands before the next propagating one,
                    // make sure they executed
                    EnsureRowsAffected(npgsqlReader, ref commandIndex, nextPropagating);

                    if (nextPropagating == ModificationCommands.Count)
                    {
                        // NOTE: Debug.Assert is compiled out of non-DEBUG builds together with its arguments,
                        // so this NextResultAsync() call only happens in DEBUG builds.
                        Debug.Assert(!(await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false)), "Expected less resultsets");
                        break;
                    }

                    // Extract result from the command and propagate it
                    var modificationCommand = ModificationCommands[commandIndex++];

                    if (!(await reader.ReadAsync(cancellationToken).ConfigureAwait(false)))
                        throw CreateConcurrencyException(modificationCommand);

                    var valueBufferFactory = CreateValueBufferFactory(modificationCommand.ColumnModifications);
                    modificationCommand.PropagateResults(valueBufferFactory.Create(npgsqlReader));

                    await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false);
                }
            }
            catch (Exception ex) when (!(ex is DbUpdateException))
            {
                // DbUpdateException (including the concurrency variant) passes through untouched;
                // everything else is wrapped.
                throw CreateStoreException(ex, commandIndex);
            }
        }

        /// <summary>
        /// Returns the index of the first command at or after <paramref name="startIndex"/> that requires
        /// result propagation, or the command count if there is none.
        /// </summary>
        private int FindNextPropagatingCommand(int startIndex)
        {
            var index = startIndex;
            while (index < ModificationCommands.Count && !ModificationCommands[index].RequiresResultPropagation)
                index++;

            return index;
        }

        /// <summary>
        /// Advances <paramref name="commandIndex"/> up to <paramref name="endIndex"/>, throwing a
        /// <see cref="DbUpdateConcurrencyException"/> for the first statement that reports zero rows.
        /// </summary>
        /// <remarks>
        /// The index is taken by reference so that the caller's exception handler still sees how far the
        /// scan got if anything in here throws.
        /// </remarks>
        private void EnsureRowsAffected(NpgsqlDataReader npgsqlReader, ref int commandIndex, int endIndex)
        {
            for (; commandIndex < endIndex; commandIndex++)
            {
                if (npgsqlReader.Statements[commandIndex].Rows == 0)
                    throw CreateConcurrencyException(ModificationCommands[commandIndex]);
            }
        }

        /// <summary>
        /// Builds the exception reported when a command did not affect (or return) the expected single row.
        /// </summary>
        private static DbUpdateConcurrencyException CreateConcurrencyException(IReadOnlyModificationCommand command)
            => new DbUpdateConcurrencyException(
                RelationalStrings.UpdateConcurrencyException(1, 0),
                command.Entries);

        /// <summary>
        /// Wraps an unexpected failure in a <see cref="DbUpdateException"/> carrying the entries of the
        /// command at <paramref name="commandIndex"/>.
        /// </summary>
        /// <remarks>
        /// The index has already been incremented past the current command by the time reading or
        /// propagation can fail, so for the last command of a batch it equals the command count. It is
        /// clamped here; indexing with it directly would throw from inside the exception handler and hide
        /// <paramref name="exception"/>.
        /// </remarks>
        private DbUpdateException CreateStoreException(Exception exception, int commandIndex)
        {
            var commands = ModificationCommands;
            if (commands.Count == 0)
                return new DbUpdateException(RelationalStrings.UpdateStoreException, exception);

            var reportedIndex = Math.Min(commandIndex, commands.Count - 1);
            return new DbUpdateException(
                RelationalStrings.UpdateStoreException,
                exception,
                commands[reportedIndex].Entries);
        }
    }
}