提交 4004e53d 编写于 作者: J Juergen Hoeller

JSR-236 support, in particular adapting to JSR-236 Triggers and exposing...

JSR-236 support, in particular adapting to JSR-236 Triggers and exposing additional metadata through ManagedTasks

This is built into ConcurrentTaskExecutor and ConcurrentTaskScheduler now, automatically detecting the JSR-236 ExecutorService variants and adapting to them.

Issue: SPR-8195
上级 52fd84bb
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,13 +16,19 @@
package org.springframework.scheduling.concurrent;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.enterprise.concurrent.ManagedExecutors;
import javax.enterprise.concurrent.ManagedTask;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.ClassUtils;
/**
* Adapter that takes a JDK 1.5 {@code java.util.concurrent.Executor} and
......@@ -30,6 +36,11 @@ import org.springframework.scheduling.SchedulingTaskExecutor;
* Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting
* the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
*
* <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
* in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it,
* exposing a long-running hint based on {@link SchedulingAwareRunnable} and an
* identity name based on the given Runnable/Callable's {@code toString()}.
*
* <p>Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows for
* defining a JDK 1.5 {@link java.util.concurrent.ThreadPoolExecutor} in bean style,
* exposing it as a Spring {@link org.springframework.core.task.TaskExecutor} directly.
......@@ -46,6 +57,20 @@ import org.springframework.scheduling.SchedulingTaskExecutor;
*/
public class ConcurrentTaskExecutor implements SchedulingTaskExecutor {
private static Class<?> managedExecutorService;
static {
try {
managedExecutorService = ClassUtils.forName(
"javax.enterprise.concurrent.ManagedExecutorService",
ConcurrentTaskScheduler.class.getClassLoader());
}
catch (ClassNotFoundException ex) {
// JSR-236 API not available...
managedExecutorService = null;
}
}
private Executor concurrentExecutor;
private TaskExecutorAdapter adaptedExecutor;
......@@ -63,6 +88,8 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor {
/**
* Create a new ConcurrentTaskExecutor,
* using the given JDK 1.5 concurrent executor.
* <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
* in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it.
* @param concurrentExecutor the JDK 1.5 concurrent executor to delegate to
*/
public ConcurrentTaskExecutor(Executor concurrentExecutor) {
......@@ -72,11 +99,23 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor {
/**
* Specify the JDK 1.5 concurrent executor to delegate to.
* <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
* in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it.
*/
public final void setConcurrentExecutor(Executor concurrentExecutor) {
this.concurrentExecutor =
(concurrentExecutor != null ? concurrentExecutor : Executors.newSingleThreadExecutor());
this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor);
if (concurrentExecutor != null) {
this.concurrentExecutor = concurrentExecutor;
if (managedExecutorService != null && managedExecutorService.isInstance(concurrentExecutor)) {
this.adaptedExecutor = new ManagedTaskExecutorAdapter(concurrentExecutor);
}
else {
this.adaptedExecutor = new TaskExecutorAdapter(concurrentExecutor);
}
}
else {
this.concurrentExecutor = Executors.newSingleThreadExecutor();
this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor);
}
}
/**
......@@ -110,4 +149,58 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor {
return true;
}
/**
* TaskExecutorAdapter subclass that wraps all provided Runnables and Callables
* with a JSR-236 ManagedTask, exposing a long-running hint based on
* {@link SchedulingAwareRunnable} and an identity name based on the task's
* {@code toString()} representation.
*/
private static class ManagedTaskExecutorAdapter extends TaskExecutorAdapter {
public ManagedTaskExecutorAdapter(Executor concurrentExecutor) {
super(concurrentExecutor);
}
@Override
public void execute(Runnable task) {
super.execute(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
}
}
/**
* Delegate that wraps a given Runnable/Callable with a JSR-236 ManagedTask,
* exposing a long-running hint based on {@link SchedulingAwareRunnable}
* and a given identity name.
*/
protected static class ManagedTaskBuilder {
public static Runnable buildManagedTask(Runnable task, String identityName) {
Map<String, String> properties = new HashMap<String, String>(2);
if (task instanceof SchedulingAwareRunnable) {
properties.put(ManagedTask.LONGRUNNING_HINT,
Boolean.toString(((SchedulingAwareRunnable) task).isLongLived()));
}
properties.put(ManagedTask.IDENTITY_NAME, identityName);
return ManagedExecutors.managedTask(task, properties, null);
}
public static <T> Callable<T> buildManagedTask(Callable<T> task, String identityName) {
Map<String, String> properties = new HashMap<String, String>(1);
properties.put(ManagedTask.IDENTITY_NAME, identityName);
return ManagedExecutors.managedTask(task, properties, null);
}
}
}
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -23,12 +23,16 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.enterprise.concurrent.LastExecution;
import javax.enterprise.concurrent.ManagedScheduledExecutorService;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.SimpleTriggerContext;
import org.springframework.scheduling.support.TaskUtils;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
/**
......@@ -37,6 +41,11 @@ import org.springframework.util.ErrorHandler;
* Extends {@link ConcurrentTaskExecutor} in order to implement the
* {@link org.springframework.scheduling.SchedulingTaskExecutor} interface as well.
*
* <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedScheduledExecutorService}
* in order to use it for trigger-based scheduling if possible, instead of Spring's
* local trigger management which ends up delegating to regular delay-based scheduling
* against the {@code java.util.concurrent.ScheduledExecutorService} API.
*
* <p>Note that there is a pre-built {@link ThreadPoolTaskScheduler} that allows for
* defining a JDK 1.5 {@link java.util.concurrent.ScheduledThreadPoolExecutor} in bean style,
* exposing it as a Spring {@link org.springframework.scheduling.TaskScheduler} directly.
......@@ -53,9 +62,25 @@ import org.springframework.util.ErrorHandler;
*/
public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements TaskScheduler {
private volatile ScheduledExecutorService scheduledExecutor;
private static Class<?> managedScheduledExecutorService;
private volatile ErrorHandler errorHandler;
static {
try {
managedScheduledExecutorService = ClassUtils.forName(
"javax.enterprise.concurrent.ManagedScheduledExecutorService",
ConcurrentTaskScheduler.class.getClassLoader());
}
catch (ClassNotFoundException ex) {
// JSR-236 API not available...
managedScheduledExecutorService = null;
}
}
private ScheduledExecutorService scheduledExecutor;
private boolean enterpriseConcurrentScheduler = false;
private ErrorHandler errorHandler;
/**
......@@ -71,6 +96,9 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
/**
* Create a new ConcurrentTaskScheduler,
* using the given JDK 1.5 executor as shared delegate.
* <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedScheduledExecutorService}
* in order to use it for trigger-based scheduling if possible,
* instead of Spring's local trigger management.
* @param scheduledExecutor the JDK 1.5 scheduled executor to delegate to
* for {@link org.springframework.scheduling.SchedulingTaskExecutor} as well
* as {@link TaskScheduler} invocations
......@@ -83,6 +111,9 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
/**
* Create a new ConcurrentTaskScheduler,
* using the given JDK 1.5 executors as delegates.
* <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedScheduledExecutorService}
* in order to use it for trigger-based scheduling if possible,
* instead of Spring's local trigger management.
* @param concurrentExecutor the JDK 1.5 concurrent executor to delegate to
* for {@link org.springframework.scheduling.SchedulingTaskExecutor} invocations
* @param scheduledExecutor the JDK 1.5 scheduled executor to delegate to
......@@ -96,6 +127,9 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
/**
* Specify the JDK 1.5 scheduled executor to delegate to.
* <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedScheduledExecutorService}
* in order to use it for trigger-based scheduling if possible,
* instead of Spring's local trigger management.
* <p>Note: This will only apply to {@link TaskScheduler} invocations.
* If you want the given executor to apply to
* {@link org.springframework.scheduling.SchedulingTaskExecutor} invocations
......@@ -103,8 +137,15 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
* @see #setConcurrentExecutor
*/
public final void setScheduledExecutor(ScheduledExecutorService scheduledExecutor) {
this.scheduledExecutor =
(scheduledExecutor != null ? scheduledExecutor : Executors.newSingleThreadScheduledExecutor());
if (scheduledExecutor != null) {
this.scheduledExecutor = scheduledExecutor;
this.enterpriseConcurrentScheduler = (managedScheduledExecutorService != null &&
managedScheduledExecutorService.isInstance(scheduledExecutor));
}
else {
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
this.enterpriseConcurrentScheduler = false;
}
}
/**
......@@ -118,9 +159,13 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
public ScheduledFuture schedule(Runnable task, Trigger trigger) {
try {
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
if (this.enterpriseConcurrentScheduler) {
return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}
else {
ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
}
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
......@@ -130,8 +175,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
public ScheduledFuture schedule(Runnable task, Date startTime) {
long initialDelay = startTime.getTime() - System.currentTimeMillis();
try {
return this.scheduledExecutor.schedule(
errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
return this.scheduledExecutor.schedule(decorateTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
......@@ -141,8 +185,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
public ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period) {
long initialDelay = startTime.getTime() - System.currentTimeMillis();
try {
return this.scheduledExecutor.scheduleAtFixedRate(
errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
......@@ -151,8 +194,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) {
try {
return this.scheduledExecutor.scheduleAtFixedRate(
errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), 0, period, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
......@@ -162,8 +204,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
public ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
long initialDelay = startTime.getTime() - System.currentTimeMillis();
try {
return this.scheduledExecutor.scheduleWithFixedDelay(
errorHandlingTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
......@@ -172,16 +213,41 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
public ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay) {
try {
return this.scheduledExecutor.scheduleWithFixedDelay(
errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS);
return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), 0, delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
}
}
private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
private Runnable decorateTask(Runnable task, boolean isRepeatingTask) {
Runnable result = TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
if (this.enterpriseConcurrentScheduler) {
result = ManagedTaskBuilder.buildManagedTask(result, task.toString());
}
return result;
}
/**
* Delegate that adapts a Spring Trigger to a JSR-236 Trigger.
* Separated into an inner class in order to avoid a hard dependency on the JSR-236 API.
*/
private class EnterpriseConcurrentTriggerScheduler {
public ScheduledFuture schedule(Runnable task, final Trigger trigger) {
ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
public Date getNextRunTime(LastExecution le, Date taskScheduledTime) {
return trigger.nextExecutionTime(le != null ?
new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
new SimpleTriggerContext());
}
public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
return false;
}
});
}
}
}
......@@ -65,6 +65,11 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
/**
* Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
* Default is the ThreadPoolExecutor's default thread factory.
* <p>In a Java EE 7 or other managed environment with JSR-236 support,
* consider specifying a JNDI-located ManagedThreadFactory: by default,
* to be found at "java:comp/env/concurrent/tf/DefaultThreadFactory".
* Use the "jee:jndi-lookup" namespace element in XML or the programmatic
* {@link org.springframework.jndi.JndiLocatorDelegate} for convenient lookup.
* @see java.util.concurrent.Executors#defaultThreadFactory()
*/
public void setThreadFactory(ThreadFactory threadFactory) {
......
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -35,6 +35,25 @@ public class SimpleTriggerContext implements TriggerContext {
private volatile Date lastCompletionTime;
/**
* Create a SimpleTriggerContext with all time values set to {@code null}.
*/
public SimpleTriggerContext() {
}
/**
* Create a SimpleTriggerContext with the given time values.
* @param lastScheduledExecutionTime last <i>scheduled</i> execution time
* @param lastActualExecutionTime last <i>actual</i> execution time
* @param lastCompletionTime last completion time
*/
public SimpleTriggerContext(Date lastScheduledExecutionTime, Date lastActualExecutionTime, Date lastCompletionTime) {
this.lastScheduledExecutionTime = lastScheduledExecutionTime;
this.lastActualExecutionTime = lastActualExecutionTime;
this.lastCompletionTime = lastCompletionTime;
}
/**
* Update this holder's state with the latest time values.
* @param lastScheduledExecutionTime last <i>scheduled</i> execution time
......
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -28,11 +28,11 @@ import org.springframework.util.ReflectionUtils;
* Utility methods for decorating tasks with error handling.
*
* <p><b>NOTE:</b> This class is intended for internal use by Spring's scheduler
* implementations. It is only public so that it may be accessed from
* implementations within other packages. It is <i>not</i> intended for general
* use and may change in the future.
* implementations. It is only public so that it may be accessed from impl classes
* within other packages. It is <i>not</i> intended for general use.
*
* @author Mark Fisher
* @author Juergen Hoeller
* @since 3.0
*/
public abstract class TaskUtils {
......@@ -53,12 +53,11 @@ public abstract class TaskUtils {
/**
* Decorates the task for error handling. If the provided
* {@link ErrorHandler} is not null, it will be used. Otherwise,
* repeating tasks will have errors suppressed by default whereas
* one-shot tasks will have errors propagated by default since those
* errors may be expected through the returned {@link Future}. In both
* cases, the errors will be logged.
* Decorate the task for error handling. If the provided {@link ErrorHandler}
* is not {@code null}, it will be used. Otherwise, repeating tasks will have
* errors suppressed by default whereas one-shot tasks will have errors
* propagated by default since those errors may be expected through the
* returned {@link Future}. In both cases, the errors will be logged.
*/
public static DelegatingErrorHandlingRunnable decorateTaskWithErrorHandler(
Runnable task, ErrorHandler errorHandler, boolean isRepeatingTask) {
......@@ -66,7 +65,7 @@ public abstract class TaskUtils {
if (task instanceof DelegatingErrorHandlingRunnable) {
return (DelegatingErrorHandlingRunnable) task;
}
ErrorHandler eh = errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask);
ErrorHandler eh = (errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask));
return new DelegatingErrorHandlingRunnable(task, eh);
}
......@@ -86,7 +85,7 @@ public abstract class TaskUtils {
* level. It does not perform any additional error handling. This can be
* useful when suppression of errors is the intended behavior.
*/
static class LoggingErrorHandler implements ErrorHandler {
private static class LoggingErrorHandler implements ErrorHandler {
private final Log logger = LogFactory.getLog(LoggingErrorHandler.class);
......@@ -102,7 +101,7 @@ public abstract class TaskUtils {
* An {@link ErrorHandler} implementation that logs the Throwable at error
* level and then propagates it.
*/
static class PropagatingErrorHandler extends LoggingErrorHandler {
private static class PropagatingErrorHandler extends LoggingErrorHandler {
public void handleError(Throwable t) {
super.handleError(t);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册