NpgsqlModificationCommandBatch.cs 9.2 KB
Newer Older
1
using System;
2 3 4
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
5
using Microsoft.EntityFrameworkCore;
6
using Microsoft.EntityFrameworkCore.Diagnostics;
7 8
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Storage;
9
using Microsoft.EntityFrameworkCore.Update;
10

11
namespace Npgsql.EntityFrameworkCore.PostgreSQL.Update.Internal
12
{
13 14 15
    /// <summary>
    /// The Npgsql-specific implementation for <see cref="ModificationCommandBatch" />.
    /// </summary>
16 17
    /// <remarks>
    /// The usual ModificationCommandBatch implementation is <see cref="AffectedCountModificationCommandBatch"/>,
S
Shay Rojansky 已提交
18
    /// which selects the number of rows modified via a SQL query.
19 20 21 22 23 24 25 26 27 28 29
    ///
    /// PostgreSQL actually has no way of selecting the modified row count.
    /// SQL defines GET DIAGNOSTICS which should provide this, but in PostgreSQL it's only available
    /// in PL/pgSQL. See http://www.postgresql.org/docs/9.4/static/unsupported-features-sql-standard.html,
    /// identifier F121-01.
    ///
    /// Instead, the affected row count can be accessed in the PostgreSQL protocol itself, which seems
    /// cleaner and more efficient anyway (no additional query).
    /// </remarks>
    public class NpgsqlModificationCommandBatch : ReaderModificationCommandBatch
    {
30 31 32
        private const int DefaultBatchSize = 1000;
        private readonly int _maxBatchSize;
        private int _parameterCount;
S
Shay Rojansky 已提交
33

34 35 36 37 38 39 40 41
        /// <summary>
        /// Constructs an instance of the <see cref="NpgsqlModificationCommandBatch"/> class.
        /// </summary>
        /// <param name="commandBuilderFactory">The builder to build commands.</param>
        /// <param name="sqlGenerationHelper">A helper for SQL generation.</param>
        /// <param name="updateSqlGenerator">A SQL generator for insert, update, and delete commands.</param>
        /// <param name="valueBufferFactoryFactory">A factory for creating <see cref="ValueBuffer" /> factories.</param>
        /// <param name="maxBatchSize">The maximum count of commands to batch.</param>
42
        public NpgsqlModificationCommandBatch(
43
            ModificationCommandBatchFactoryDependencies dependencies,
S
Shay Rojansky 已提交
44
            int? maxBatchSize)
45
            : base(dependencies)
46
        {
S
Shay Rojansky 已提交
47
            if (maxBatchSize.HasValue && maxBatchSize.Value <= 0)
48
            {
49
                throw new ArgumentOutOfRangeException(nameof(maxBatchSize), RelationalStrings.InvalidMaxBatchSize(maxBatchSize));
50
            }
S
Shay Rojansky 已提交
51 52

            _maxBatchSize = maxBatchSize ?? DefaultBatchSize;
53 54
        }

55
        protected override int GetParameterCount() => _parameterCount;
S
Shay Rojansky 已提交
56

S
Shay Rojansky 已提交
57
        protected override bool CanAddCommand(IReadOnlyModificationCommand modificationCommand)
S
Shay Rojansky 已提交
58 59
        {
            if (ModificationCommands.Count >= _maxBatchSize)
60
            {
S
Shay Rojansky 已提交
61
                return false;
62
            }
S
Shay Rojansky 已提交
63

S
Shay Rojansky 已提交
64
            var newParamCount = (long)_parameterCount + modificationCommand.ColumnModifications.Count;
S
Shay Rojansky 已提交
65
            if (newParamCount > int.MaxValue)
66
            {
S
Shay Rojansky 已提交
67
                return false;
68
            }
S
Shay Rojansky 已提交
69

70
            _parameterCount = (int)newParamCount;
S
Shay Rojansky 已提交
71 72
            return true;
        }
73 74 75 76

        protected override bool IsCommandTextValid()
            => true;

77
        protected override void Consume(RelationalDataReader reader)
78
        {
79
            var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
S
Shay Rojansky 已提交
80 81

#pragma warning disable 618
82
            Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
S
Shay Rojansky 已提交
83 84
#pragma warning restore 618

85 86 87 88 89 90 91 92 93 94 95
            var commandIndex = 0;

            try
            {
                while (true)
                {
                    // Find the next propagating command, if any
                    int nextPropagating;
                    for (nextPropagating = commandIndex;
                        nextPropagating < ModificationCommands.Count &&
                        !ModificationCommands[nextPropagating].RequiresResultPropagation;
96 97 98
                        nextPropagating++)
                    {
                    }
99 100 101 102 103

                    // Go over all non-propagating commands before the next propagating one,
                    // make sure they executed
                    for (; commandIndex < nextPropagating; commandIndex++)
                    {
S
Shay Rojansky 已提交
104
#pragma warning disable 618
105 106 107 108 109 110 111
                        if (npgsqlReader.Statements[commandIndex].Rows == 0)
                        {
                            throw new DbUpdateConcurrencyException(
                                RelationalStrings.UpdateConcurrencyException(1, 0),
                                ModificationCommands[commandIndex].Entries
                            );
                        }
S
Shay Rojansky 已提交
112
#pragma warning restore 618
113 114 115 116
                    }

                    if (nextPropagating == ModificationCommands.Count)
                    {
117
                        Debug.Assert(!npgsqlReader.NextResult(), "Expected less resultsets");
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
                        break;
                    }

                    // Propagate to results from the reader to the ModificationCommand

                    var modificationCommand = ModificationCommands[commandIndex++];

                    if (!reader.Read())
                    {
                        throw new DbUpdateConcurrencyException(
                            RelationalStrings.UpdateConcurrencyException(1, 0),
                            modificationCommand.Entries);
                    }

                    var valueBufferFactory = CreateValueBufferFactory(modificationCommand.ColumnModifications);
133
                    modificationCommand.PropagateResults(valueBufferFactory.Create(npgsqlReader));
134

135
                    npgsqlReader.NextResult();
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
                }
            }
            catch (DbUpdateException)
            {
                throw;
            }
            catch (Exception ex)
            {
                throw new DbUpdateException(
                    RelationalStrings.UpdateStoreException,
                    ex,
                    ModificationCommands[commandIndex].Entries);
            }
        }

        protected override async Task ConsumeAsync(
152
            RelationalDataReader reader,
R
Rafael Almeida 已提交
153
            CancellationToken cancellationToken = default)
154
        {
155
            var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
S
Shay Rojansky 已提交
156 157

#pragma warning disable 618
158
            Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
S
Shay Rojansky 已提交
159 160
#pragma warning restore 618

161 162 163 164 165 166 167 168 169 170 171 172
            var commandIndex = 0;

            try
            {
                while (true)
                {
                    // Find the next propagating command, if any
                    int nextPropagating;
                    for (nextPropagating = commandIndex;
                        nextPropagating < ModificationCommands.Count &&
                        !ModificationCommands[nextPropagating].RequiresResultPropagation;
                        nextPropagating++)
173
                    {
174
                        ;
175
                    }
176 177 178 179 180

                    // Go over all non-propagating commands before the next propagating one,
                    // make sure they executed
                    for (; commandIndex < nextPropagating; commandIndex++)
                    {
S
Shay Rojansky 已提交
181
#pragma warning disable 618
182 183 184 185 186 187 188
                        if (npgsqlReader.Statements[commandIndex].Rows == 0)
                        {
                            throw new DbUpdateConcurrencyException(
                                RelationalStrings.UpdateConcurrencyException(1, 0),
                                ModificationCommands[commandIndex].Entries
                            );
                        }
S
Shay Rojansky 已提交
189
#pragma warning restore 618
190 191 192 193
                    }

                    if (nextPropagating == ModificationCommands.Count)
                    {
194
                        Debug.Assert(!(await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false)), "Expected less resultsets");
195 196 197 198 199 200 201
                        break;
                    }

                    // Extract result from the command and propagate it

                    var modificationCommand = ModificationCommands[commandIndex++];

202
                    if (!(await reader.ReadAsync(cancellationToken).ConfigureAwait(false)))
203 204 205 206 207 208 209 210
                    {
                        throw new DbUpdateConcurrencyException(
                            RelationalStrings.UpdateConcurrencyException(1, 0),
                            modificationCommand.Entries
                        );
                    }

                    var valueBufferFactory = CreateValueBufferFactory(modificationCommand.ColumnModifications);
211
                    modificationCommand.PropagateResults(valueBufferFactory.Create(npgsqlReader));
212

213
                    await npgsqlReader.NextResultAsync(cancellationToken).ConfigureAwait(false);
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
                }
            }
            catch (DbUpdateException)
            {
                throw;
            }
            catch (Exception ex)
            {
                throw new DbUpdateException(
                    RelationalStrings.UpdateStoreException,
                    ex,
                    ModificationCommands[commandIndex].Entries);
            }
        }
    }
}