schema_statements.rb 28.6 KB
Newer Older
1
require "active_support/core_ext/string/strip"
2

3 4
module ActiveRecord
  module ConnectionAdapters
5
    module PostgreSQL
6 7 8
      class SchemaCreation < AbstractAdapter::SchemaCreation
        private

9 10 11 12
          def visit_ColumnDefinition(o)
            o.sql_type = type_to_sql(o.type, o.limit, o.precision, o.scale, o.array)
            super
          end
13

14 15 16 17 18
          def add_column_options!(sql, options)
            if options[:collation]
              sql << " COLLATE \"#{options[:collation]}\""
            end
            super
19
          end
20 21
      end

22 23 24 25 26 27 28 29 30
      module SchemaStatements
        # Drops the database specified on the +name+ attribute
        # and creates it again using the provided +options+.
        def recreate_database(name, options = {}) #:nodoc:
          drop_database(name)
          create_database(name, options)
        end

        # Create a new PostgreSQL database. Options include <tt>:owner</tt>, <tt>:template</tt>,
31
        # <tt>:encoding</tt> (defaults to utf8), <tt>:collation</tt>, <tt>:ctype</tt>,
32 33 34 35 36
        # <tt>:tablespace</tt>, and <tt>:connection_limit</tt> (note that MySQL uses
        # <tt>:charset</tt> while PostgreSQL uses <tt>:encoding</tt>).
        #
        # Example:
        #   create_database config[:database], config
A
AvnerCohen 已提交
37
        #   create_database 'foo_development', encoding: 'unicode'
38
        def create_database(name, options = {})
39
          options = { encoding: "utf8" }.merge!(options.symbolize_keys)
40

41 42
          option_string = options.inject("") do |memo, (key, value)|
            memo += case key
43 44 45 46 47 48 49 50 51 52 53 54 55 56
                    when :owner
                      " OWNER = \"#{value}\""
                    when :template
                      " TEMPLATE = \"#{value}\""
                    when :encoding
                      " ENCODING = '#{value}'"
                    when :collation
                      " LC_COLLATE = '#{value}'"
                    when :ctype
                      " LC_CTYPE = '#{value}'"
                    when :tablespace
                      " TABLESPACE = \"#{value}\""
                    when :connection_limit
                      " CONNECTION LIMIT = #{value}"
57
            else
58
                      ""
59 60 61 62 63 64 65 66 67 68 69 70 71 72
            end
          end

          execute "CREATE DATABASE #{quote_table_name(name)}#{option_string}"
        end

        # Drops a PostgreSQL database.
        #
        # Example:
        #   drop_database 'matt_development'
        def drop_database(name) #:nodoc:
          execute "DROP DATABASE IF EXISTS #{quote_table_name(name)}"
        end

73
        # Returns the list of all tables in the schema search path.
74
        def tables(name = nil)
75 76 77 78 79 80
          if name
            ActiveSupport::Deprecation.warn(<<-MSG.squish)
              Passing arguments to #tables is deprecated without replacement.
            MSG
          end

81
          select_values("SELECT tablename FROM pg_tables WHERE schemaname = ANY(current_schemas(false))", "SCHEMA")
82 83
        end

84
        def data_sources # :nodoc
85
          select_values(<<-SQL, "SCHEMA")
86 87 88
            SELECT c.relname
            FROM pg_class c
            LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
R
Ryuta Kamizono 已提交
89
            WHERE c.relkind IN ('r','v','m') -- (r)elation/table, (v)iew, (m)aterialized view
90 91 92 93
            AND n.nspname = ANY (current_schemas(false))
          SQL
        end

94 95 96 97
        # Returns true if table exists.
        # If the schema is not specified as part of +name+ then it will only find tables within
        # the current schema search path (regardless of permissions to access tables in other schemas)
        def table_exists?(name)
98 99 100 101 102 103 104 105 106 107
          ActiveSupport::Deprecation.warn(<<-MSG.squish)
            #table_exists? currently checks both tables and views.
            This behavior is deprecated and will be changed with Rails 5.1 to only check tables.
            Use #data_source_exists? instead.
          MSG

          data_source_exists?(name)
        end

        def data_source_exists?(name)
108 109
          name = Utils.extract_schema_qualified_name(name.to_s)
          return false unless name.identifier
110

R
Ryuta Kamizono 已提交
111 112
          select_values(<<-SQL, "SCHEMA").any?
              SELECT c.relname
113 114
              FROM pg_class c
              LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
115
              WHERE c.relkind IN ('r','v','m') -- (r)elation/table, (v)iew, (m)aterialized view
R
Ryuta Kamizono 已提交
116 117
              AND c.relname = #{quote(name.identifier)}
              AND n.nspname = #{name.schema ? quote(name.schema) : "ANY (current_schemas(false))"}
118 119 120
          SQL
        end

121
        def views # :nodoc:
122
          select_values(<<-SQL, "SCHEMA")
123 124 125 126 127 128 129 130 131 132 133 134
            SELECT c.relname
            FROM pg_class c
            LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('v','m') -- (v)iew, (m)aterialized view
            AND n.nspname = ANY (current_schemas(false))
          SQL
        end

        def view_exists?(view_name) # :nodoc:
          name = Utils.extract_schema_qualified_name(view_name.to_s)
          return false unless name.identifier

135
          select_values(<<-SQL, "SCHEMA").any?
136 137 138 139
            SELECT c.relname
            FROM pg_class c
            LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('v','m') -- (v)iew, (m)aterialized view
R
Ryuta Kamizono 已提交
140 141
            AND c.relname = #{quote(name.identifier)}
            AND n.nspname = #{name.schema ? quote(name.schema) : "ANY (current_schemas(false))"}
142 143 144
          SQL
        end

145
        def drop_table(table_name, options = {}) # :nodoc:
146
          execute "DROP TABLE#{' IF EXISTS' if options[:if_exists]} #{quote_table_name(table_name)}#{' CASCADE' if options[:force] == :cascade}"
147 148
        end

149 150
        # Returns true if schema exists.
        def schema_exists?(name)
151
          select_value("SELECT COUNT(*) FROM pg_namespace WHERE nspname = '#{name}'", "SCHEMA").to_i > 0
152 153
        end

154
        # Verifies existence of an index with a given name.
155
        def index_name_exists?(table_name, index_name, default)
156 157 158
          table = Utils.extract_schema_qualified_name(table_name.to_s)
          index = Utils.extract_schema_qualified_name(index_name.to_s)

159
          select_value(<<-SQL, "SCHEMA").to_i > 0
160 161 162 163
            SELECT COUNT(*)
            FROM pg_class t
            INNER JOIN pg_index d ON t.oid = d.indrelid
            INNER JOIN pg_class i ON d.indexrelid = i.oid
164
            LEFT JOIN pg_namespace n ON n.oid = i.relnamespace
165
            WHERE i.relkind = 'i'
166 167 168
              AND i.relname = '#{index.identifier}'
              AND t.relname = '#{table.identifier}'
              AND n.nspname = #{index.schema ? "'#{index.schema}'" : 'ANY (current_schemas(false))'}
169 170 171
          SQL
        end

172 173
        # Returns an array of indexes for the given table.
        def indexes(table_name, name = nil)
174 175
          table = Utils.extract_schema_qualified_name(table_name.to_s)

176
          result = query(<<-SQL, "SCHEMA")
177
            SELECT distinct i.relname, d.indisunique, d.indkey, pg_get_indexdef(d.indexrelid), t.oid,
178 179 180 181
                            pg_catalog.obj_description(i.oid, 'pg_class') AS comment,
            (SELECT COUNT(*) FROM pg_opclass o
               JOIN (SELECT unnest(string_to_array(d.indclass::text, ' '))::int oid) c
                 ON o.oid = c.oid WHERE o.opcdefault = 'f')
182 183 184 185 186 187 188 189
            FROM pg_class t
            INNER JOIN pg_index d ON t.oid = d.indrelid
            INNER JOIN pg_class i ON d.indexrelid = i.oid
            LEFT JOIN pg_namespace n ON n.oid = i.relnamespace
            WHERE i.relkind = 'i'
              AND d.indisprimary = 'f'
              AND t.relname = '#{table.identifier}'
              AND n.nspname = #{table.schema ? "'#{table.schema}'" : 'ANY (current_schemas(false))'}
190 191 192 193 194
            ORDER BY i.relname
          SQL

          result.map do |row|
            index_name = row[0]
195 196
            unique = row[1]
            indkey = row[2].split(" ").map(&:to_i)
197 198
            inddef = row[3]
            oid = row[4]
199
            comment = row[5]
200
            opclass = row[6]
201

202
            using, expressions, where = inddef.scan(/ USING (\w+?) \((.+?)\)(?: WHERE (.+))?\z/).flatten
203

204 205 206 207 208 209 210 211 212
            if indkey.include?(0) || opclass > 0
              columns = expressions
            else
              columns = Hash[query(<<-SQL.strip_heredoc, "SCHEMA")].values_at(*indkey).compact
                SELECT a.attnum, a.attname
                FROM pg_attribute a
                WHERE a.attrelid = #{oid}
                AND a.attnum IN (#{indkey.join(",")})
              SQL
213

214
              # add info on sort order for columns (only desc order is explicitly specified, asc is the default)
215 216 217
              orders = Hash[
                expressions.scan(/(\w+) DESC/).flatten.map { |order_column| [order_column, :desc] }
              ]
218
            end
219

220
            IndexDefinition.new(table_name, index_name, unique, columns, [], orders, where, nil, using.to_sym, comment.presence)
221 222 223
          end.compact
        end

224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
        def new_column_from_field(table_name, field) # :nondoc:
          column_name, type, default, notnull, oid, fmod, collation, comment = field
          oid = oid.to_i
          fmod = fmod.to_i
          type_metadata = fetch_type_metadata(column_name, type, oid, fmod)
          default_value = extract_value_from_default(default)
          default_function = extract_default_function(default_value, default)
          PostgreSQLColumn.new(
            column_name,
            default_value,
            type_metadata,
            !notnull,
            table_name,
            default_function,
            collation,
            comment: comment.presence
          )
241 242
        end

243
        def table_options(table_name) # :nodoc:
R
Ryuta Kamizono 已提交
244 245 246
          if comment = table_comment(table_name)
            { comment: comment }
          end
247 248
        end

249 250 251
        # Returns a comment stored in database for given table
        def table_comment(table_name) # :nodoc:
          name = Utils.extract_schema_qualified_name(table_name.to_s)
252
          if name.identifier
253
            select_value(<<-SQL.strip_heredoc, "SCHEMA")
254 255 256 257 258 259 260 261
              SELECT pg_catalog.obj_description(c.oid, 'pg_class')
              FROM pg_catalog.pg_class c
                LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
              WHERE c.relname = #{quote(name.identifier)}
                AND c.relkind IN ('r') -- (r)elation/table
                AND n.nspname = #{name.schema ? quote(name.schema) : 'ANY (current_schemas(false))'}
            SQL
          end
262 263
        end

264 265
        # Returns the current database name.
        def current_database
266
          select_value("select current_database()", "SCHEMA")
267 268 269 270
        end

        # Returns the current schema name.
        def current_schema
271
          select_value("SELECT current_schema", "SCHEMA")
272 273 274 275
        end

        # Returns the current database encoding format.
        def encoding
276
          select_value("SELECT pg_encoding_to_char(encoding) FROM pg_database WHERE datname LIKE '#{current_database}'", "SCHEMA")
277 278 279 280
        end

        # Returns the current database collation.
        def collation
281
          select_value("SELECT datcollate FROM pg_database WHERE datname LIKE '#{current_database}'", "SCHEMA")
282 283 284 285
        end

        # Returns the current database ctype.
        def ctype
286
          select_value("SELECT datctype FROM pg_database WHERE datname LIKE '#{current_database}'", "SCHEMA")
287 288 289 290
        end

        # Returns an array of schema names.
        def schema_names
291
          select_values(<<-SQL, "SCHEMA")
292 293 294 295 296 297 298 299 300
            SELECT nspname
              FROM pg_namespace
             WHERE nspname !~ '^pg_.*'
               AND nspname NOT IN ('information_schema')
             ORDER by nspname;
          SQL
        end

        # Creates a schema for the given schema name.
301
        def create_schema(schema_name)
302
          execute "CREATE SCHEMA #{quote_schema_name(schema_name)}"
303 304 305
        end

        # Drops the schema for the given schema name.
306
        def drop_schema(schema_name, options = {})
307
          execute "DROP SCHEMA#{' IF EXISTS' if options[:if_exists]} #{quote_schema_name(schema_name)} CASCADE"
308 309 310 311 312 313 314 315 316
        end

        # Sets the schema search path to a string of comma-separated schema names.
        # Names beginning with $ have to be quoted (e.g. $user => '$user').
        # See: http://www.postgresql.org/docs/current/static/ddl-schemas.html
        #
        # This should be not be called manually but set in database.yml.
        def schema_search_path=(schema_csv)
          if schema_csv
317
            execute("SET search_path TO #{schema_csv}", "SCHEMA")
318 319 320 321 322 323
            @schema_search_path = schema_csv
          end
        end

        # Returns the active schema search path.
        def schema_search_path
324
          @schema_search_path ||= select_value("SHOW search_path", "SCHEMA")
325 326 327 328
        end

        # Returns the current client message level.
        def client_min_messages
329
          select_value("SHOW client_min_messages", "SCHEMA")
330 331 332 333
        end

        # Set the client message level.
        def client_min_messages=(level)
334
          execute("SET client_min_messages TO '#{level}'", "SCHEMA")
335 336 337
        end

        # Returns the sequence name for a table's primary key or some other specified key.
338
        def default_sequence_name(table_name, pk = "id") #:nodoc:
B
bogdanvlviv 已提交
339
          result = serial_sequence(table_name, pk)
340
          return nil unless result
341
          Utils.extract_schema_qualified_name(result).to_s
342
        rescue ActiveRecord::StatementInvalid
B
bogdanvlviv 已提交
343
          PostgreSQL::Name.new(nil, "#{table_name}_#{pk}_seq").to_s
344 345 346
        end

        def serial_sequence(table, column)
347
          select_value("SELECT pg_get_serial_sequence('#{table}', '#{column}')", "SCHEMA")
348 349
        end

350
        # Sets the sequence of a table's primary key to the specified value.
A
Aaron Patterson 已提交
351 352
        def set_pk_sequence!(table, value) #:nodoc:
          pk, sequence = pk_and_sequence_for(table)
353 354 355

          if pk
            if sequence
356
              quoted_sequence = quote_table_name(sequence)
357

358
              select_value("SELECT setval('#{quoted_sequence}', #{value})", "SCHEMA")
359
            else
360
              @logger.warn "#{table} has primary key #{pk} with no default sequence." if @logger
361 362 363 364
            end
          end
        end

365 366
        # Resets the sequence of a table's primary key to the maximum value.
        def reset_pk_sequence!(table, pk = nil, sequence = nil) #:nodoc:
367
          unless pk && sequence
368 369 370 371 372 373 374
            default_pk, default_sequence = pk_and_sequence_for(table)

            pk ||= default_pk
            sequence ||= default_sequence
          end

          if @logger && pk && !sequence
375
            @logger.warn "#{table} has primary key #{pk} with no default sequence."
376 377 378 379 380
          end

          if pk && sequence
            quoted_sequence = quote_table_name(sequence)

381
            select_value(<<-end_sql, "SCHEMA")
382 383 384 385 386 387 388 389 390
              SELECT setval('#{quoted_sequence}', (SELECT COALESCE(MAX(#{quote_column_name pk})+(SELECT increment_by FROM #{quoted_sequence}), (SELECT min_value FROM #{quoted_sequence})) FROM #{quote_table_name(table)}), false)
            end_sql
          end
        end

        # Returns a table's primary key and belonging sequence.
        def pk_and_sequence_for(table) #:nodoc:
          # First try looking for a sequence with a dependency on the
          # given table's primary key.
391
          result = query(<<-end_sql, "SCHEMA")[0]
392
            SELECT attr.attname, nsp.nspname, seq.relname
393 394 395
            FROM pg_class      seq,
                 pg_attribute  attr,
                 pg_depend     dep,
396 397
                 pg_constraint cons,
                 pg_namespace  nsp
398 399 400 401 402 403
            WHERE seq.oid           = dep.objid
              AND seq.relkind       = 'S'
              AND attr.attrelid     = dep.refobjid
              AND attr.attnum       = dep.refobjsubid
              AND attr.attrelid     = cons.conrelid
              AND attr.attnum       = cons.conkey[1]
404
              AND seq.relnamespace  = nsp.oid
405
              AND cons.contype      = 'p'
406
              AND dep.classid       = 'pg_class'::regclass
407 408 409
              AND dep.refobjid      = '#{quote_table_name(table)}'::regclass
          end_sql

410
          if result.nil? || result.empty?
411
            result = query(<<-end_sql, "SCHEMA")[0]
412
              SELECT attr.attname, nsp.nspname,
413
                CASE
414
                  WHEN pg_get_expr(def.adbin, def.adrelid) !~* 'nextval' THEN NULL
415 416 417 418
                  WHEN split_part(pg_get_expr(def.adbin, def.adrelid), '''', 2) ~ '.' THEN
                    substr(split_part(pg_get_expr(def.adbin, def.adrelid), '''', 2),
                           strpos(split_part(pg_get_expr(def.adbin, def.adrelid), '''', 2), '.')+1)
                  ELSE split_part(pg_get_expr(def.adbin, def.adrelid), '''', 2)
419 420 421 422 423
                END
              FROM pg_class       t
              JOIN pg_attribute   attr ON (t.oid = attrelid)
              JOIN pg_attrdef     def  ON (adrelid = attrelid AND adnum = attnum)
              JOIN pg_constraint  cons ON (conrelid = adrelid AND adnum = conkey[1])
424
              JOIN pg_namespace   nsp  ON (t.relnamespace = nsp.oid)
425 426
              WHERE t.oid = '#{quote_table_name(table)}'::regclass
                AND cons.contype = 'p'
427
                AND pg_get_expr(def.adbin, def.adrelid) ~* 'nextval|uuid_generate'
428 429 430
            end_sql
          end

431 432 433 434 435 436
          pk = result.shift
          if result.last
            [pk, PostgreSQL::Name.new(*result)]
          else
            [pk, nil]
          end
437 438 439 440
        rescue
          nil
        end

441
        def primary_keys(table_name) # :nodoc:
442
          select_values(<<-SQL.strip_heredoc, "SCHEMA")
443 444 445
            WITH pk_constraint AS (
              SELECT conrelid, unnest(conkey) AS connum FROM pg_constraint
              WHERE contype = 'p'
R
Ryuta Kamizono 已提交
446
                AND conrelid = #{quote(quote_table_name(table_name))}::regclass
447 448 449 450 451 452 453
            ), cons AS (
              SELECT conrelid, connum, row_number() OVER() AS rownum FROM pk_constraint
            )
            SELECT attr.attname FROM pg_attribute attr
            INNER JOIN cons ON attr.attrelid = cons.conrelid AND attr.attnum = cons.connum
            ORDER BY cons.rownum
          SQL
454 455 456
        end

        # Renames a table.
457 458
        # Also renames a table's primary key sequence if the sequence name exists and
        # matches the Active Record default.
459 460 461
        #
        # Example:
        #   rename_table('octopuses', 'octopi')
462
        def rename_table(table_name, new_name)
463
          clear_cache!
464
          execute "ALTER TABLE #{quote_table_name(table_name)} RENAME TO #{quote_table_name(new_name)}"
465
          pk, seq = pk_and_sequence_for(new_name)
466
          if seq && seq.identifier == "#{table_name}_#{pk}_seq"
467
            new_seq = "#{new_name}_#{pk}_seq"
468 469
            idx = "#{table_name}_pkey"
            new_idx = "#{new_name}_pkey"
470
            execute "ALTER TABLE #{seq.quoted} RENAME TO #{quote_table_name(new_seq)}"
471
            execute "ALTER INDEX #{quote_table_name(idx)} RENAME TO #{quote_table_name(new_idx)}"
472
          end
473 474

          rename_table_indexes(table_name, new_name)
475 476
        end

T
Tony Miller 已提交
477
        def add_column(table_name, column_name, type, options = {}) #:nodoc:
478
          clear_cache!
479
          super
480
          change_column_comment(table_name, column_name, options[:comment]) if options.key?(:comment)
481 482
        end

483
        def change_column(table_name, column_name, type, options = {}) #:nodoc:
484 485
          clear_cache!
          quoted_table_name = quote_table_name(table_name)
486
          quoted_column_name = quote_column_name(column_name)
487
          sql_type = type_to_sql(type, options[:limit], options[:precision], options[:scale], options[:array])
488
          sql = "ALTER TABLE #{quoted_table_name} ALTER COLUMN #{quoted_column_name} TYPE #{sql_type}"
489 490 491
          if options[:collation]
            sql << " COLLATE \"#{options[:collation]}\""
          end
492 493 494
          if options[:using]
            sql << " USING #{options[:using]}"
          elsif options[:cast_as]
495
            cast_as_type = type_to_sql(options[:cast_as], options[:limit], options[:precision], options[:scale], options[:array])
496
            sql << " USING CAST(#{quoted_column_name} AS #{cast_as_type})"
497
          end
498
          execute sql
499 500 501

          change_column_default(table_name, column_name, options[:default]) if options_include_default?(options)
          change_column_null(table_name, column_name, options[:null], options[:default]) if options.key?(:null)
502
          change_column_comment(table_name, column_name, options[:comment]) if options.key?(:comment)
503 504 505
        end

        # Changes the default value of a table column.
506
        def change_column_default(table_name, column_name, default_or_changes) # :nodoc:
507
          clear_cache!
508
          column = column_for(table_name, column_name)
509
          return unless column
510

511
          default = extract_new_default_value(default_or_changes)
512 513 514 515 516 517
          alter_column_query = "ALTER TABLE #{quote_table_name(table_name)} ALTER COLUMN #{quote_column_name(column_name)} %s"
          if default.nil?
            # <tt>DEFAULT NULL</tt> results in the same behavior as <tt>DROP DEFAULT</tt>. However, PostgreSQL will
            # cast the default to the columns type, which leaves us with a default like "default NULL::character varying".
            execute alter_column_query % "DROP DEFAULT"
          else
518
            execute alter_column_query % "SET DEFAULT #{quote_default_expression(default, column)}"
519
          end
520 521
        end

522
        def change_column_null(table_name, column_name, null, default = nil) #:nodoc:
523 524
          clear_cache!
          unless null || default.nil?
525
            column = column_for(table_name, column_name)
526
            execute("UPDATE #{quote_table_name(table_name)} SET #{quote_column_name(column_name)}=#{quote_default_expression(default, column)} WHERE #{quote_column_name(column_name)} IS NULL") if column
527 528 529 530
          end
          execute("ALTER TABLE #{quote_table_name(table_name)} ALTER #{quote_column_name(column_name)} #{null ? 'DROP' : 'SET'} NOT NULL")
        end

531 532 533 534 535 536 537 538 539 540 541 542
        # Adds comment for given table column or drops it if +comment+ is a +nil+
        def change_column_comment(table_name, column_name, comment) # :nodoc:
          clear_cache!
          execute "COMMENT ON COLUMN #{quote_table_name(table_name)}.#{quote_column_name(column_name)} IS #{quote(comment)}"
        end

        # Adds comment for given table or drops it if +comment+ is a +nil+
        def change_table_comment(table_name, comment) # :nodoc:
          clear_cache!
          execute "COMMENT ON TABLE #{quote_table_name(table_name)} IS #{quote(comment)}"
        end

543
        # Renames a column in a table.
544
        def rename_column(table_name, column_name, new_column_name) #:nodoc:
545 546
          clear_cache!
          execute "ALTER TABLE #{quote_table_name(table_name)} RENAME COLUMN #{quote_column_name(column_name)} TO #{quote_column_name(new_column_name)}"
547
          rename_column_indexes(table_name, column_name, new_column_name)
548 549
        end

D
doabit 已提交
550
        def add_index(table_name, column_name, options = {}) #:nodoc:
551
          index_name, index_type, index_columns, index_options, index_algorithm, index_using, comment = add_index_options(table_name, column_name, options)
552 553 554
          execute("CREATE #{index_type} INDEX #{index_algorithm} #{quote_column_name(index_name)} ON #{quote_table_name(table_name)} #{index_using} (#{index_columns})#{index_options}").tap do
            execute "COMMENT ON INDEX #{quote_column_name(index_name)} IS #{quote(comment)}" if comment
          end
D
doabit 已提交
555 556
        end

557
        def remove_index(table_name, options = {}) #:nodoc:
558 559 560 561 562 563 564 565 566 567 568 569 570 571
          table = Utils.extract_schema_qualified_name(table_name.to_s)

          if options.is_a?(Hash) && options.key?(:name)
            provided_index = Utils.extract_schema_qualified_name(options[:name].to_s)

            options[:name] = provided_index.identifier
            table = PostgreSQL::Name.new(provided_index.schema, table.identifier) unless table.schema.present?

            if provided_index.schema.present? && table.schema != provided_index.schema
              raise ArgumentError.new("Index schema '#{provided_index.schema}' does not match table schema '#{table.schema}'")
            end
          end

          index_to_remove = PostgreSQL::Name.new(table.schema, index_name_for_remove(table.to_s, options))
572
          algorithm =
573
            if options.is_a?(Hash) && options.key?(:algorithm)
574 575 576 577
              index_algorithms.fetch(options[:algorithm]) do
                raise ArgumentError.new("Algorithm must be one of the following: #{index_algorithms.keys.map(&:inspect).join(', ')}")
              end
            end
578
          execute "DROP INDEX #{algorithm} #{quote_table_name(index_to_remove)}"
579 580
        end

581 582
        # Renames an index of a table. Raises error if length of new
        # index name is greater than allowed limit.
583
        def rename_index(table_name, old_name, new_name)
584 585
          validate_index_length!(table_name, new_name)

586 587 588
          execute "ALTER INDEX #{quote_column_name(old_name)} RENAME TO #{quote_table_name(new_name)}"
        end

589
        def foreign_keys(table_name)
590
          fk_info = select_all(<<-SQL.strip_heredoc, "SCHEMA")
591
            SELECT t2.oid::regclass::text AS to_table, a1.attname AS column, a2.attname AS primary_key, c.conname AS name, c.confupdtype AS on_update, c.confdeltype AS on_delete
592 593 594 595 596 597 598 599 600 601
            FROM pg_constraint c
            JOIN pg_class t1 ON c.conrelid = t1.oid
            JOIN pg_class t2 ON c.confrelid = t2.oid
            JOIN pg_attribute a1 ON a1.attnum = c.conkey[1] AND a1.attrelid = t1.oid
            JOIN pg_attribute a2 ON a2.attnum = c.confkey[1] AND a2.attrelid = t2.oid
            JOIN pg_namespace t3 ON c.connamespace = t3.oid
            WHERE c.contype = 'f'
              AND t1.relname = #{quote(table_name)}
              AND t3.nspname = ANY (current_schemas(false))
            ORDER BY c.conname
602 603 604 605
          SQL

          fk_info.map do |row|
            options = {
606 607 608
              column: row["column"],
              name: row["name"],
              primary_key: row["primary_key"]
609 610
            }

611 612
            options[:on_delete] = extract_foreign_key_action(row["on_delete"])
            options[:on_update] = extract_foreign_key_action(row["on_update"])
613

614
            ForeignKeyDefinition.new(table_name, row["to_table"], options)
Y
Yves Senn 已提交
615 616 617 618 619
          end
        end

        def extract_foreign_key_action(specifier) # :nodoc:
          case specifier
620 621 622
          when "c"; :cascade
          when "n"; :nullify
          when "r"; :restrict
623 624 625
          end
        end

626 627 628 629 630
        def index_name_length
          63
        end

        # Maps logical Rails types to PostgreSQL-specific data types.
631
        def type_to_sql(type, limit = nil, precision = nil, scale = nil, array = nil)
632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657
          sql = \
            case type.to_s
            when "binary"
              # PostgreSQL doesn't support limits on binary (bytea) columns.
              # The hard limit is 1GB, because of a 32-bit size field, and TOAST.
              case limit
              when nil, 0..0x3fffffff; super(type)
              else raise(ActiveRecordError, "No binary type has byte size #{limit}.")
              end
            when "text"
              # PostgreSQL doesn't support limits on text columns.
              # The hard limit is 1GB, according to section 8.3 in the manual.
              case limit
              when nil, 0..0x3fffffff; super(type)
              else raise(ActiveRecordError, "The limit on text can be at most 1GB - 1byte.")
              end
            when "integer"
              case limit
              when 1, 2; "smallint"
              when nil, 3, 4; "integer"
              when 5..8; "bigint"
              else raise(ActiveRecordError, "No integer type has byte size #{limit}. Use a numeric with scale 0 instead.")
              end
            else
              super(type, limit, precision, scale)
            end
658

659
          sql << "[]" if array && type != :primary_key
660
          sql
661 662 663 664
        end

        # PostgreSQL requires the ORDER BY columns in the select list for distinct queries, and
        # requires that the ORDER BY include the distinct column.
665
        def columns_for_distinct(columns, orders) #:nodoc:
666
          order_columns = orders.reject(&:blank?).map { |s|
667 668 669
              # Convert Arel node to string
              s = s.to_sql unless s.is_a?(String)
              # Remove any ASC/DESC modifiers
670 671
              s.gsub(/\s+(?:ASC|DESC)\b/i, "")
               .gsub(/\s+NULLS\s+(?:FIRST|LAST)\b/i, "")
672 673
            }.reject(&:blank?).map.with_index { |column, i| "#{column} AS alias_#{i}" }

674
          [super, *order_columns].join(", ")
675
        end
S
Sean Griffin 已提交
676 677 678 679 680 681 682 683 684 685 686 687

        def fetch_type_metadata(column_name, sql_type, oid, fmod)
          cast_type = get_oid_type(oid, fmod, column_name, sql_type)
          simple_type = SqlTypeMetadata.new(
            sql_type: sql_type,
            type: cast_type.type,
            limit: cast_type.limit,
            precision: cast_type.precision,
            scale: cast_type.scale,
          )
          PostgreSQLTypeMetadata.new(simple_type, oid: oid, fmod: fmod)
        end
688 689 690 691
      end
    end
  end
end