Commit e74c6ae6 authored by Douwe Maan

Merge branch 'refresh-authorizations-fork-join' into 'master'

Fix race conditions for AuthorizedProjectsWorker

Closes #26194 and #26310

See merge request !8701
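
In short, the diff below makes two changes: member-authorization refreshes move from after_create/after_update callbacks to after_commit, so the Sidekiq job can no longer run before the rows it needs are committed, and UserProjectAccessChangedService switches from bulk_perform_async to a new bulk_perform_and_wait, which blocks until the scheduled AuthorizedProjectsWorker jobs have finished, using the new Gitlab::JobWaiter and Gitlab::SidekiqStatus. The snippet below is a minimal stand-alone sketch of that wait-with-timeout pattern; it uses an in-memory Set instead of the Redis status keys the real code relies on, and every name in it is illustrative only.

require 'set'

# Toy version of the polling loop Gitlab::JobWaiter implements further down:
# check a completion source until every job ID is done or a timeout elapses.
class ToyJobWaiter
  INTERVAL = 0.1 # seconds between completion checks

  def initialize(job_ids, completed)
    @job_ids = job_ids
    @completed = completed # a Set standing in for Gitlab::SidekiqStatus
  end

  def wait(timeout = 60)
    start = Time.now
    while (Time.now - start) <= timeout
      break if @job_ids.all? { |jid| @completed.include?(jid) }
      sleep(INTERVAL)
    end
  end
end

done = Set.new(%w[job-a job-b])
ToyJobWaiter.new(%w[job-a job-b], done).wait(1) # returns immediately: both jobs done
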
......@@ -322,7 +322,7 @@ group :test do
gem 'email_spec', '~> 1.6.0'
gem 'json-schema', '~> 2.6.2'
gem 'webmock', '~> 1.21.0'
gem 'test_after_commit', '~> 0.4.2'
gem 'test_after_commit', '~> 1.1'
gem 'sham_rack', '~> 1.3.6'
gem 'timecop', '~> 0.8.0'
end
......
......@@ -760,7 +760,7 @@ GEM
teaspoon-jasmine (2.2.0)
teaspoon (>= 1.0.0)
temple (0.7.7)
test_after_commit (0.4.2)
test_after_commit (1.1.0)
activerecord (>= 3.2)
thin (1.7.0)
daemons (~> 1.0, >= 1.0.9)
......@@ -997,7 +997,7 @@ DEPENDENCIES
sys-filesystem (~> 1.1.6)
teaspoon (~> 1.1.0)
teaspoon-jasmine (~> 2.2.0)
test_after_commit (~> 0.4.2)
test_after_commit (~> 1.1)
thin (~> 1.7.0)
timecop (~> 0.8.0)
truncato (~> 0.7.8)
......
......@@ -68,9 +68,9 @@ class Member < ActiveRecord::Base
after_create :send_request, if: :request?, unless: :importing?
after_create :create_notification_setting, unless: [:pending?, :importing?]
after_create :post_create_hook, unless: [:pending?, :importing?]
after_create :refresh_member_authorized_projects, if: :importing?
after_update :post_update_hook, unless: [:pending?, :importing?]
after_destroy :post_destroy_hook, unless: :pending?
after_commit :refresh_member_authorized_projects
delegate :name, :username, :email, to: :user, prefix: true
......@@ -147,8 +147,6 @@ class Member < ActiveRecord::Base
member.save
end
UserProjectAccessChangedService.new(user.id).execute if user.is_a?(User)
member
end
......@@ -275,23 +273,27 @@ class Member < ActiveRecord::Base
end
def post_create_hook
UserProjectAccessChangedService.new(user.id).execute
system_hook_service.execute_hooks_for(self, :create)
end
def post_update_hook
UserProjectAccessChangedService.new(user.id).execute if access_level_changed?
# override in sub class
end
def post_destroy_hook
refresh_member_authorized_projects
system_hook_service.execute_hooks_for(self, :destroy)
end
# Refreshes authorizations of the current member.
#
# This method schedules a job using Sidekiq and as such **must not** be called
# in a transaction. Doing so can lead to the job running before the
# transaction has been committed, resulting in the job either throwing an
# error or not doing any meaningful work.
def refresh_member_authorized_projects
# If user/source is being destroyed, project access are gonna be destroyed eventually
# because of DB foreign keys, so we shouldn't bother with refreshing after each
# member is destroyed through association
# If user/source is being destroyed, project access are going to be
# destroyed eventually because of DB foreign keys, so we shouldn't bother
# with refreshing after each member is destroyed through association
return if destroyed_by_association.present?
UserProjectAccessChangedService.new(user_id).execute
......
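
The after_commit doc comment above is the crux of the fix. The following stand-alone sketch (an illustration only, with a made-up DemoMember model; it assumes the activerecord and sqlite3 gems and is not GitLab code) shows why the callback choice matters: after_create still runs inside the open transaction, while after_commit only runs once the data is visible to other connections, such as a Sidekiq worker.

require 'active_record'

# Stand-alone illustration (not GitLab code): after_create fires while the
# transaction is still open, after_commit only after it has been committed.
ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: ':memory:')
ActiveRecord::Migration.verbose = false

ActiveRecord::Schema.define do
  create_table :demo_members do |t|
    t.string :name
  end
end

class DemoMember < ActiveRecord::Base
  after_create { puts 'after_create: still inside the INSERT transaction' }
  after_commit { puts 'after_commit: committed, row visible to other connections' }
end

DemoMember.transaction do
  DemoMember.create!(name: 'example')
  puts 'caller: still inside the transaction'
end
# Output order: after_create, caller, after_commit
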
......@@ -16,8 +16,7 @@ class ProjectGroupLink < ActiveRecord::Base
validates :group_access, inclusion: { in: Gitlab::Access.values }, presence: true
validate :different_group
after_create :refresh_group_members_authorized_projects
after_destroy :refresh_group_members_authorized_projects
after_commit :refresh_group_members_authorized_projects
def self.access_options
Gitlab::Access.options
......
......@@ -4,6 +4,6 @@ class UserProjectAccessChangedService
end
def execute
AuthorizedProjectsWorker.bulk_perform_async(@user_ids.map { |id| [id] })
AuthorizedProjectsWorker.bulk_perform_and_wait(@user_ids.map { |id| [id] })
end
end
......@@ -2,6 +2,13 @@ class AuthorizedProjectsWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
# Schedules multiple jobs and waits for them to be completed.
def self.bulk_perform_and_wait(args_list)
job_ids = bulk_perform_async(args_list)
Gitlab::JobWaiter.new(job_ids).wait
end
def self.bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end
......
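
A hypothetical call site (not part of the diff; it assumes the GitLab Rails environment with Sidekiq workers running, and the user IDs are made up): bulk_perform_and_wait pushes one AuthorizedProjectsWorker job per argument list in a single bulk enqueue, then blocks via Gitlab::JobWaiter (defined later in this diff) until the jobs report completion or the waiter's default 60-second timeout elapses.

# Hypothetical usage, assuming the GitLab app environment; IDs are examples only.
# Enqueues one job per user ID and blocks until all of them have run
# (or Gitlab::JobWaiter's default 60-second timeout passes).
AuthorizedProjectsWorker.bulk_perform_and_wait([[42], [43], [44]])
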
---
title: Fix race conditions for AuthorizedProjectsWorker
merge_request:
author:
......@@ -12,6 +12,11 @@ Sidekiq.configure_server do |config|
chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS']
chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
chain.add Gitlab::SidekiqMiddleware::RequestStoreMiddleware unless ENV['SIDEKIQ_REQUEST_STORE'] == '0'
chain.add Gitlab::SidekiqStatus::ServerMiddleware
end
config.client_middleware do |chain|
chain.add Gitlab::SidekiqStatus::ClientMiddleware
end
# Sidekiq-cron: load recurring jobs from gitlab.yml
......@@ -46,6 +51,10 @@ end
Sidekiq.configure_client do |config|
config.redis = redis_config_hash
config.client_middleware do |chain|
chain.add Gitlab::SidekiqStatus::ClientMiddleware
end
end
# The Sidekiq client API always adds the queue to the Sidekiq queue
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do
User.seed do |s|
s.id = 1
......
require 'sidekiq/testing'
require './spec/support/sidekiq'
Sidekiq::Testing.inline! do
Gitlab::Seeder.quiet do
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do
20.times do |i|
begin
......
require 'sidekiq/testing'
require './spec/support/sidekiq'
Sidekiq::Testing.inline! do
Gitlab::Seeder.quiet do
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do
Project.all.each do |project|
5.times do |i|
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do
Project.all.each do |project|
10.times do
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do
# Limit the number of merge requests per project to avoid long seeds
MAX_NUM_MERGE_REQUESTS = 10
......
Gitlab::Seeder.quiet do
User.first(10).each do |user|
key = "ssh-rsa AAAAB3NzaC1yc2EAAAABJQAAAIEAiPWx6WM4lhHNedGfBpPJNPpZ7yKu+dnn1SJejgt#{user.id + 100}6k6YjzGGphH2TUxwKzxcKDKKezwkpfnxPkSMkuEspGRt/aZZ9wa++Oi7Qkr8prgHc4soW6NUlfDzpvZK2H5E7eQaSeP3SAwGmQKUFHCddNaP0L+hM7zhFNzjFvpaMgJw0="
require './spec/support/sidekiq'
user.keys.create(
title: "Sample key #{user.id}",
key: key
)
# Creating keys runs a gitlab-shell worker. Since we may not have the right
# gitlab-shell path set (yet) we need to disable this for these fixtures.
Sidekiq::Testing.disable! do
Gitlab::Seeder.quiet do
User.first(10).each do |user|
key = "ssh-rsa AAAAB3NzaC1yc2EAAAABJQAAAIEAiPWx6WM4lhHNedGfBpPJNPpZ7yKu+dnn1SJejgt#{user.id + 100}6k6YjzGGphH2TUxwKzxcKDKKezwkpfnxPkSMkuEspGRt/aZZ9wa++Oi7Qkr8prgHc4soW6NUlfDzpvZK2H5E7eQaSeP3SAwGmQKUFHCddNaP0L+hM7zhFNzjFvpaMgJw0="
print '.'
user.keys.create(
title: "Sample key #{user.id}",
key: key
)
print '.'
end
end
end
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do
content =<<eos
class Member < ActiveRecord::Base
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do
Issue.all.each do |issue|
project = issue.project
......
require './spec/support/sidekiq'
class Gitlab::Seeder::Pipelines
STAGES = %w[build test deploy notify]
BUILDS = [
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do
emoji = Gitlab::AwardEmoji.emojis.keys
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do
admin_user = User.find(1)
......
require 'sidekiq/testing'
require './spec/support/sidekiq'
require './spec/support/test_env'
class Gitlab::Seeder::CycleAnalytics
......
......@@ -4,7 +4,6 @@ SimpleCovEnv.start!
ENV['RAILS_ENV'] = 'test'
require './config/environment'
require 'rspec/expectations'
require 'sidekiq/testing/inline'
require_relative 'capybara'
require_relative 'db_cleaner'
......@@ -15,7 +14,7 @@ if ENV['CI']
Knapsack::Adapters::SpinachAdapter.bind
end
%w(select2_helper test_env repo_helpers wait_for_ajax).each do |f|
%w(select2_helper test_env repo_helpers wait_for_ajax sidekiq).each do |f|
require Rails.root.join('spec', 'support', f)
end
......
module Gitlab
# JobWaiter can be used to wait for a number of Sidekiq jobs to complete.
class JobWaiter
# The sleep interval between checking keys, in seconds.
INTERVAL = 0.1
# jobs - The job IDs to wait for.
def initialize(jobs)
@jobs = jobs
end
# Waits for all the jobs to be completed.
#
# timeout - The maximum amount of seconds to block the caller for. This
# ensures we don't indefinitely block a caller in case a job takes
# long to process, or is never processed.
def wait(timeout = 60)
start = Time.current
while (Time.current - start) <= timeout
break if SidekiqStatus.all_completed?(@jobs)
sleep(INTERVAL) # to not overload Redis too much.
end
end
end
end
module Gitlab
# The SidekiqStatus module and its child classes can be used for checking if a
# Sidekiq job has been processed or not.
#
# To check if a job has been completed, simply pass the job ID to the
# `completed?` method:
#
# job_id = SomeWorker.perform_async(...)
#
# if Gitlab::SidekiqStatus.completed?(job_id)
# ...
# end
#
# For each job ID registered a separate key is stored in Redis, making lookups
# much faster than using Sidekiq's built-in job finding/status API. These keys
# expire after a certain period of time to prevent storing too many keys in
# Redis.
module SidekiqStatus
STATUS_KEY = 'gitlab-sidekiq-status:%s'.freeze
# The default time (in seconds) after which a status key is expired
# automatically. The default of 30 minutes should be more than sufficient
# for most jobs.
DEFAULT_EXPIRATION = 30.minutes.to_i
# Starts tracking of the given job.
#
# jid - The Sidekiq job ID
# expire - The expiration time of the Redis key.
def self.set(jid, expire = DEFAULT_EXPIRATION)
Sidekiq.redis do |redis|
redis.set(key_for(jid), 1, ex: expire)
end
end
# Stops the tracking of the given job.
#
# jid - The Sidekiq job ID to remove.
def self.unset(jid)
Sidekiq.redis do |redis|
redis.del(key_for(jid))
end
end
# Returns true if all the given jobs have been completed.
#
# jids - The Sidekiq job IDs to check.
#
# Returns true or false.
def self.all_completed?(jids)
keys = jids.map { |jid| key_for(jid) }
responses = Sidekiq.redis do |redis|
redis.pipelined do
keys.each { |key| redis.exists(key) }
end
end
responses.all? { |value| !value }
end
def self.key_for(jid)
STATUS_KEY % jid
end
end
end
module Gitlab
module SidekiqStatus
class ClientMiddleware
def call(_, job, _, _)
SidekiqStatus.set(job['jid'])
yield
end
end
end
end
module Gitlab
module SidekiqStatus
class ServerMiddleware
def call(worker, job, queue)
ret = yield
SidekiqStatus.unset(job['jid'])
ret
end
end
end
end
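
Putting the pieces together, here is a hypothetical console walk-through (not part of the diff; it assumes the GitLab app and a Redis connection, and the job ID is made up). The ClientMiddleware registered in the Sidekiq initializer calls SidekiqStatus.set when a job is pushed, the ServerMiddleware calls SidekiqStatus.unset once the job has run, and Gitlab::JobWaiter polls all_completed? in between.

jid = 'abc123'                               # made-up job ID
Gitlab::SidekiqStatus.set(jid)               # what ClientMiddleware does on enqueue
Gitlab::SidekiqStatus.all_completed?([jid])  # => false while the job is pending
Gitlab::SidekiqStatus.unset(jid)             # what ServerMiddleware does after the job runs
Gitlab::SidekiqStatus.all_completed?([jid])  # => true, so JobWaiter#wait can return
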
require 'spec_helper'
describe Gitlab::JobWaiter do
describe '#wait' do
let(:waiter) { described_class.new(%w(a)) }
it 'returns when all jobs have been completed' do
expect(Gitlab::SidekiqStatus).to receive(:all_completed?).with(%w(a)).
and_return(true)
expect(waiter).not_to receive(:sleep)
waiter.wait
end
it 'sleeps between checking the job statuses' do
expect(Gitlab::SidekiqStatus).to receive(:all_completed?).
with(%w(a)).
and_return(false, true)
expect(waiter).to receive(:sleep).with(described_class::INTERVAL)
waiter.wait
end
it 'returns when timing out' do
expect(waiter).not_to receive(:sleep)
waiter.wait(0)
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqStatus::ClientMiddleware do
describe '#call' do
it 'tracks the job in Redis' do
expect(Gitlab::SidekiqStatus).to receive(:set).with('123')
described_class.new.
call('Foo', { 'jid' => '123' }, double(:queue), double(:pool)) { nil }
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqStatus::ServerMiddleware do
describe '#call' do
it 'stops tracking of a job upon completion' do
expect(Gitlab::SidekiqStatus).to receive(:unset).with('123')
ret = described_class.new.
call(double(:worker), { 'jid' => '123' }, double(:queue)) { 10 }
expect(ret).to eq(10)
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqStatus do
describe '.set', :redis do
it 'stores the job ID' do
described_class.set('123')
key = described_class.key_for('123')
Sidekiq.redis do |redis|
expect(redis.exists(key)).to eq(true)
expect(redis.ttl(key) > 0).to eq(true)
end
end
end
describe '.unset', :redis do
it 'removes the job ID' do
described_class.set('123')
described_class.unset('123')
key = described_class.key_for('123')
Sidekiq.redis do |redis|
expect(redis.exists(key)).to eq(false)
end
end
end
describe '.all_completed?', :redis do
it 'returns true if all jobs have been completed' do
expect(described_class.all_completed?(%w(123))).to eq(true)
end
it 'returns false if a job has not yet been completed' do
described_class.set('123')
expect(described_class.all_completed?(%w(123 456))).to eq(false)
end
end
describe '.key_for' do
it 'returns the key for a job ID' do
key = described_class.key_for('123')
expect(key).to be_an_instance_of(String)
expect(key).to include('123')
end
end
end
require 'spec_helper'
describe UserProjectAccessChangedService do
describe '#execute' do
it 'schedules the user IDs' do
expect(AuthorizedProjectsWorker).to receive(:bulk_perform_and_wait).
with([[1], [2]])
described_class.new([1, 2]).execute
end
end
end
......@@ -7,7 +7,6 @@ ENV["IN_MEMORY_APPLICATION_SETTINGS"] = 'true'
require File.expand_path("../../config/environment", __FILE__)
require 'rspec/rails'
require 'shoulda/matchers'
require 'sidekiq/testing/inline'
require 'rspec/retry'
if ENV['CI'] && !ENV['NO_KNAPSACK']
......
require 'sidekiq/testing/inline'
Sidekiq::Testing.server_middleware do |chain|
chain.add Gitlab::SidekiqStatus::ServerMiddleware
end
......@@ -3,6 +3,18 @@ require 'spec_helper'
describe AuthorizedProjectsWorker do
let(:worker) { described_class.new }
describe '.bulk_perform_and_wait' do
it 'schedules the ids and waits for the jobs to complete' do
project = create(:project)
project.owner.project_authorizations.delete_all
described_class.bulk_perform_and_wait([[project.owner.id]])
expect(project.owner.project_authorizations.count).to eq(1)
end
end
describe '#perform' do
it "refreshes user's authorized projects" do
user = create(:user)
......