提交 e06e64ef 编写于 作者: D David Heinemeier Hansson

Merge pull request #9 from cristianbica/delayed_job_adapter

Implemented delayed_job support
......@@ -4,4 +4,5 @@ gemspec
gem 'resque'
gem 'sidekiq'
gem 'sucker_punch'
\ No newline at end of file
gem 'sucker_punch'
gem 'delayed_job'
......@@ -16,6 +16,8 @@ GEM
celluloid (0.15.2)
timers (~> 1.1.0)
connection_pool (2.0.0)
delayed_job (4.0.1)
activesupport (>= 3.0, < 4.2)
i18n (0.6.9)
json (1.8.1)
minitest (5.3.4)
......@@ -58,6 +60,7 @@ PLATFORMS
DEPENDENCIES
activejob!
delayed_job
resque
sidekiq
sucker_punch
......@@ -18,10 +18,10 @@ We currently have adapters for:
* Resque 1.x
* Sidekiq
* Sucker Punch
* Delayed Job
We would like to have adapters for:
* Delayed Job
* beanstalkd
* rabbitmq
......
......@@ -22,12 +22,12 @@ task :default => :test
desc 'Run all adapter tests'
task :test do
tasks = %w(test_inline test_resque test_sidekiq test_sucker_punch)
tasks = %w(test_inline test_resque test_sidekiq test_sucker_punch test_delayed_job)
run_without_aborting(*tasks)
end
%w( inline resque sidekiq sucker_punch).each do |adapter|
%w( inline resque sidekiq sucker_punch delayed_job).each do |adapter|
Rake::TestTask.new("test_#{adapter}") do |t|
t.libs << 'test'
t.test_files = FileList['test/cases/**/*_test.rb']
......
require 'delayed_job'
module ActiveJob
module QueueAdapters
class DelayedJobAdapter
class << self
def queue(job, *args)
job.delay(queue: job.queue_name).perform(*args)
end
end
end
end
end
require 'delayed_job'
$LOAD_PATH << File.dirname(__FILE__) + "/../support/delayed_job"
Delayed::Worker.delay_jobs = false
Delayed::Worker.backend = :test
ActiveJob::Base.adapter = :delayed_job
......@@ -25,6 +25,11 @@ def setup
assert_equal ActiveJob::QueueAdapters::SuckerPunchAdapter, ActiveJob::Base.queue_adapter
end
test 'should load delayed_job adapter' do
ActiveJob::Base.adapter = :delayed_job
assert_equal ActiveJob::QueueAdapters::DelayedJobAdapter, ActiveJob::Base.queue_adapter
end
def teardown
ActiveJob::Base.queue_adapter = @old_adapter
end
......
require 'bundler'
Bundler.setup
$LOAD_PATH << File.dirname(__FILE__ + "/../lib")
$LOAD_PATH << File.dirname(__FILE__) + "/../lib"
require 'active_job'
require "adapters/#{ENV['AJADAPTER'] || 'inline'}"
......
#copied from https://github.com/collectiveidea/delayed_job/blob/master/spec/delayed/backend/test.rb
require 'ostruct'
# An in-memory backend suitable only for testing. Tries to behave as if it were an ORM.
module Delayed
module Backend
module Test
class Job
attr_accessor :id
attr_accessor :priority
attr_accessor :attempts
attr_accessor :handler
attr_accessor :last_error
attr_accessor :run_at
attr_accessor :locked_at
attr_accessor :locked_by
attr_accessor :failed_at
attr_accessor :queue
include Delayed::Backend::Base
cattr_accessor :id
self.id = 0
def initialize(hash = {})
self.attempts = 0
self.priority = 0
self.id = (self.class.id += 1)
hash.each{|k,v| send(:"#{k}=", v)}
end
@jobs = []
def self.all
@jobs
end
def self.count
all.size
end
def self.delete_all
all.clear
end
def self.create(attrs = {})
new(attrs).tap do |o|
o.save
end
end
def self.create!(*args); create(*args); end
def self.clear_locks!(worker_name)
all.select{|j| j.locked_by == worker_name}.each {|j| j.locked_by = nil; j.locked_at = nil}
end
# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
jobs = all.select do |j|
j.run_at <= db_time_now &&
(j.locked_at.nil? || j.locked_at < db_time_now - max_run_time || j.locked_by == worker_name) &&
!j.failed?
end
jobs = jobs.select{|j| Worker.queues.include?(j.queue)} if Worker.queues.any?
jobs = jobs.select{|j| j.priority >= Worker.min_priority} if Worker.min_priority
jobs = jobs.select{|j| j.priority <= Worker.max_priority} if Worker.max_priority
jobs.sort_by{|j| [j.priority, j.run_at]}[0..limit-1]
end
# Lock this job for this worker.
# Returns true if we have the lock, false otherwise.
def lock_exclusively!(max_run_time, worker)
now = self.class.db_time_now
if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
self.locked_at = now
self.locked_by = worker
end
return true
end
def self.db_time_now
Time.current
end
def update_attributes(attrs = {})
attrs.each{|k,v| send(:"#{k}=", v)}
save
end
def destroy
self.class.all.delete(self)
end
def save
self.run_at ||= Time.current
self.class.all << self unless self.class.all.include?(self)
true
end
def save!; save; end
def reload
reset
self
end
end
end
end
end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册