schema_statements.rb 28.4 KB
Newer Older
1
require "active_support/core_ext/string/strip"
2

3 4
module ActiveRecord
  module ConnectionAdapters
5
    module PostgreSQL
6 7 8
      # PostgreSQL-specific additions to the abstract schema creation visitor.
      class SchemaCreation < AbstractAdapter::SchemaCreation
        private

        # Resolves the column's SQL type via +type_to_sql+ (including the array
        # flag) before delegating to the parent implementation.
        def visit_ColumnDefinition(o)
          o.sql_type = type_to_sql(o.type, o.limit, o.precision, o.scale, o.array)
          super
        end

        # Appends a double-quoted COLLATE clause when +options+ carries a
        # +:collation+, then lets the parent append the remaining options.
        def add_column_options!(sql, options)
          if options[:collation]
            sql << " COLLATE \"#{options[:collation]}\""
          end
          super
        end
      end

22 23 24 25 26 27 28 29 30
      module SchemaStatements
        # Drops the database specified on the +name+ attribute
        # and creates it again using the provided +options+.
        def recreate_database(name, options = {}) #:nodoc:
          # Drop first, then create; the result of the create call is returned.
          drop_database name
          create_database name, options
        end

        # Create a new PostgreSQL database. Options include <tt>:owner</tt>, <tt>:template</tt>,
        # <tt>:encoding</tt> (defaults to utf8), <tt>:collation</tt>, <tt>:ctype</tt>,
        # <tt>:tablespace</tt>, and <tt>:connection_limit</tt> (note that MySQL uses
        # <tt>:charset</tt> while PostgreSQL uses <tt>:encoding</tt>).
        #
        # Example:
        #   create_database config[:database], config
        #   create_database 'foo_development', encoding: 'unicode'
        def create_database(name, options = {})
          # Caller-supplied options win over the utf8 default; string keys are accepted.
          options = { encoding: "utf8" }.merge!(options.symbolize_keys)

          # Each recognised option contributes one clause; unknown keys contribute nothing.
          option_string = options.map do |key, value|
            case key
            when :owner
              " OWNER = \"#{value}\""
            when :template
              " TEMPLATE = \"#{value}\""
            when :encoding
              " ENCODING = '#{value}'"
            when :collation
              " LC_COLLATE = '#{value}'"
            when :ctype
              " LC_CTYPE = '#{value}'"
            when :tablespace
              " TABLESPACE = \"#{value}\""
            when :connection_limit
              " CONNECTION LIMIT = #{value}"
            else
              ""
            end
          end.join

          execute "CREATE DATABASE #{quote_table_name(name)}#{option_string}"
        end

        # Drops a PostgreSQL database.
        #
        # Example:
        #   drop_database 'matt_development'
        def drop_database(name) #:nodoc:
          # IF EXISTS makes dropping a missing database a no-op rather than an error.
          quoted_name = quote_table_name(name)
          execute "DROP DATABASE IF EXISTS #{quoted_name}"
        end

73
        # Returns the list of all tables in the schema search path.
74
        def tables(name = nil)
          # +name+ is ignored apart from triggering the deprecation warning.
          if name
            ActiveSupport::Deprecation.warn(
              "Passing arguments to #tables is deprecated without replacement."
            )
          end

          select_values("SELECT tablename FROM pg_tables WHERE schemaname = ANY(current_schemas(false))", "SCHEMA")
        end

84
        # Returns the names of all tables, views and materialized views found in
        # the current schema search path.
        def data_sources # :nodoc:
          select_values(<<-SQL, "SCHEMA")
            SELECT c.relname
            FROM pg_class c
            LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('r', 'v','m') -- (r)elation/table, (v)iew, (m)aterialized view
            AND n.nspname = ANY (current_schemas(false))
          SQL
        end

94 95 96 97
        # Returns true if table exists.
        # If the schema is not specified as part of +name+ then it will only find tables within
        # the current schema search path (regardless of permissions to access tables in other schemas)
        def table_exists?(name)
          # Deprecated entry point: warns, then answers via #data_source_exists?,
          # which checks tables as well as views.
          ActiveSupport::Deprecation.warn(
            "#table_exists? currently checks both tables and views. " \
            "This behavior is deprecated and will be changed with Rails 5.1 to only check tables. " \
            "Use #data_source_exists? instead."
          )

          data_source_exists?(name)
        end

        # Returns true when a table, view or materialized view called +name+ exists.
        # A schema-qualified +name+ is looked up in that schema only; otherwise the
        # current schema search path is used.
        def data_source_exists?(name)
          name = Utils.extract_schema_qualified_name(name.to_s)
          return false unless name.identifier

          # Identifier and schema are passed through +quote+ so that embedded
          # single quotes cannot break out of the SQL literal.
          schema_condition = name.schema ? quote(name.schema) : "ANY (current_schemas(false))"

          select_value(<<-SQL, "SCHEMA").to_i > 0
              SELECT COUNT(*)
              FROM pg_class c
              LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
              WHERE c.relkind IN ('r','v','m') -- (r)elation/table, (v)iew, (m)aterialized view
              AND c.relname = #{quote(name.identifier)}
              AND n.nspname = #{schema_condition}
          SQL
        end

121
        # Returns the names of all views and materialized views found in the
        # current schema search path.
        def views # :nodoc:
          select_values(<<-SQL, "SCHEMA")
            SELECT c.relname
            FROM pg_class c
            LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('v','m') -- (v)iew, (m)aterialized view
            AND n.nspname = ANY (current_schemas(false))
          SQL
        end

        # Returns true when a view or materialized view called +view_name+ exists,
        # either in the schema given as part of the name or in the search path.
        def view_exists?(view_name) # :nodoc:
          name = Utils.extract_schema_qualified_name(view_name.to_s)
          return false unless name.identifier

          # Values are quoted rather than interpolated raw into the SQL literal.
          schema_condition = name.schema ? quote(name.schema) : "ANY (current_schemas(false))"

          select_values(<<-SQL, "SCHEMA").any?
            SELECT c.relname
            FROM pg_class c
            LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('v','m') -- (v)iew, (m)aterialized view
            AND c.relname = #{quote(name.identifier)}
            AND n.nspname = #{schema_condition}
          SQL
        end

145
        # Drops +table_name+. Recognised options:
        # [<tt>:if_exists</tt>] adds IF EXISTS when truthy.
        # [<tt>:force</tt>] adds CASCADE when set to <tt>:cascade</tt>.
        def drop_table(table_name, options = {}) # :nodoc:
          if_exists = options[:if_exists] ? " IF EXISTS" : ""
          cascade = options[:force] == :cascade ? " CASCADE" : ""

          execute "DROP TABLE#{if_exists} #{quote_table_name(table_name)}#{cascade}"
        end

149 150
        # Returns true if schema exists.
        def schema_exists?(name)
          # The name is quoted so an embedded single quote cannot terminate the literal.
          select_value("SELECT COUNT(*) FROM pg_namespace WHERE nspname = #{quote(name.to_s)}", "SCHEMA").to_i > 0
        end

154
        # Verifies existence of an index with a given name.
155
        # Returns true when an index called +index_name+ exists on +table_name+.
        # The index schema is taken from +index_name+ when qualified, otherwise
        # the current search path is used. +default+ is not consulted here.
        def index_name_exists?(table_name, index_name, default)
          table = Utils.extract_schema_qualified_name(table_name.to_s)
          index = Utils.extract_schema_qualified_name(index_name.to_s)
          # Quote every value instead of interpolating it raw between single quotes.
          schema_condition = index.schema ? quote(index.schema) : "ANY (current_schemas(false))"

          select_value(<<-SQL, "SCHEMA").to_i > 0
            SELECT COUNT(*)
            FROM pg_class t
            INNER JOIN pg_index d ON t.oid = d.indrelid
            INNER JOIN pg_class i ON d.indexrelid = i.oid
            LEFT JOIN pg_namespace n ON n.oid = i.relnamespace
            WHERE i.relkind = 'i'
              AND i.relname = #{quote(index.identifier)}
              AND t.relname = #{quote(table.identifier)}
              AND n.nspname = #{schema_condition}
          SQL
        end

172 173
        # Returns an array of indexes for the given table.
        # Builds one IndexDefinition per non-primary index of +table_name+.
        # The +name+ argument is not used by this implementation.
        def indexes(table_name, name = nil)
          table = Utils.extract_schema_qualified_name(table_name.to_s)
          # Values are quoted rather than interpolated raw into SQL literals.
          schema_condition = table.schema ? quote(table.schema) : "ANY (current_schemas(false))"

          result = query(<<-SQL, "SCHEMA")
            SELECT distinct i.relname, d.indisunique, d.indkey, pg_get_indexdef(d.indexrelid), t.oid,
                            pg_catalog.obj_description(i.oid, 'pg_class') AS comment,
            (SELECT COUNT(*) FROM pg_opclass o
               JOIN (SELECT unnest(string_to_array(d.indclass::text, ' '))::int oid) c
                 ON o.oid = c.oid WHERE o.opcdefault = 'f')
            FROM pg_class t
            INNER JOIN pg_index d ON t.oid = d.indrelid
            INNER JOIN pg_class i ON d.indexrelid = i.oid
            LEFT JOIN pg_namespace n ON n.oid = i.relnamespace
            WHERE i.relkind = 'i'
              AND d.indisprimary = 'f'
              AND t.relname = #{quote(table.identifier)}
              AND n.nspname = #{schema_condition}
            ORDER BY i.relname
          SQL

          result.map do |row|
            index_name = row[0]
            unique = row[1]
            indkey = row[2].split(" ").map(&:to_i)
            inddef = row[3]
            oid = row[4]
            comment = row[5]
            opclass = row[6]

            # Pull the access method, the column/expression list and the optional
            # partial-index predicate out of the index definition text.
            using, expressions, where = inddef.scan(/ USING (\w+?) \((.+?)\)(?: WHERE (.+))?\z/).flatten

            # Stays nil when the raw expression list is used as the column spec.
            orders = nil

            if indkey.include?(0) || opclass > 0
              # A 0 entry in indkey, or a non-default operator class, means the
              # plain column names are not enough: keep the raw expression text.
              columns = expressions
            else
              columns = Hash[query(<<-SQL.strip_heredoc, "SCHEMA")].values_at(*indkey).compact
                SELECT a.attnum, a.attname
                FROM pg_attribute a
                WHERE a.attrelid = #{oid}
                AND a.attnum IN (#{indkey.join(",")})
              SQL

              # add info on sort order for columns (only desc order is explicitly specified, asc is the default)
              orders = Hash[
                expressions.scan(/(\w+) DESC/).flatten.map { |order_column| [order_column, :desc] }
              ]
            end

            IndexDefinition.new(table_name, index_name, unique, columns, [], orders, where, nil, using.to_sym, comment.presence)
          end.compact
        end

        # Returns the list of all column definitions for a table.
225 226
        # Builds a column object (via +new_column+) for every row returned by
        # +column_definitions+ for +table_name+.
        def columns(table_name) # :nodoc:
          table_name = table_name.to_s

          column_definitions(table_name).map do |column_name, type, default, notnull, oid, fmod, collation, comment|
            oid = oid.to_i
            fmod = fmod.to_i
            type_metadata = fetch_type_metadata(column_name, type, oid, fmod)
            default_value = extract_value_from_default(default)
            default_function = extract_default_function(default_value, default)

            # The catalog reports NOT NULL; the column object wants "nullable".
            new_column(column_name, default_value, type_metadata, !notnull, table_name, default_function, collation, comment: comment.presence)
          end
        end

237 238
        # Instantiates a PostgreSQLColumn, forwarding every argument unchanged.
        def new_column(*args) # :nodoc:
          PostgreSQLColumn.new(*args)
        end

241 242 243 244
        def table_options(table_name) # :nodoc:
          # The table comment is the only option reported here.
          comment = table_comment(table_name)
          { comment: comment }
        end

245 246 247
        # Returns a comment stored in database for given table
        # Looks up the comment for +table_name+ among ordinary tables only
        # (relkind 'r'). Returns nil when the name has no identifier part.
        def table_comment(table_name) # :nodoc:
          name = Utils.extract_schema_qualified_name(table_name.to_s)
          return unless name.identifier

          schema_condition = name.schema ? quote(name.schema) : "ANY (current_schemas(false))"

          select_value(<<-SQL.strip_heredoc, "SCHEMA")
            SELECT pg_catalog.obj_description(c.oid, 'pg_class')
            FROM pg_catalog.pg_class c
              LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relname = #{quote(name.identifier)}
              AND c.relkind IN ('r') -- (r)elation/table
              AND n.nspname = #{schema_condition}
          SQL
        end

260 261
        # Returns the current database name.
        def current_database
          # Asks the server, so the answer reflects the live connection.
          select_value("select current_database()", "SCHEMA")
        end

        # Returns the current schema name.
        def current_schema
          # Asks the server for its current schema.
          select_value("SELECT current_schema", "SCHEMA")
        end

        # Returns the current database encoding format.
        def encoding
          # Match the connected database exactly. The previous LIKE comparison
          # treated "_" and "%" in the database name as wildcards and needed an
          # extra round trip to fetch the name first.
          select_value("SELECT pg_encoding_to_char(encoding) FROM pg_database WHERE datname = current_database()", "SCHEMA")
        end

        # Returns the current database collation.
        def collation
          # Match the connected database exactly. The previous LIKE comparison
          # treated "_" and "%" in the database name as wildcards and needed an
          # extra round trip to fetch the name first.
          select_value("SELECT datcollate FROM pg_database WHERE datname = current_database()", "SCHEMA")
        end

        # Returns the current database ctype.
        def ctype
          # Match the connected database exactly. The previous LIKE comparison
          # treated "_" and "%" in the database name as wildcards and needed an
          # extra round trip to fetch the name first.
          select_value("SELECT datctype FROM pg_database WHERE datname = current_database()", "SCHEMA")
        end

        # Returns an array of schema names.
        def schema_names
          # Names starting with "pg_" and information_schema are filtered out.
          select_values(<<-SQL, "SCHEMA")
            SELECT nspname
              FROM pg_namespace
             WHERE nspname !~ '^pg_.*'
               AND nspname NOT IN ('information_schema')
             ORDER by nspname;
          SQL
        end

        # Creates a schema for the given schema name.
        def create_schema(schema_name)
          execute "CREATE SCHEMA #{quote_schema_name(schema_name)}"
        end

        # Drops the schema for the given schema name.
302
        def drop_schema(schema_name, options = {})
          # CASCADE is always appended; IF EXISTS only with a truthy +:if_exists+.
          if_exists = options[:if_exists] ? " IF EXISTS" : ""
          execute "DROP SCHEMA#{if_exists} #{quote_schema_name(schema_name)} CASCADE"
        end

        # Sets the schema search path to a string of comma-separated schema names.
        # Names beginning with $ have to be quoted (e.g. $user => '$user').
        # See: http://www.postgresql.org/docs/current/static/ddl-schemas.html
        #
        # This should not be called manually but set in database.yml.
        def schema_search_path=(schema_csv)
          # A nil/false value leaves both the server setting and the cached value untouched.
          return unless schema_csv

          execute("SET search_path TO #{schema_csv}", "SCHEMA")
          @schema_search_path = schema_csv
        end

        # Returns the active schema search path.
        def schema_search_path
          # Memoised: the server is only asked while no value is cached.
          @schema_search_path ||= select_value("SHOW search_path", "SCHEMA")
        end

        # Returns the current client message level.
        def client_min_messages
          # Not cached; every call asks the server.
          select_value("SHOW client_min_messages", "SCHEMA")
        end

        # Set the client message level.
        def client_min_messages=(level)
          # +level+ is interpolated between single quotes as given.
          execute("SET client_min_messages TO '#{level}'", "SCHEMA")
        end

        # Returns the sequence name for a table's primary key or some other specified key.
334
        # Returns the name of the sequence serving +pk+ on +table_name+, or nil
        # when +serial_sequence+ reports none. If the lookup raises
        # ActiveRecord::StatementInvalid, falls back to the conventional
        # "<table>_<pk>_seq" name.
        def default_sequence_name(table_name, pk = "id") #:nodoc:
          result = serial_sequence(table_name, pk)
          return unless result

          Utils.extract_schema_qualified_name(result).to_s
        rescue ActiveRecord::StatementInvalid
          PostgreSQL::Name.new(nil, "#{table_name}_#{pk}_seq").to_s
        end

        # Asks PostgreSQL (pg_get_serial_sequence) for the sequence attached to
        # +column+ of +table+.
        def serial_sequence(table, column)
          # Both arguments are quoted so embedded single quotes stay inside the literal.
          select_value("SELECT pg_get_serial_sequence(#{quote(table.to_s)}, #{quote(column.to_s)})", "SCHEMA")
        end

346
        # Sets the sequence of a table's primary key to the specified value.
A
Aaron Patterson 已提交
347 348
        # Sets the primary key sequence of +table+ to +value+. Does nothing when
        # the table has no primary key; logs a warning (if a logger is set) when
        # the primary key has no sequence.
        def set_pk_sequence!(table, value) #:nodoc:
          pk, sequence = pk_and_sequence_for(table)
          return unless pk

          if sequence
            quoted_sequence = quote_table_name(sequence)

            select_value("SELECT setval('#{quoted_sequence}', #{value})", "SCHEMA")
          elsif @logger
            @logger.warn "#{table} has primary key #{pk} with no default sequence."
          end
        end

361 362 363 364 365 366 367 368 369 370
        # Resets the sequence of a table's primary key to the maximum value.
        def reset_pk_sequence!(table, pk = nil, sequence = nil) #:nodoc:
          unless pk and sequence
            default_pk, default_sequence = pk_and_sequence_for(table)

            pk ||= default_pk
            sequence ||= default_sequence
          end

          if @logger && pk && !sequence
371
            @logger.warn "#{table} has primary key #{pk} with no default sequence."
372 373 374 375 376
          end

          if pk && sequence
            quoted_sequence = quote_table_name(sequence)

377
            select_value(<<-end_sql, "SCHEMA")
378 379 380 381 382 383 384 385 386
              SELECT setval('#{quoted_sequence}', (SELECT COALESCE(MAX(#{quote_column_name pk})+(SELECT increment_by FROM #{quoted_sequence}), (SELECT min_value FROM #{quoted_sequence})) FROM #{quote_table_name(table)}), false)
            end_sql
          end
        end

        # Returns a table's primary key and belonging sequence.
        # Returns +[pk, sequence]+ for +table+, where +sequence+ is a
        # PostgreSQL::Name or nil. Returns nil when nothing is found or when any
        # error is raised along the way.
        def pk_and_sequence_for(table) #:nodoc:
          # First try looking for a sequence with a dependency on the
          # given table's primary key.
          result = query(<<-end_sql, "SCHEMA")[0]
            SELECT attr.attname, nsp.nspname, seq.relname
            FROM pg_class      seq,
                 pg_attribute  attr,
                 pg_depend     dep,
                 pg_constraint cons,
                 pg_namespace  nsp
            WHERE seq.oid           = dep.objid
              AND seq.relkind       = 'S'
              AND attr.attrelid     = dep.refobjid
              AND attr.attnum       = dep.refobjsubid
              AND attr.attrelid     = cons.conrelid
              AND attr.attnum       = cons.conkey[1]
              AND seq.relnamespace  = nsp.oid
              AND cons.contype      = 'p'
              AND dep.classid       = 'pg_class'::regclass
              AND dep.refobjid      = '#{quote_table_name(table)}'::regclass
          end_sql

          # Otherwise inspect the primary key column's default expression for a
          # nextval/uuid_generate call and pull the sequence name out of it.
          if result.nil? || result.empty?
            result = query(<<-end_sql, "SCHEMA")[0]
              SELECT attr.attname, nsp.nspname,
                CASE
                  WHEN pg_get_expr(def.adbin, def.adrelid) !~* 'nextval' THEN NULL
                  WHEN split_part(pg_get_expr(def.adbin, def.adrelid), '''', 2) ~ '.' THEN
                    substr(split_part(pg_get_expr(def.adbin, def.adrelid), '''', 2),
                           strpos(split_part(pg_get_expr(def.adbin, def.adrelid), '''', 2), '.')+1)
                  ELSE split_part(pg_get_expr(def.adbin, def.adrelid), '''', 2)
                END
              FROM pg_class       t
              JOIN pg_attribute   attr ON (t.oid = attrelid)
              JOIN pg_attrdef     def  ON (adrelid = attrelid AND adnum = attnum)
              JOIN pg_constraint  cons ON (conrelid = adrelid AND adnum = conkey[1])
              JOIN pg_namespace   nsp  ON (t.relnamespace = nsp.oid)
              WHERE t.oid = '#{quote_table_name(table)}'::regclass
                AND cons.contype = 'p'
                AND pg_get_expr(def.adbin, def.adrelid) ~* 'nextval|uuid_generate'
            end_sql
          end

          # After the shift, +result+ holds [schema, sequence_name].
          pk = result.shift
          if result.last
            [pk, PostgreSQL::Name.new(*result)]
          else
            [pk, nil]
          end
        rescue
          # Deliberately best-effort: any failure (including a nil +result+
          # above) is reported to the caller as nil.
          nil
        end

437
        # Returns the column names of the primary key of +table_name+, ordered by
        # their position in the constraint's key list.
        def primary_keys(table_name) # :nodoc:
          select_values(<<-SQL.strip_heredoc, "SCHEMA")
            WITH pk_constraint AS (
              SELECT conrelid, unnest(conkey) AS connum FROM pg_constraint
              WHERE contype = 'p'
                AND conrelid = '#{quote_table_name(table_name)}'::regclass
            ), cons AS (
              SELECT conrelid, connum, row_number() OVER() AS rownum FROM pk_constraint
            )
            SELECT attr.attname FROM pg_attribute attr
            INNER JOIN cons ON attr.attrelid = cons.conrelid AND attr.attnum = cons.connum
            ORDER BY cons.rownum
          SQL
        end

        # Renames a table.
453 454
        # Also renames a table's primary key sequence if the sequence name exists and
        # matches the Active Record default.
455 456 457
        #
        # Example:
        #   rename_table('octopuses', 'octopi')
458
        def rename_table(table_name, new_name)
          clear_cache!
          execute "ALTER TABLE #{quote_table_name(table_name)} RENAME TO #{quote_table_name(new_name)}"

          # Only a sequence that still carries the default "<table>_<pk>_seq"
          # name is renamed, together with the "<table>_pkey" index.
          pk, seq = pk_and_sequence_for(new_name)
          if seq && seq.identifier == "#{table_name}_#{pk}_seq"
            new_seq = "#{new_name}_#{pk}_seq"
            idx = "#{table_name}_pkey"
            new_idx = "#{new_name}_pkey"
            execute "ALTER TABLE #{seq.quoted} RENAME TO #{quote_table_name(new_seq)}"
            execute "ALTER INDEX #{quote_table_name(idx)} RENAME TO #{quote_table_name(new_idx)}"
          end

          rename_table_indexes(table_name, new_name)
        end

T
Tony Miller 已提交
473
        # Adds the column through the parent implementation, then applies the
        # column comment when a +:comment+ key is present in +options+.
        def add_column(table_name, column_name, type, options = {}) #:nodoc:
          clear_cache!
          super
          change_column_comment(table_name, column_name, options[:comment]) if options.key?(:comment)
        end

479
        # Changes the type of a column and then applies the default, null and
        # comment options when they are supplied.
        #
        # Options read here: +:limit+, +:precision+, +:scale+, +:array+,
        # +:collation+, +:using+, +:cast_as+, +:default+, +:null+, +:comment+.
        def change_column(table_name, column_name, type, options = {}) #:nodoc:
          clear_cache!
          quoted_table_name = quote_table_name(table_name)
          quoted_column_name = quote_column_name(column_name)
          sql_type = type_to_sql(type, options[:limit], options[:precision], options[:scale], options[:array])
          sql = "ALTER TABLE #{quoted_table_name} ALTER COLUMN #{quoted_column_name} TYPE #{sql_type}"
          if options[:collation]
            sql << " COLLATE \"#{options[:collation]}\""
          end
          # An explicit :using expression takes precedence over :cast_as.
          if options[:using]
            sql << " USING #{options[:using]}"
          elsif options[:cast_as]
            cast_as_type = type_to_sql(options[:cast_as], options[:limit], options[:precision], options[:scale], options[:array])
            sql << " USING CAST(#{quoted_column_name} AS #{cast_as_type})"
          end
          execute sql

          change_column_default(table_name, column_name, options[:default]) if options_include_default?(options)
          change_column_null(table_name, column_name, options[:null], options[:default]) if options.key?(:null)
          change_column_comment(table_name, column_name, options[:comment]) if options.key?(:comment)
        end

        # Changes the default value of a table column.
502
        # Sets or drops the default of a column. Does nothing when +column_for+
        # finds no such column.
        def change_column_default(table_name, column_name, default_or_changes) # :nodoc:
          clear_cache!
          column = column_for(table_name, column_name)
          return unless column

          default = extract_new_default_value(default_or_changes)
          # Built by interpolation rather than a "%s" format template, so a "%"
          # inside a quoted identifier cannot be misread as a format directive.
          alter_column_prefix = "ALTER TABLE #{quote_table_name(table_name)} ALTER COLUMN #{quote_column_name(column_name)}"
          if default.nil?
            # <tt>DEFAULT NULL</tt> results in the same behavior as <tt>DROP DEFAULT</tt>. However, PostgreSQL will
            # cast the default to the columns type, which leaves us with a default like "default NULL::character varying".
            execute "#{alter_column_prefix} DROP DEFAULT"
          else
            execute "#{alter_column_prefix} SET DEFAULT #{quote_default_expression(default, column)}"
          end
        end

518
        # Sets or drops the NOT NULL constraint on a column. When making the
        # column NOT NULL with a +default+, existing NULLs are first replaced by
        # that default (only if +column_for+ finds the column).
        def change_column_null(table_name, column_name, null, default = nil) #:nodoc:
          clear_cache!
          unless null || default.nil?
            column = column_for(table_name, column_name)
            execute("UPDATE #{quote_table_name(table_name)} SET #{quote_column_name(column_name)}=#{quote_default_expression(default, column)} WHERE #{quote_column_name(column_name)} IS NULL") if column
          end
          execute("ALTER TABLE #{quote_table_name(table_name)} ALTER #{quote_column_name(column_name)} #{null ? 'DROP' : 'SET'} NOT NULL")
        end

527 528 529 530 531 532 533 534 535 536 537 538
        # Adds comment for given table column or drops it if +comment+ is a +nil+
        def change_column_comment(table_name, column_name, comment) # :nodoc:
          clear_cache!
          # Fully qualified column reference: "table"."column".
          target = "#{quote_table_name(table_name)}.#{quote_column_name(column_name)}"
          execute "COMMENT ON COLUMN #{target} IS #{quote(comment)}"
        end

        # Adds comment for given table or drops it if +comment+ is a +nil+
        def change_table_comment(table_name, comment) # :nodoc:
          clear_cache!
          quoted_table = quote_table_name(table_name)
          execute "COMMENT ON TABLE #{quoted_table} IS #{quote(comment)}"
        end

539
        # Renames a column in a table.
540
        def rename_column(table_name, column_name, new_column_name) #:nodoc:
          clear_cache!
          execute "ALTER TABLE #{quote_table_name(table_name)} RENAME COLUMN #{quote_column_name(column_name)} TO #{quote_column_name(new_column_name)}"
          # Hand over to the index-renaming helper once the column is renamed.
          rename_column_indexes(table_name, column_name, new_column_name)
        end

D
doabit 已提交
546
        # Creates an index from the pieces computed by +add_index_options+ and,
        # when a comment was requested, attaches it afterwards. Returns the result
        # of the CREATE INDEX statement.
        def add_index(table_name, column_name, options = {}) #:nodoc:
          index_name, index_type, index_columns, index_options, index_algorithm, index_using, comment = add_index_options(table_name, column_name, options)

          result = execute("CREATE #{index_type} INDEX #{index_algorithm} #{quote_column_name(index_name)} ON #{quote_table_name(table_name)} #{index_using} (#{index_columns})#{index_options}")
          execute "COMMENT ON INDEX #{quote_column_name(index_name)} IS #{quote(comment)}" if comment
          result
        end

553
        # Drops an index of +table_name+. +options+ is passed on to
        # +index_name_for_remove+; when it is a Hash, a schema-qualified
        # <tt>:name</tt> and an <tt>:algorithm</tt> key are also honoured.
        #
        # Raises ArgumentError when the index schema conflicts with the table
        # schema or when the algorithm is unknown.
        def remove_index(table_name, options = {}) #:nodoc:
          table = Utils.extract_schema_qualified_name(table_name.to_s)

          if options.is_a?(Hash) && options.key?(:name)
            provided_index = Utils.extract_schema_qualified_name(options[:name].to_s)

            # Work on a copy so the caller's hash keeps its original :name.
            options = options.merge(name: provided_index.identifier)
            table = PostgreSQL::Name.new(provided_index.schema, table.identifier) unless table.schema.present?

            if provided_index.schema.present? && table.schema != provided_index.schema
              raise ArgumentError, "Index schema '#{provided_index.schema}' does not match table schema '#{table.schema}'"
            end
          end

          index_to_remove = PostgreSQL::Name.new(table.schema, index_name_for_remove(table.to_s, options))
          algorithm =
            if options.is_a?(Hash) && options.key?(:algorithm)
              index_algorithms.fetch(options[:algorithm]) do
                raise ArgumentError, "Algorithm must be one of the following: #{index_algorithms.keys.map(&:inspect).join(', ')}"
              end
            end
          execute "DROP INDEX #{algorithm} #{quote_table_name(index_to_remove)}"
        end

577 578
        # Renames an index of a table. Raises error if length of new
        # index name is greater than allowed limit.
579
        def rename_index(table_name, old_name, new_name)
          # Validate the new name before touching the database.
          validate_index_length!(table_name, new_name)

          execute "ALTER INDEX #{quote_column_name(old_name)} RENAME TO #{quote_table_name(new_name)}"
        end

585
        # Returns one ForeignKeyDefinition per foreign key constraint defined on
        # +table_name+ within the current schema search path. Only the first
        # column of each constraint is read (conkey[1] / confkey[1]).
        def foreign_keys(table_name)
          fk_info = select_all(<<-SQL.strip_heredoc, "SCHEMA")
            SELECT t2.oid::regclass::text AS to_table, a1.attname AS column, a2.attname AS primary_key, c.conname AS name, c.confupdtype AS on_update, c.confdeltype AS on_delete
            FROM pg_constraint c
            JOIN pg_class t1 ON c.conrelid = t1.oid
            JOIN pg_class t2 ON c.confrelid = t2.oid
            JOIN pg_attribute a1 ON a1.attnum = c.conkey[1] AND a1.attrelid = t1.oid
            JOIN pg_attribute a2 ON a2.attnum = c.confkey[1] AND a2.attrelid = t2.oid
            JOIN pg_namespace t3 ON c.connamespace = t3.oid
            WHERE c.contype = 'f'
              AND t1.relname = #{quote(table_name)}
              AND t3.nspname = ANY (current_schemas(false))
            ORDER BY c.conname
          SQL

          fk_info.map do |row|
            options = {
              column: row["column"],
              name: row["name"],
              primary_key: row["primary_key"],
              on_delete: extract_foreign_key_action(row["on_delete"]),
              on_update: extract_foreign_key_action(row["on_update"])
            }

            ForeignKeyDefinition.new(table_name, row["to_table"], options)
          end
        end

        # Maps the single-letter action code read from confupdtype/confdeltype
        # to a symbol. Any other code yields nil.
        def extract_foreign_key_action(specifier) # :nodoc:
          case specifier
          when "c" then :cascade
          when "n" then :nullify
          when "r" then :restrict
          end
        end

622 623 624 625 626
        # Maximum index name length reported by this adapter: 63.
        def index_name_length
          63
        end

        # Maps logical Rails types to PostgreSQL-specific data types.
627 628
        # Translates a logical type plus its modifiers into a PostgreSQL type
        # string. "binary", "text" and "integer" get PostgreSQL-specific limit
        # handling; everything else is delegated to the parent implementation.
        # With a truthy +array+ the result gets a "[]" suffix (except for
        # +:primary_key+).
        #
        # Raises ActiveRecordError for limits no PostgreSQL type can satisfy.
        def type_to_sql(type, limit = nil, precision = nil, scale = nil, array = nil)
          sql = case type.to_s
          when "binary"
            # PostgreSQL doesn't support limits on binary (bytea) columns.
            # The hard limit is 1GB, because of a 32-bit size field, and TOAST.
            case limit
            when nil, 0..0x3fffffff then super(type)
            else raise(ActiveRecordError, "No binary type has byte size #{limit}.")
            end
          when "text"
            # PostgreSQL doesn't support limits on text columns.
            # The hard limit is 1GB, according to section 8.3 in the manual.
            case limit
            when nil, 0..0x3fffffff then super(type)
            else raise(ActiveRecordError, "The limit on text can be at most 1GB - 1byte.")
            end
          when "integer"
            case limit
            when 1, 2 then "smallint"
            when nil, 3, 4 then "integer"
            when 5..8 then "bigint"
            else raise(ActiveRecordError, "No integer type has byte size #{limit}. Use a numeric with scale 0 instead.")
            end
          else
            super(type, limit, precision, scale)
          end

          # Build a new string instead of appending in place: the value may be a
          # frozen literal or a string owned by the parent implementation.
          sql = "#{sql}[]" if array && type != :primary_key
          sql
        end

        # PostgreSQL requires the ORDER BY columns in the select list for distinct queries, and
        # requires that the ORDER BY include the distinct column.
660
        # Appends every ORDER BY expression (stripped of ASC/DESC and NULLS
        # FIRST/LAST, aliased as alias_N) to the columns returned by the parent
        # implementation, joined with ", ".
        def columns_for_distinct(columns, orders) #:nodoc:
          stripped_orders = orders.reject(&:blank?).map do |s|
            # Convert Arel node to string
            s = s.to_sql unless s.is_a?(String)
            # Remove any ASC/DESC modifiers
            s.gsub(/\s+(?:ASC|DESC)\b/i, "")
             .gsub(/\s+NULLS\s+(?:FIRST|LAST)\b/i, "")
          end

          order_columns = stripped_orders.reject(&:blank?).map.with_index do |column, i|
            "#{column} AS alias_#{i}"
          end

          [super, *order_columns].join(", ")
        end
S
Sean Griffin 已提交
671 672 673 674 675 676 677 678 679 680 681 682

        # Wraps the cast type resolved by +get_oid_type+ in a SqlTypeMetadata and
        # decorates it with the PostgreSQL-specific +oid+ and +fmod+.
        def fetch_type_metadata(column_name, sql_type, oid, fmod)
          cast = get_oid_type(oid, fmod, column_name, sql_type)
          attributes = {
            sql_type: sql_type,
            type: cast.type,
            limit: cast.limit,
            precision: cast.precision,
            scale: cast.scale
          }

          PostgreSQLTypeMetadata.new(SqlTypeMetadata.new(**attributes), oid: oid, fmod: fmod)
        end
683 684 685 686
      end
    end
  end
end