NpgsqlModificationCommandBatch.cs 8.3 KB
Newer Older
1
using System;
2 3 4 5
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
6
using Microsoft.EntityFrameworkCore;
7 8
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.EntityFrameworkCore.Storage;
9
using Microsoft.EntityFrameworkCore.Update;
10

11
namespace Npgsql.EntityFrameworkCore.PostgreSQL.Update.Internal
12 13 14
{
    /// <remarks>
    /// The usual ModificationCommandBatch implementation is <see cref="AffectedCountModificationCommandBatch"/>,
S
Shay Rojansky 已提交
15
    /// which selects the number of rows modified via a SQL query.
16 17 18 19 20 21 22 23 24 25 26
    ///
    /// PostgreSQL actually has no way of selecting the modified row count.
    /// SQL defines GET DIAGNOSTICS which should provide this, but in PostgreSQL it's only available
    /// in PL/pgSQL. See http://www.postgresql.org/docs/9.4/static/unsupported-features-sql-standard.html,
    /// identifier F121-01.
    ///
    /// Instead, the affected row count can be accessed in the PostgreSQL protocol itself, which seems
    /// cleaner and more efficient anyway (no additional query).
    /// </remarks>
    public class NpgsqlModificationCommandBatch : ReaderModificationCommandBatch
    {
S
Shay Rojansky 已提交
27 28
        const int DefaultBatchSize = 1000;
        readonly int _maxBatchSize;
S
Shay Rojansky 已提交
29
        long _parameterCount;
S
Shay Rojansky 已提交
30

31 32 33 34
        public NpgsqlModificationCommandBatch(
            [NotNull] IRelationalCommandBuilderFactory commandBuilderFactory,
            [NotNull] ISqlGenerationHelper sqlGenerationHelper,
            [NotNull] IUpdateSqlGenerator updateSqlGenerator,
S
Shay Rojansky 已提交
35 36
            [NotNull] IRelationalValueBufferFactoryFactory valueBufferFactoryFactory,
            [CanBeNull] int? maxBatchSize)
37 38
            : base(commandBuilderFactory, sqlGenerationHelper, updateSqlGenerator, valueBufferFactoryFactory)
        {
S
Shay Rojansky 已提交
39 40 41 42
            if (maxBatchSize.HasValue && maxBatchSize.Value <= 0)
                throw new ArgumentOutOfRangeException(nameof(maxBatchSize), RelationalStrings.InvalidMaxBatchSize);

            _maxBatchSize = maxBatchSize ?? DefaultBatchSize;
43 44
        }

S
Shay Rojansky 已提交
45 46
        protected override int GetParameterCount() => (int)_parameterCount;

47
        protected override bool CanAddCommand(ModificationCommand modificationCommand)
S
Shay Rojansky 已提交
48 49 50 51 52 53 54 55 56 57 58 59
        {
            if (ModificationCommands.Count >= _maxBatchSize)
                return false;

            var newParamCount = _parameterCount + modificationCommand.ColumnModifications.Count;

            if (newParamCount > int.MaxValue)
                return false;

            _parameterCount = newParamCount;
            return true;
        }
60 61 62 63

        protected override bool IsCommandTextValid()
            => true;

64
        protected override void Consume(RelationalDataReader reader)
65
        {
66
            var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
            Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
            var commandIndex = 0;

            try
            {
                while (true)
                {
                    // Find the next propagating command, if any
                    int nextPropagating;
                    for (nextPropagating = commandIndex;
                        nextPropagating < ModificationCommands.Count &&
                        !ModificationCommands[nextPropagating].RequiresResultPropagation;
                        nextPropagating++) ;

                    // Go over all non-propagating commands before the next propagating one,
                    // make sure they executed
                    for (; commandIndex < nextPropagating; commandIndex++)
                    {
                        if (npgsqlReader.Statements[commandIndex].Rows == 0)
                        {
                            throw new DbUpdateConcurrencyException(
                                RelationalStrings.UpdateConcurrencyException(1, 0),
                                ModificationCommands[commandIndex].Entries
                            );
                        }
                    }

                    if (nextPropagating == ModificationCommands.Count)
                    {
96
                        Debug.Assert(!npgsqlReader.NextResult(), "Expected less resultsets");
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
                        break;
                    }

                    // Propagate to results from the reader to the ModificationCommand

                    var modificationCommand = ModificationCommands[commandIndex++];

                    if (!reader.Read())
                    {
                        throw new DbUpdateConcurrencyException(
                            RelationalStrings.UpdateConcurrencyException(1, 0),
                            modificationCommand.Entries);
                    }

                    var valueBufferFactory = CreateValueBufferFactory(modificationCommand.ColumnModifications);
112
                    modificationCommand.PropagateResults(valueBufferFactory.Create(npgsqlReader));
113

114
                    npgsqlReader.NextResult();
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
                }
            }
            catch (DbUpdateException)
            {
                throw;
            }
            catch (Exception ex)
            {
                throw new DbUpdateException(
                    RelationalStrings.UpdateStoreException,
                    ex,
                    ModificationCommands[commandIndex].Entries);
            }
        }

        protected override async Task ConsumeAsync(
131
            RelationalDataReader reader,
R
Rafael Almeida 已提交
132
            CancellationToken cancellationToken = default)
133
        {
134
            var npgsqlReader = (NpgsqlDataReader)reader.DbDataReader;
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
            Debug.Assert(npgsqlReader.Statements.Count == ModificationCommands.Count, $"Reader has {npgsqlReader.Statements.Count} statements, expected {ModificationCommands.Count}");
            var commandIndex = 0;

            try
            {
                while (true)
                {
                    // Find the next propagating command, if any
                    int nextPropagating;
                    for (nextPropagating = commandIndex;
                        nextPropagating < ModificationCommands.Count &&
                        !ModificationCommands[nextPropagating].RequiresResultPropagation;
                        nextPropagating++)
                        ;

                    // Go over all non-propagating commands before the next propagating one,
                    // make sure they executed
                    for (; commandIndex < nextPropagating; commandIndex++)
                    {
                        if (npgsqlReader.Statements[commandIndex].Rows == 0)
                        {
                            throw new DbUpdateConcurrencyException(
                                RelationalStrings.UpdateConcurrencyException(1, 0),
                                ModificationCommands[commandIndex].Entries
                            );
                        }
                    }

                    if (nextPropagating == ModificationCommands.Count)
                    {
165
                        Debug.Assert(!(await npgsqlReader.NextResultAsync(cancellationToken)), "Expected less resultsets");
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
                        break;
                    }

                    // Extract result from the command and propagate it

                    var modificationCommand = ModificationCommands[commandIndex++];

                    if (!(await reader.ReadAsync(cancellationToken)))
                    {
                        throw new DbUpdateConcurrencyException(
                            RelationalStrings.UpdateConcurrencyException(1, 0),
                            modificationCommand.Entries
                        );
                    }

                    var valueBufferFactory = CreateValueBufferFactory(modificationCommand.ColumnModifications);
182
                    modificationCommand.PropagateResults(valueBufferFactory.Create(npgsqlReader));
183

184
                    await npgsqlReader.NextResultAsync(cancellationToken);
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
                }
            }
            catch (DbUpdateException)
            {
                throw;
            }
            catch (Exception ex)
            {
                throw new DbUpdateException(
                    RelationalStrings.UpdateStoreException,
                    ex,
                    ModificationCommands[commandIndex].Entries);
            }
        }
    }
}