未验证 提交 5bbaae0d 编写于 作者: R Rafael França 提交者: GitHub

Merge pull request #33751 from steves/add_retry_notifications_to_aj

Add hooks to ActiveJob around retries and discards
* Added `enqueue_retry.active_job`, `retry_stopped.active_job`, and `discard.active_job` hooks.
*steves*
* Allow `assert_performed_with` to be called without a block. * Allow `assert_performed_with` to be called without a block.
*bogdanvlviv* *bogdanvlviv*
......
...@@ -44,14 +44,24 @@ module ClassMethods ...@@ -44,14 +44,24 @@ module ClassMethods
# end # end
def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: nil) def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: nil)
rescue_from(*exceptions) do |error| rescue_from(*exceptions) do |error|
payload = {
job: self,
adapter: self.class.queue_adapter,
error: error,
wait: wait
}
if executions < attempts if executions < attempts
logger.error "Retrying #{self.class} in #{wait} seconds, due to a #{error.class}. The original exception was #{error.cause.inspect}." ActiveSupport::Notifications.instrument("enqueue_retry.active_job", payload) do
retry_job wait: determine_delay(wait), queue: queue, priority: priority retry_job wait: determine_delay(wait), queue: queue, priority: priority
end
else else
if block_given? if block_given?
ActiveSupport::Notifications.instrument("retry_stopped.active_job", payload) do
yield self, error yield self, error
end
else else
logger.error "Stopped retrying #{self.class} due to a #{error.class}, which reoccurred on #{executions} attempts. The original exception was #{error.cause.inspect}." ActiveSupport::Notifications.instrument("retry_stopped.active_job", payload)
raise error raise error
end end
end end
...@@ -78,10 +88,16 @@ def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: ni ...@@ -78,10 +88,16 @@ def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: ni
# end # end
def discard_on(*exceptions) def discard_on(*exceptions)
rescue_from(*exceptions) do |error| rescue_from(*exceptions) do |error|
payload = {
job: self,
adapter: self.class.queue_adapter,
error: error
}
ActiveSupport::Notifications.instrument("discard.active_job", payload) do
if block_given? if block_given?
yield self, error yield self, error
else end
logger.error "Discarded #{self.class} due to a #{error.class}. The original exception was #{error.cause.inspect}."
end end
end end
end end
......
...@@ -88,6 +88,34 @@ def perform(event) ...@@ -88,6 +88,34 @@ def perform(event)
end end
end end
def enqueue_retry(event)
job = event.payload[:job]
ex = event.payload[:error]
wait = event.payload[:wait]
error do
"Retrying #{job.class} in #{wait} seconds, due to a #{ex.class}. The original exception was #{ex.cause.inspect}."
end
end
def retry_stopped(event)
job = event.payload[:job]
ex = event.payload[:error]
error do
"Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts. The original exception was #{ex.cause.inspect}."
end
end
def discard(event)
job = event.payload[:job]
ex = event.payload[:error]
error do
"Discarded #{job.class} due to a #{ex.class}. The original exception was #{ex.cause.inspect}."
end
end
private private
def queue_name(event) def queue_name(event)
event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})" event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
......
...@@ -8,9 +8,11 @@ ...@@ -8,9 +8,11 @@
require "jobs/overridden_logging_job" require "jobs/overridden_logging_job"
require "jobs/nested_job" require "jobs/nested_job"
require "jobs/rescue_job" require "jobs/rescue_job"
require "jobs/retry_job"
require "models/person" require "models/person"
class LoggingTest < ActiveSupport::TestCase class LoggingTest < ActiveSupport::TestCase
include ActiveJob::TestHelper
include ActiveSupport::LogSubscriber::TestHelper include ActiveSupport::LogSubscriber::TestHelper
include ActiveSupport::Logger::Severity include ActiveSupport::Logger::Severity
...@@ -59,14 +61,18 @@ def test_uses_active_job_as_tag ...@@ -59,14 +61,18 @@ def test_uses_active_job_as_tag
end end
def test_uses_job_name_as_tag def test_uses_job_name_as_tag
perform_enqueued_jobs do
LoggingJob.perform_later "Dummy" LoggingJob.perform_later "Dummy"
assert_match(/\[LoggingJob\]/, @logger.messages) assert_match(/\[LoggingJob\]/, @logger.messages)
end end
end
def test_uses_job_id_as_tag def test_uses_job_id_as_tag
perform_enqueued_jobs do
LoggingJob.perform_later "Dummy" LoggingJob.perform_later "Dummy"
assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages) assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages)
end end
end
def test_logs_correct_queue_name def test_logs_correct_queue_name
original_queue_name = LoggingJob.queue_name original_queue_name = LoggingJob.queue_name
...@@ -78,20 +84,24 @@ def test_logs_correct_queue_name ...@@ -78,20 +84,24 @@ def test_logs_correct_queue_name
end end
def test_globalid_parameter_logging def test_globalid_parameter_logging
perform_enqueued_jobs do
person = Person.new(123) person = Person.new(123)
LoggingJob.perform_later person LoggingJob.perform_later person
assert_match(%r{Enqueued.*gid://aj/Person/123}, @logger.messages) assert_match(%r{Enqueued.*gid://aj/Person/123}, @logger.messages)
assert_match(%r{Dummy, here is it: #<Person:.*>}, @logger.messages) assert_match(%r{Dummy, here is it: #<Person:.*>}, @logger.messages)
assert_match(%r{Performing.*gid://aj/Person/123}, @logger.messages) assert_match(%r{Performing.*gid://aj/Person/123}, @logger.messages)
end end
end
def test_globalid_nested_parameter_logging def test_globalid_nested_parameter_logging
perform_enqueued_jobs do
person = Person.new(123) person = Person.new(123)
LoggingJob.perform_later(person: person) LoggingJob.perform_later(person: person)
assert_match(%r{Enqueued.*gid://aj/Person/123}, @logger.messages) assert_match(%r{Enqueued.*gid://aj/Person/123}, @logger.messages)
assert_match(%r{Dummy, here is it: .*#<Person:.*>}, @logger.messages) assert_match(%r{Dummy, here is it: .*#<Person:.*>}, @logger.messages)
assert_match(%r{Performing.*gid://aj/Person/123}, @logger.messages) assert_match(%r{Performing.*gid://aj/Person/123}, @logger.messages)
end end
end
def test_enqueue_job_logging def test_enqueue_job_logging
events = subscribed { HelloJob.perform_later "Cristian" } events = subscribed { HelloJob.perform_later "Cristian" }
...@@ -102,13 +112,16 @@ def test_enqueue_job_logging ...@@ -102,13 +112,16 @@ def test_enqueue_job_logging
end end
def test_perform_job_logging def test_perform_job_logging
perform_enqueued_jobs do
LoggingJob.perform_later "Dummy" LoggingJob.perform_later "Dummy"
assert_match(/Performing LoggingJob \(Job ID: .*?\) from .*? with arguments:.*Dummy/, @logger.messages) assert_match(/Performing LoggingJob \(Job ID: .*?\) from .*? with arguments:.*Dummy/, @logger.messages)
assert_match(/Dummy, here is it: Dummy/, @logger.messages) assert_match(/Dummy, here is it: Dummy/, @logger.messages)
assert_match(/Performed LoggingJob \(Job ID: .*?\) from .*? in .*ms/, @logger.messages) assert_match(/Performed LoggingJob \(Job ID: .*?\) from .*? in .*ms/, @logger.messages)
end end
end
def test_perform_nested_jobs_logging def test_perform_nested_jobs_logging
perform_enqueued_jobs do
NestedJob.perform_later NestedJob.perform_later
assert_match(/\[LoggingJob\] \[.*?\]/, @logger.messages) assert_match(/\[LoggingJob\] \[.*?\]/, @logger.messages)
assert_match(/\[ActiveJob\] Enqueued NestedJob \(Job ID: .*\) to/, @logger.messages) assert_match(/\[ActiveJob\] Enqueued NestedJob \(Job ID: .*\) to/, @logger.messages)
...@@ -119,6 +132,7 @@ def test_perform_nested_jobs_logging ...@@ -119,6 +132,7 @@ def test_perform_nested_jobs_logging
assert_match(/\[ActiveJob\].*\[LoggingJob\] \[LOGGING-JOB-ID\] Performed LoggingJob \(Job ID: .*?\) from .* in/, @logger.messages) assert_match(/\[ActiveJob\].*\[LoggingJob\] \[LOGGING-JOB-ID\] Performed LoggingJob \(Job ID: .*?\) from .* in/, @logger.messages)
assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Performed NestedJob \(Job ID: .*?\) from .* in/, @logger.messages) assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Performed NestedJob \(Job ID: .*?\) from .* in/, @logger.messages)
end end
end
def test_enqueue_at_job_logging def test_enqueue_at_job_logging
events = subscribed { HelloJob.set(wait_until: 24.hours.from_now).perform_later "Cristian" } events = subscribed { HelloJob.set(wait_until: 24.hours.from_now).perform_later "Cristian" }
...@@ -151,4 +165,35 @@ def test_job_error_logging ...@@ -151,4 +165,35 @@ def test_job_error_logging
assert_match(/Performing RescueJob \(Job ID: .*?\) from .*? with arguments:.*other/, @logger.messages) assert_match(/Performing RescueJob \(Job ID: .*?\) from .*? with arguments:.*other/, @logger.messages)
assert_match(/Error performing RescueJob \(Job ID: .*?\) from .*? in .*ms: RescueJob::OtherError \(Bad hair\):\n.*\brescue_job\.rb:\d+:in `perform'/, @logger.messages) assert_match(/Error performing RescueJob \(Job ID: .*?\) from .*? in .*ms: RescueJob::OtherError \(Bad hair\):\n.*\brescue_job\.rb:\d+:in `perform'/, @logger.messages)
end end
def test_enqueue_retry_logging
perform_enqueued_jobs do
RetryJob.perform_later "DefaultsError", 2
assert_match(/Retrying RetryJob in \d+ seconds, due to a DefaultsError\. The original exception was nil\./, @logger.messages)
end
end
def test_retry_stopped_logging
perform_enqueued_jobs do
RetryJob.perform_later "CustomCatchError", 6
assert_match(/Stopped retrying RetryJob due to a CustomCatchError, which reoccurred on \d+ attempts\. The original exception was #<CustomCatchError: CustomCatchError>\./, @logger.messages)
end
end
def test_retry_stopped_logging_without_block
perform_enqueued_jobs do
begin
RetryJob.perform_later "DefaultsError", 6
rescue DefaultsError
assert_match(/Stopped retrying RetryJob due to a DefaultsError, which reoccurred on \d+ attempts\. The original exception was #<DefaultsError: DefaultsError>\./, @logger.messages)
end
end
end
def test_discard_logging
perform_enqueued_jobs do
RetryJob.perform_later "DiscardableError", 2
assert_match(/Discarded RetryJob due to a DiscardableError\. The original exception was nil\./, @logger.messages)
end
end
end end
...@@ -458,6 +458,14 @@ Active Job ...@@ -458,6 +458,14 @@ Active Job
| `:adapter` | QueueAdapter object processing the job | | `:adapter` | QueueAdapter object processing the job |
| `:job` | Job object | | `:job` | Job object |
### enqueue_retry.active_job
| Key | Value |
| ------------ | -------------------------------------- |
| `:job` | Job object |
| `:adapter` | QueueAdapter object processing the job |
| `:error` | The error that caused the retry |
### perform_start.active_job ### perform_start.active_job
| Key | Value | | Key | Value |
...@@ -472,6 +480,22 @@ Active Job ...@@ -472,6 +480,22 @@ Active Job
| `:adapter` | QueueAdapter object processing the job | | `:adapter` | QueueAdapter object processing the job |
| `:job` | Job object | | `:job` | Job object |
### retry_stopped.active_job
| Key | Value |
| ------------ | -------------------------------------- |
| `:adapter` | QueueAdapter object processing the job |
| `:job` | Job object |
| `:error` | The error that caused the retry |
### discard.active_job
| Key | Value |
| ------------ | -------------------------------------- |
| `:adapter` | QueueAdapter object processing the job |
| `:job` | Job object |
| `:error` | The error that caused the discard |
Action Cable Action Cable
------------ ------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册