# 20161124141322_migrate_process_commit_worker_jobs.rb
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.

# Rewrites the jobs waiting in the `queue:process_commit` Redis list so that
# the third job argument is a Hash describing the commit (`up`) instead of a
# bare commit identifier, or the reverse (`down`).
class MigrateProcessCommitWorkerJobs < ActiveRecord::Migration
  include Gitlab::Database::MigrationHelpers

  # Migration-local model exposing just enough to open a project's Git
  # repository with Rugged.
  class Project < ActiveRecord::Base
    # Loads the project with the given id together with a computed
    # `path_with_namespace` attribute ("<namespace path>/<project path>").
    #
    # Returns the Project, or nil when no row matches.
    def self.find_including_path(id)
      select("projects.*, CONCAT(namespaces.path, '/', projects.path) AS path_with_namespace").
        joins('INNER JOIN namespaces ON namespaces.id = projects.namespace_id').
        find_by(id: id)
    end

    # Root directory of the storage this project's repository lives in, as
    # configured under `repositories.storages`.
    def repository_storage_path
      Gitlab.config.repositories.storages[repository_storage]['path']
    end

    # Path of the repository on disk: "<storage>/<namespace>/<project>.git".
    # Relies on the `path_with_namespace` column selected by
    # `find_including_path`.
    def repository_path
      File.join(repository_storage_path, read_attribute(:path_with_namespace) + '.git')
    end

    # Memoized Rugged handle for the repository at `repository_path`.
    def repository
      @repository ||= Rugged::Repository.new(repository_path)
    end
  end

  DOWNTIME = true
  DOWNTIME_REASON = 'Existing workers will error until they are using a newer version of the code'

  # Redis list holding the queued jobs rewritten by this migration.
  QUEUE_KEY = 'queue:process_commit'.freeze

  disable_ddl_transaction!

  # Replaces the commit identifier in every queued job with a Hash of commit
  # details. Jobs whose project row is missing, or whose commit lookup raises
  # Rugged::OdbError, are dropped from the queue.
  def up
    rewrite_queued_jobs do |payload|
      commit_ref = payload['args'][2]

      # Already converted (e.g. the migration is being re-run): keep as is.
      next payload if commit_ref.is_a?(Hash)

      project = Project.find_including_path(payload['args'][0])

      next unless project

      begin
        commit = project.repository.lookup(commit_ref)
      rescue Rugged::OdbError
        next
      end

      payload['args'][2] = commit_hash(commit)
      payload
    end
  end

  # Replaces the commit Hash in every queued job with just the commit id.
  def down
    rewrite_queued_jobs do |payload|
      commit_data = payload['args'][2]

      # Only unwrap Hash payloads; a job that already carries a plain id is
      # left untouched instead of being overwritten with nil.
      payload['args'][2] = commit_data['id'] if commit_data.is_a?(Hash)
      payload
    end
  end

  # Returns `data` as valid UTF-8.
  #
  # Strings in another encoding are transcoded, with invalid or undefined
  # sequences replaced. Strings already tagged UTF-8 are returned unchanged
  # when valid and scrubbed otherwise, because transcoding UTF-8 to UTF-8
  # would not touch broken byte sequences, which the JSON generator may
  # reject.
  def encode(data)
    if data.encoding == Encoding::UTF_8
      data.valid_encoding? ? data : data.scrub
    else
      data.encode(Encoding::UTF_8, invalid: :replace, undef: :replace)
    end
  end

  private

  # Drains QUEUE_KEY, yields each parsed job payload to the block and pushes
  # the resulting payloads back onto the list.
  #
  # The block returns the (possibly modified) payload to keep the job, or nil
  # to drop it. If anything raises before every job has been converted, the
  # jobs drained so far are pushed back unmodified so the queue is not lost,
  # and the error propagates.
  def rewrite_queued_jobs
    Sidekiq.redis do |redis|
      drained = []
      # Until conversion succeeds, the jobs to restore are the raw ones. This
      # is the same Array object as `drained`, so it tracks every popped job.
      requeue = drained

      begin
        while (job = redis.lpop(QUEUE_KEY))
          drained << job
        end

        requeue = drained.each_with_object([]) do |raw, converted|
          payload = yield(JSON.parse(raw))
          converted << JSON.dump(payload) if payload
        end
      ensure
        push_jobs(redis, requeue)
      end
    end
  end

  # Appends `jobs` to QUEUE_KEY in a single MULTI.
  #
  # Jobs were taken from the head with LPOP, so appending them with RPUSH in
  # the same order rebuilds the list in its original order; LPUSH would
  # reverse it.
  def push_jobs(redis, jobs)
    return if jobs.empty?

    redis.multi do |multi|
      jobs.each { |job| multi.rpush(QUEUE_KEY, job) }
    end
  end

  # Builds the Hash stored as the job's third argument from a Rugged commit.
  def commit_hash(commit)
    author = commit.author
    committer = commit.committer

    {
      id: commit.oid,
      message: encode(commit.message),
      parent_ids: commit.parent_ids,
      authored_date: author[:time],
      author_name: encode(author[:name]),
      author_email: encode(author[:email]),
      committed_date: committer[:time],
      committer_email: encode(committer[:email]),
      committer_name: encode(committer[:name])
    }
  end
end