提交 71704f7e 编写于 作者: G Gian Merlino

Replace graceful/hard shutdown combo with simple exit

上级 cfc635de
......@@ -127,12 +127,6 @@ public abstract class AbstractTask implements Task
return TaskStatus.running(id);
}
@Override
public void shutdown()
{
// Do nothing.
}
@Override
public String toString()
{
......
......@@ -46,8 +46,8 @@ import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.firehose.GracefulShutdownFirehose;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.plumber.Sink;
......@@ -93,15 +93,6 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private volatile TaskToolbox toolbox = null;
@JsonIgnore
private volatile GracefulShutdownFirehose firehose = null;
@JsonIgnore
private final Object lock = new Object();
@JsonIgnore
private volatile boolean shutdown = false;
@JsonCreator
public RealtimeIndexTask(
@JsonProperty("id") String id,
......@@ -177,19 +168,7 @@ public class RealtimeIndexTask extends AbstractTask
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
synchronized (lock) {
if (shutdown) {
return TaskStatus.success(getId());
}
log.info(
"Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]",
segmentGranularity,
windowPeriod
);
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
}
final Firehose firehose = firehoseFactory.connect();
// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
// stuff like the ServerView, which seems kind of odd? Perhaps revisit this when Guice has been introduced.
......@@ -355,22 +334,6 @@ public class RealtimeIndexTask extends AbstractTask
return TaskStatus.success(getId());
}
@Override
public void shutdown()
{
try {
synchronized (lock) {
shutdown = true;
if (firehose != null) {
firehose.shutdown();
}
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@JsonProperty
public Schema getSchema()
{
......
......@@ -134,10 +134,4 @@ public interface Task
* @throws Exception
*/
public TaskStatus run(TaskToolbox toolbox) throws Exception;
/**
* Best-effort task cancellation. May or may not do anything. Calling this multiple times may have
* a stronger effect.
*/
public void shutdown();
}
......@@ -48,6 +48,7 @@ import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
......@@ -207,7 +208,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
log.info("Logging task %s output to: %s", task.getId(), logFile);
final OutputStream toProc = processHolder.process.getOutputStream();
final InputStream fromProc = processHolder.process.getInputStream();
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
......@@ -221,15 +221,16 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
if (statusCode != 0) {
runFailed = true;
}
toLogfile.close();
}
catch (Exception e) {
log.warn(e, "Failed to read from process for task: %s", task.getId());
log.warn(e, "Failed to log process output for task: %s", task.getId());
runFailed = true;
Closeables.close(toLogfile, true);
}
finally {
Closeables.closeQuietly(fromProc);
Closeables.closeQuietly(toLogfile);
Closeables.closeQuietly(toProc);
Closeables.close(processHolder, true);
}
// Upload task logs
......@@ -311,31 +312,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
}
if (taskInfo.processHolder != null) {
final int shutdowns = taskInfo.processHolder.shutdowns.getAndIncrement();
if (shutdowns == 0) {
log.info("Attempting to gracefully shutdown task: %s", taskid);
try {
// This is gross, but it may still be nicer than talking to the forked JVM via HTTP.
final OutputStream out = taskInfo.processHolder.process.getOutputStream();
out.write(
jsonMapper.writeValueAsBytes(
ImmutableMap.of(
"shutdown",
"now"
)
)
);
out.write('\n');
out.flush();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
// Will trigger normal failure mechanisms due to process exit
log.info("Killing process for task: %s", taskid);
taskInfo.processHolder.process.destroy();
}
// Will trigger normal failure mechanisms due to process exit
log.info("Killing process for task: %s", taskid);
taskInfo.processHolder.process.destroy();
}
}
......@@ -424,12 +403,11 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
}
}
private static class ProcessHolder
private static class ProcessHolder implements Closeable
{
private final Process process;
private final File logFile;
private final int port;
private final AtomicInteger shutdowns = new AtomicInteger(0);
private ProcessHolder(Process process, File logFile, int port)
{
......@@ -437,5 +415,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
this.logFile = logFile;
this.port = port;
}
@Override
public void close() throws IOException
{
process.getInputStream().close();
process.getOutputStream().close();
}
}
}
......@@ -40,8 +40,7 @@ public interface TaskRunner
public ListenableFuture<TaskStatus> run(Task task);
/**
* Best-effort task cancellation. May or may not do anything. Calling this multiple times may have
* a stronger effect.
* Best-effort task shutdown. May or may not do anything.
*/
public void shutdown(String taskid);
......
......@@ -110,7 +110,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{
for (final TaskRunnerWorkItem runningItem : runningItems) {
if (runningItem.getTask().getId().equals(taskid)) {
runningItem.getTask().shutdown();
runningItem.getResult().cancel(true);
}
}
}
......
......@@ -78,7 +78,6 @@ public class ExecutorLifecycle
}
// Spawn monitor thread to keep a watch on parent's stdin
// If a message comes over stdin, we want to handle it
// If stdin reaches eof, the parent is gone, and we should shut down
parentMonitorExec.submit(
new Runnable()
......@@ -87,25 +86,8 @@ public class ExecutorLifecycle
public void run()
{
try {
final BufferedReader parentReader = new BufferedReader(new InputStreamReader(parentStream));
String messageString;
while ((messageString = parentReader.readLine()) != null) {
final Map<String, Object> message = jsonMapper
.readValue(
messageString,
new TypeReference<Map<String, Object>>()
{
}
);
if (message == null) {
break;
} else if (message.get("shutdown") != null && message.get("shutdown").equals("now")) {
log.info("Shutting down!");
task.shutdown();
} else {
throw new ISE("Unrecognized message from parent: %s", message);
}
while (parentStream.read() != -1) {
// Toss the byte
}
}
catch (Exception e) {
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime.firehose;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.logger.Logger;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory;
import com.metamx.druid.realtime.plumber.RejectionPolicy;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
public class GracefulShutdownFirehose implements Firehose
{
private static final Logger log = new Logger(GracefulShutdownFirehose.class);
private final Firehose firehose;
private final IndexGranularity segmentGranularity;
private final long windowMillis;
private final ScheduledExecutorService scheduledExecutor;
private final RejectionPolicy rejectionPolicy;
// when this is set to false, the firehose will have no more rows
private final AtomicBoolean valveOn = new AtomicBoolean(true);
// when this is set to true, the firehose will begin rejecting events
private volatile boolean beginRejectionPolicy = false;
public GracefulShutdownFirehose(
Firehose firehose,
IndexGranularity segmentGranularity,
Period windowPeriod
)
{
this.firehose = firehose;
this.segmentGranularity = segmentGranularity;
this.windowMillis = windowPeriod.toStandardDuration().getMillis() * 2;
this.scheduledExecutor = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("firehose_scheduled_%d")
.build()
);
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final long end = segmentGranularity.increment(truncatedNow);
this.rejectionPolicy = new IntervalRejectionPolicyFactory(new Interval(truncatedNow, end)).create(windowPeriod);
}
public void shutdown() throws IOException
{
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final long end = segmentGranularity.increment(truncatedNow) + windowMillis;
final Duration timeUntilShutdown = new Duration(System.currentTimeMillis(), end);
log.info("Shutdown at approx. %s (in %s)", new DateTime(end), timeUntilShutdown);
ScheduledExecutors.scheduleWithFixedDelay(
scheduledExecutor,
timeUntilShutdown,
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call() throws Exception
{
try {
valveOn.set(false);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ScheduledExecutors.Signal.STOP;
}
}
);
beginRejectionPolicy = true;
}
@Override
public boolean hasMore()
{
return valveOn.get() && firehose.hasMore();
}
@Override
public InputRow nextRow()
{
InputRow next = firehose.nextRow();
if (!beginRejectionPolicy || rejectionPolicy.accept(next.getTimestampFromEpoch())) {
return next;
}
return null;
}
@Override
public Runnable commit()
{
return firehose.commit();
}
@Override
public void close() throws IOException
{
firehose.close();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册