提交 7059ab35 编写于 作者: W wvengen

Add job priorities to ActiveJob

上级 61f9e47f
* Add job priorities to Active Job.
*wvengen*
* Implement a simple `AsyncJob` processor and associated `AsyncAdapter` that
queue jobs to a `concurrent-ruby` thread pool.
......
require 'active_job/core'
require 'active_job/queue_adapter'
require 'active_job/queue_name'
require 'active_job/queue_priority'
require 'active_job/enqueuing'
require 'active_job/execution'
require 'active_job/callbacks'
......@@ -57,6 +58,7 @@ class Base
include Core
include QueueAdapter
include QueueName
include QueuePriority
include Enqueuing
include Execution
include Callbacks
......
......@@ -18,6 +18,9 @@ module Core
# Queue in which the job will reside.
attr_writer :queue_name
# Priority that the job will have (lower is more priority).
attr_writer :priority
# ID optionally provided by adapter
attr_accessor :provider_job_id
......@@ -43,6 +46,7 @@ def deserialize(job_data)
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
......@@ -51,6 +55,7 @@ def deserialize(job_data)
# VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
def set(options={})
ConfiguredJob.new(self, options)
end
......@@ -62,6 +67,7 @@ def initialize(*arguments)
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
@priority = self.class.priority
end
# Returns a hash with the job data that can safely be passed to the
......@@ -71,6 +77,7 @@ def serialize
'job_class' => self.class.name,
'job_id' => job_id,
'queue_name' => queue_name,
'priority' => priority,
'arguments' => serialize_arguments(arguments),
'locale' => I18n.locale
}
......@@ -99,6 +106,7 @@ def serialize
def deserialize(job_data)
self.job_id = job_data['job_id']
self.queue_name = job_data['queue_name']
self.priority = job_data['priority']
self.serialized_arguments = job_data['arguments']
self.locale = job_data['locale'] || I18n.locale
end
......
......@@ -32,6 +32,7 @@ def job_or_instantiate(*args)
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
......@@ -54,6 +55,7 @@ def retry_job(options={})
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
......@@ -61,10 +63,12 @@ def retry_job(options={})
# my_job_instance.enqueue wait: 5.minutes
# my_job_instance.enqueue queue: :important
# my_job_instance.enqueue wait_until: Date.tomorrow.midnight
# my_job_instance.enqueue priority: 10
def enqueue(options={})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
run_callbacks :enqueue do
if self.scheduled_at
self.class.queue_adapter.enqueue_at self, self.scheduled_at
......
......@@ -14,13 +14,13 @@ module QueueAdapters
# Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter
def enqueue(job) #:nodoc:
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
job.provider_job_id = delayed_job.id
delayed_job
end
def enqueue_at(job, timestamp) #:nodoc:
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
job.provider_job_id = delayed_job.id
delayed_job
end
......
......@@ -16,13 +16,13 @@ module QueueAdapters
# Rails.application.config.active_job.queue_adapter = :que
class QueAdapter
def enqueue(job) #:nodoc:
que_job = JobWrapper.enqueue job.serialize
que_job = JobWrapper.enqueue job.serialize, priority: job.priority
job.provider_job_id = que_job.attrs["job_id"]
que_job
end
def enqueue_at(job, timestamp) #:nodoc:
que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp)
que_job = JobWrapper.enqueue job.serialize, priority: job.priority, run_at: Time.at(timestamp)
job.provider_job_id = que_job.attrs["job_id"]
que_job
end
......
module ActiveJob
module QueuePriority
extend ActiveSupport::Concern
# Includes the ability to override the default queue priority.
module ClassMethods
mattr_accessor(:default_priority)
# Specifies the priority of the queue to create the job with.
#
# class PublishToFeedJob < ActiveJob::Base
# queue_with_priority 50
#
# def perform(post)
# post.to_feed!
# end
# end
#
# Specify either an argument or a block.
def queue_with_priority(priority=nil, &block)
if block_given?
self.priority = block
else
self.priority = priority
end
end
end
included do
class_attribute :priority, instance_accessor: false
self.priority = default_priority
end
# Returns the priority that the job will be created with
def priority
if @priority.is_a?(Proc)
@priority = instance_exec(&@priority)
end
@priority
end
end
end
require 'helper'
require 'jobs/hello_job'
class QueuePriorityTest < ActiveSupport::TestCase
test 'priority unset by default' do
assert_equal nil, HelloJob.priority
end
test 'uses given priority' do
original_priority = HelloJob.priority
begin
HelloJob.queue_with_priority 90
assert_equal 90, HelloJob.new.priority
ensure
HelloJob.priority = original_priority
end
end
test 'evals block given to priority to determine priority' do
original_priority = HelloJob.priority
begin
HelloJob.queue_with_priority { 25 }
assert_equal 25, HelloJob.new.priority
ensure
HelloJob.priority = original_priority
end
end
test 'can use arguments to determine priority in priority block' do
original_priority = HelloJob.priority
begin
HelloJob.queue_with_priority { self.arguments.first=='1' ? 99 : 11 }
assert_equal 99, HelloJob.new('1').priority
assert_equal 11, HelloJob.new('3').priority
ensure
HelloJob.priority = original_priority
end
end
test 'uses priority passed to #set' do
job = HelloJob.set(priority: 123).perform_later
assert_equal 123, job.priority
end
end
......@@ -84,4 +84,16 @@ class QueuingTest < ActiveSupport::TestCase
I18n.locale = :en
end
end
test 'should run job with higher priority first' do
skip unless adapter_is?(:delayed_job, :que)
wait_until = Time.now + 3.seconds
TestJob.set(wait_until: wait_until, priority: 20).perform_later "#{@id}.1"
TestJob.set(wait_until: wait_until, priority: 10).perform_later "#{@id}.2"
wait_for_jobs_to_finish_for(10.seconds)
assert job_executed "#{@id}.1"
assert job_executed "#{@id}.2"
assert job_executed_at("#{@id}.2") < job_executed_at("#{@id}.1")
end
end
......@@ -42,8 +42,12 @@ def wait_for_jobs_to_finish_for(seconds=60)
end
end
def job_executed
Dummy::Application.root.join("tmp/#{@id}").exist?
def job_executed(id=@id)
Dummy::Application.root.join("tmp/#{id}").exist?
end
def job_executed_at(id=@id)
File.new(Dummy::Application.root.join("tmp/#{id}")).ctime
end
def job_output
......
......@@ -6,6 +6,7 @@ def self.enqueue(*args)
if args.last.is_a?(Hash)
options = args.pop
options.delete(:run_at)
options.delete(:priority)
args << options unless options.empty?
end
self.run(*args)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册