提交 fc0ff928 编写于 作者: G Gabriel Mazetto

Added Rollbacker workers and support on the rake task

Rollback is done similar to Migration for the Hashed Storage.
It also shares the same ExclusiveLease key to prevent both happening
at the same time.

All Hashed Storage related workers now share the same queue namespace
which allows for assigning dedicated workers easily.
上级 d63380fa
......@@ -5,13 +5,21 @@ module Projects
AttachmentMigrationError = Class.new(StandardError)
class MigrateAttachmentsService < BaseService
attr_reader :logger, :old_disk_path, :new_disk_path
# Returns the disk_path value before the execution
# This is used in EE for Geo
attr_reader :old_disk_path
# Returns the diks_path value after the execution
# This is used in EE for Geo
attr_reader :new_disk_path
# Returns the logger currently in use
attr_reader :logger
def initialize(project, old_disk_path, logger: nil)
@project = project
@logger = logger || Rails.logger
@old_disk_path = old_disk_path
@new_disk_path = project.disk_path
@skipped = false
end
......@@ -23,6 +31,8 @@ module Projects
project.storage_version = ::Project::HASHED_STORAGE_FEATURES[:attachments]
target = FileUploader.absolute_base_dir(project)
@new_disk_path = project.disk_path
result = move_folder!(origin, target)
project.save!
......@@ -33,6 +43,10 @@ module Projects
result
end
# Return whether this operation was skipped or not
# This is used in EE for Geo to decide if an event will be triggered or not
#
# @return [Boolean] true if skipped of false otherwise
def skipped?
@skipped
end
......@@ -43,12 +57,13 @@ module Projects
unless File.directory?(old_path)
logger.info("Skipped attachments migration from '#{old_path}' to '#{new_path}', source path doesn't exist or is not a directory (PROJECT_ID=#{project.id})")
@skipped = true
return true
end
if File.exist?(new_path)
logger.error("Cannot migrate attachments from '#{old_path}' to '#{new_path}', target path already exist (PROJECT_ID=#{project.id})")
raise AttachmentMigrationError, "Target path '#{new_path}' already exist"
raise AttachmentMigrationError, "Target path '#{new_path}' already exists"
end
# Create hashed storage base path folder
......
......@@ -15,7 +15,7 @@ module Projects
result = move_repository(old_disk_path, new_disk_path)
if move_wiki
result &&= move_repository("#{old_wiki_disk_path}", "#{new_disk_path}.wiki")
result &&= move_repository(old_wiki_disk_path, "#{new_disk_path}.wiki")
end
if result
......
......@@ -5,11 +5,21 @@ module Projects
AttachmentRollbackError = Class.new(StandardError)
class RollbackAttachmentsService < BaseService
attr_reader :logger, :old_disk_path
# Returns the disk_path value before the execution
# This is used in EE for Geo
attr_reader :old_disk_path
# Returns the diks_path value after the execution
# This is used in EE for Geo
attr_reader :new_disk_path
# Returns the logger currently in use
attr_reader :logger
def initialize(project, logger: nil)
@project = project
@logger = logger || Rails.logger
@old_disk_path = project.disk_path
end
def execute
......@@ -17,6 +27,8 @@ module Projects
project.storage_version = ::Project::HASHED_STORAGE_FEATURES[:repository]
target = FileUploader.absolute_base_dir(project)
@new_disk_path = FileUploader.base_dir(project)
result = move_folder!(origin, target)
project.save!
......@@ -27,24 +39,34 @@ module Projects
result
end
# Return whether this operation was skipped or not
# This is used in EE for Geo to decide if an event will be triggered or not
#
# @return [Boolean] true if skipped of false otherwise
def skipped?
@skipped
end
private
def move_folder!(old_path, new_path)
unless File.directory?(old_path)
logger.info("Skipped attachments rollback from '#{old_path}' to '#{new_path}', source path doesn't exist or is not a directory (PROJECT_ID=#{project.id})")
@skipped = true
return true
end
if File.exist?(new_path)
logger.error("Cannot rollback attachments from '#{old_path}' to '#{new_path}', target path already exist (PROJECT_ID=#{project.id})")
raise AttachmentRollbackError, "Target path '#{new_path}' already exist"
raise AttachmentRollbackError, "Target path '#{new_path}' already exists"
end
# Create hashed storage base path folder
FileUtils.mkdir_p(File.dirname(new_path))
FileUtils.mv(old_path, new_path)
logger.info("Rolledback project attachments from '#{old_path}' to '#{new_path}' (PROJECT_ID=#{project.id})")
logger.info("Rolled project attachments back from '#{old_path}' to '#{new_path}' (PROJECT_ID=#{project.id})")
true
end
......
......@@ -15,7 +15,7 @@ module Projects
result = move_repository(old_disk_path, new_disk_path)
if move_wiki
result &&= move_repository("#{old_wiki_disk_path}", "#{new_disk_path}.wiki")
result &&= move_repository(old_wiki_disk_path, "#{new_disk_path}.wiki")
end
if result
......
......@@ -47,6 +47,7 @@
- github_importer:github_import_stage_import_repository
- hashed_storage:hashed_storage_migrator
- hashed_storage:hashed_storage_rollbacker
- hashed_storage:hashed_storage_project_migrate
- hashed_storage:hashed_storage_project_rollback
......
......@@ -14,8 +14,8 @@ module HashedStorage
uuid = lease_for(project_id).try_obtain
if uuid
project = Project.find_by(id: project_id)
return if project.nil? || project.pending_delete?
project = Project.without_deleted.find_by(id: project_id)
return unless project
old_disk_path ||= project.disk_path
......
......@@ -13,8 +13,8 @@ module HashedStorage
uuid = lease_for(project_id).try_obtain
if uuid
project = Project.find_by(id: project_id)
return if project.nil? || project.pending_delete?
project = Project.without_deleted.find_by(id: project_id)
return unless project
old_disk_path ||= project.disk_path
......
# frozen_string_literal: true
module HashedStorage
class RollbackerWorker
include ApplicationWorker
queue_namespace :hashed_storage
# @param [Integer] start initial ID of the batch
# @param [Integer] finish last ID of the batch
def perform(start, finish)
migrator = Gitlab::HashedStorage::Migrator.new
migrator.bulk_rollback(start: start, finish: finish)
end
end
end
......@@ -13,10 +13,18 @@ module Gitlab
#
# @param [Integer] start first project id for the range
# @param [Integer] finish last project id for the range
def bulk_schedule(start:, finish:)
def bulk_schedule_migration(start:, finish:)
::HashedStorage::MigratorWorker.perform_async(start, finish)
end
# Schedule a range of projects to be bulk rolledback with #bulk_rollback asynchronously
#
# @param [Integer] start first project id for the range
# @param [Integer] finish last project id for the range
def bulk_schedule_rollback(start:, finish:)
::HashedStorage::RollbackerWorker.perform_async(start, finish)
end
# Start migration of projects from specified range
#
# Flagging a project to be migrated is a synchronous action
......@@ -34,6 +42,23 @@ module Gitlab
end
# rubocop: enable CodeReuse/ActiveRecord
# Start rollback of projects from specified range
#
# Flagging a project to be rolled back is a synchronous action
# but the rollback runs through async jobs
#
# @param [Integer] start first project id for the range
# @param [Integer] finish last project id for the range
# rubocop: disable CodeReuse/ActiveRecord
def bulk_rollback(start:, finish:)
projects = build_relation(start, finish)
projects.with_route.find_each(batch_size: BATCH_SIZE) do |project|
rollback(project)
end
end
# rubocop: enable CodeReuse/ActiveRecord
# Flag a project to be migrated to Hashed Storage
#
# @param [Project] project that will be migrated
......
......@@ -24,7 +24,7 @@ module Gitlab
end
# rubocop: disable CodeReuse/ActiveRecord
def self.project_id_batches(&block)
def self.project_id_batches_migration(&block)
Project.with_unmigrated_storage.in_batches(of: batch_size, start: range_from, finish: range_to) do |relation| # rubocop: disable Cop/InBatches
ids = relation.pluck(:id)
......@@ -33,6 +33,16 @@ module Gitlab
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def self.project_id_batches_rollback(&block)
Project.with_storage_feature(:repository).in_batches(of: batch_size, start: range_from, finish: range_to) do |relation| # rubocop: disable Cop/InBatches
ids = relation.pluck(:id)
yield ids.min, ids.max
end
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def self.legacy_attachments_relation
Upload.joins(<<~SQL).where('projects.storage_version < :version OR projects.storage_version IS NULL', version: Project::HASHED_STORAGE_FEATURES[:attachments])
......
......@@ -36,8 +36,8 @@ namespace :gitlab do
print "Enqueuing migration of #{legacy_projects_count} projects in batches of #{helper.batch_size}"
helper.project_id_batches do |start, finish|
storage_migrator.bulk_schedule(start: start, finish: finish)
helper.project_id_batches_migration do |start, finish|
storage_migrator.bulk_schedule_migration(start: start, finish: finish)
print '.'
end
......@@ -81,8 +81,9 @@ namespace :gitlab do
print "Enqueuing rollback of #{hashed_projects_count} projects in batches of #{helper.batch_size}"
helper.project_id_batches do |start, finish|
storage_migrator.bulk_schedule(start: start, finish: finish, operation: :rollback)
helper.project_id_batches_rollback do |start, finish|
puts "Start: #{start} FINISH: #{finish}"
storage_migrator.bulk_schedule_rollback(start: start, finish: finish)
print '.'
end
......
require 'spec_helper'
describe Gitlab::HashedStorage::Migrator do
describe '#bulk_schedule' do
it 'schedules job to StorageMigratorWorker' do
describe '#bulk_schedule_migration' do
it 'schedules job to HashedStorage::MigratorWorker' do
Sidekiq::Testing.fake! do
expect { subject.bulk_schedule(start: 1, finish: 5) }.to change(HashedStorage::MigratorWorker.jobs, :size).by(1)
expect { subject.bulk_schedule_migration(start: 1, finish: 5) }.to change(HashedStorage::MigratorWorker.jobs, :size).by(1)
end
end
end
describe '#bulk_schedule_rollback' do
it 'schedules job to HashedStorage::RollbackerWorker' do
Sidekiq::Testing.fake! do
expect { subject.bulk_schedule_rollback(start: 1, finish: 5) }.to change(HashedStorage::RollbackerWorker.jobs, :size).by(1)
end
end
end
describe '#bulk_migrate' do
let(:projects) { create_list(:project, 2, :legacy_storage) }
let(:projects) { create_list(:project, 2, :legacy_storage, :empty_repo) }
let(:ids) { projects.map(&:id) }
it 'enqueue jobs to HashedStorage::ProjectMigrateWorker' do
......@@ -32,13 +40,53 @@ describe Gitlab::HashedStorage::Migrator do
subject.bulk_migrate(start: ids.min, finish: ids.max)
end
it 'has migrated projects set as writable' do
it 'has all projects migrated and set as writable' do
perform_enqueued_jobs do
subject.bulk_migrate(start: ids.min, finish: ids.max)
end
projects.each do |project|
expect(project.reload.repository_read_only?).to be_falsey
project.reload
expect(project.hashed_storage?(:repository)).to be_truthy
expect(project.repository_read_only?).to be_falsey
end
end
end
describe '#bulk_rollback' do
let(:projects) { create_list(:project, 2, :empty_repo) }
let(:ids) { projects.map(&:id) }
it 'enqueue jobs to HashedStorage::ProjectRollbackWorker' do
Sidekiq::Testing.fake! do
expect { subject.bulk_rollback(start: ids.min, finish: ids.max) }.to change(HashedStorage::ProjectRollbackWorker.jobs, :size).by(2)
end
end
it 'rescues and log exceptions' do
allow_any_instance_of(Project).to receive(:rollback_to_legacy_storage!).and_raise(StandardError)
expect { subject.bulk_rollback(start: ids.min, finish: ids.max) }.not_to raise_error
end
it 'delegates each project in specified range to #rollback' do
projects.each do |project|
expect(subject).to receive(:rollback).with(project)
end
subject.bulk_rollback(start: ids.min, finish: ids.max)
end
it 'has all projects rolledback and set as writable' do
perform_enqueued_jobs do
subject.bulk_rollback(start: ids.min, finish: ids.max)
end
projects.each do |project|
project.reload
expect(project.legacy_storage?).to be_truthy
expect(project.repository_read_only?).to be_falsey
end
end
end
......
......@@ -86,6 +86,8 @@ describe Projects::HashedStorage::MigrateAttachmentsService do
context '#new_disk_path' do
it 'returns new disk_path for project' do
service.execute
expect(service.new_disk_path).to eq(project.disk_path)
end
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Projects::HashedStorage::RollbackAttachmentsService do
subject(:service) { described_class.new(project, logger: nil) }
let(:project) { create(:project, :repository, skip_disk_validation: true) }
let(:legacy_storage) { Storage::LegacyProject.new(project) }
let(:hashed_storage) { Storage::HashedProject.new(project) }
let!(:upload) { Upload.find_by(path: file_uploader.upload_path) }
let(:file_uploader) { build(:file_uploader, project: project) }
let(:old_disk_path) { File.join(base_path(hashed_storage), upload.path) }
let(:new_disk_path) { File.join(base_path(legacy_storage), upload.path) }
context '#execute' do
context 'when succeeds' do
it 'moves attachments to legacy storage layout' do
expect(File.file?(old_disk_path)).to be_truthy
expect(File.file?(new_disk_path)).to be_falsey
expect(File.exist?(base_path(hashed_storage))).to be_truthy
expect(File.exist?(base_path(legacy_storage))).to be_falsey
expect(FileUtils).to receive(:mv).with(base_path(hashed_storage), base_path(legacy_storage)).and_call_original
service.execute
expect(File.exist?(base_path(legacy_storage))).to be_truthy
expect(File.exist?(base_path(hashed_storage))).to be_falsey
expect(File.file?(old_disk_path)).to be_falsey
expect(File.file?(new_disk_path)).to be_truthy
end
it 'returns true' do
expect(service.execute).to be_truthy
end
it 'sets skipped to false' do
service.execute
expect(service.skipped?).to be_falsey
end
end
context 'when original folder does not exist anymore' do
before do
FileUtils.rm_rf(base_path(hashed_storage))
end
it 'skips moving folders and go to next' do
expect(FileUtils).not_to receive(:mv).with(base_path(hashed_storage), base_path(legacy_storage))
service.execute
expect(File.exist?(base_path(legacy_storage))).to be_falsey
expect(File.file?(new_disk_path)).to be_falsey
end
it 'returns true' do
expect(service.execute).to be_truthy
end
it 'sets skipped to true' do
service.execute
expect(service.skipped?).to be_truthy
end
end
context 'when target folder already exists' do
before do
FileUtils.mkdir_p(base_path(legacy_storage))
end
it 'raises AttachmentRollbackError' do
expect(FileUtils).not_to receive(:mv).with(base_path(legacy_storage), base_path(hashed_storage))
expect { service.execute }.to raise_error(Projects::HashedStorage::AttachmentRollbackError)
end
end
end
context '#old_disk_path' do
it 'returns old disk_path for project' do
expect(service.old_disk_path).to eq(project.disk_path)
end
end
context '#new_disk_path' do
it 'returns new disk_path for project' do
service.execute
expect(service.new_disk_path).to eq(project.full_path)
end
end
def base_path(storage)
File.join(FileUploader.root, storage.disk_path)
end
end
......@@ -66,7 +66,7 @@ describe Projects::HashedStorage::RollbackRepositoryService, :clean_gitlab_redis
end
context 'when one move fails' do
it 'rollsback repositories to original name' do
it 'rolls repositories back to original name' do
allow(service).to receive(:move_repository).and_call_original
allow(service).to receive(:move_repository).with(old_disk_path, new_disk_path).once { false } # will disable first move only
......
# frozen_string_literal: true
require 'spec_helper'
describe HashedStorage::RollbackerWorker do
subject(:worker) { described_class.new }
let(:projects) { create_list(:project, 2, :empty_repo) }
let(:ids) { projects.map(&:id) }
describe '#perform' do
it 'delegates to MigratorService' do
expect_any_instance_of(Gitlab::HashedStorage::Migrator).to receive(:bulk_rollback).with(start: 5, finish: 10)
worker.perform(5, 10)
end
it 'rollsback projects in the specified range' do
perform_enqueued_jobs do
worker.perform(ids.min, ids.max)
end
projects.each do |project|
expect(project.reload.legacy_storage?).to be_truthy
end
end
end
end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册