提交 228b2235 编写于 作者: E Eric Tschetter

1) Fix bug with Sinks that causes Realtime it to overwrite the first chunk of...

1) Fix bug with Sinks that causes Realtime it to overwrite the first chunk of data when creating a segment to hand off to compute nodes.
2) Fix bug with RealtimePlumberSchool that can cause it to miss some previously-persisted segments when directory naming changes
3) Have the various runnables in RealtimePlumberSchool name the threads to make it a bit easier to debug
上级 06b8e423
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guava;
/**
*/
public class Runnables
{
public static Runnable threadNaming(final String threadName, final Runnable runnable)
{
return new ThreadRenamingRunnable(threadName)
{
@Override
public void doRun()
{
runnable.run();
}
};
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guava;
/**
*/
public abstract class ThreadRenamingRunnable implements Runnable
{
private final String name;
public ThreadRenamingRunnable(
String name
)
{
this.name = name;
}
@Override
public final void run()
{
final Thread currThread = Thread.currentThread();
String currName = currThread.getName();
try {
currThread.setName(name);
doRun();
}
finally {
currThread.setName(currName);
}
}
public abstract void doRun();
}
......@@ -33,9 +33,11 @@ import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.guava.ThreadRenamingRunnable;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
......@@ -46,7 +48,6 @@ import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
......@@ -203,7 +204,7 @@ public class RealtimePlumberSchool implements PlumberSchool
return ServerView.CallbackAction.CONTINUE;
}
log.info("Checking segment[%s]", segment);
log.debug("Checking segment[%s] on server[%s]", segment, server);
if (schema.getDataSource().equals(segment.getDataSource())) {
final Interval interval = segment.getInterval();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
......@@ -239,10 +240,7 @@ public class RealtimePlumberSchool implements PlumberSchool
log.info(
"Expect to run at [%s]",
new DateTime().plus(
new Duration(
System.currentTimeMillis(),
segmentGranularity.increment(truncatedNow) + windowMillis
)
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis)
)
);
......@@ -251,10 +249,10 @@ public class RealtimePlumberSchool implements PlumberSchool
scheduledExecutor,
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis),
new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)),
new Runnable()
new ThreadRenamingRunnable(String.format("%s-overseer", schema.getDataSource()))
{
@Override
public void run()
public void doRun()
{
log.info("Starting merge and push.");
......@@ -272,11 +270,14 @@ public class RealtimePlumberSchool implements PlumberSchool
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
final Sink sink = entry.getValue();
final String threadName = String.format(
"%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(entry.getKey())
);
persistExecutor.execute(
new Runnable()
new ThreadRenamingRunnable(threadName)
{
@Override
public void run()
public void doRun()
{
final Interval interval = sink.getInterval();
......@@ -290,17 +291,26 @@ public class RealtimePlumberSchool implements PlumberSchool
final File mergedFile;
try {
final File persistDir = computePersistDir(schema, interval);
final File[] persistedIndexes = persistDir.listFiles();
List<MMappedIndex> indexes = Lists.newArrayList();
for (File persistedIndex : persistedIndexes) {
log.info("Adding index at [%s]", persistedIndex);
indexes.add(IndexIO.mapDir(persistedIndex));
for (FireHydrant fireHydrant : sink) {
StorageAdapter adapter = fireHydrant.getAdapter();
if (adapter instanceof MMappedIndexStorageAdapter) {
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(((MMappedIndexStorageAdapter) adapter).getIndex());
}
else {
log.makeAlert("[%s] Failure to merge-n-push", schema.getDataSource())
.addData("type", "Unknown adapter type")
.addData("adapterClass", adapter.getClass().toString())
.emit();
return;
}
}
mergedFile = IndexMerger.mergeMMapped(
indexes, schema.getAggregators(), new File(persistDir, "merged")
indexes,
schema.getAggregators(),
new File(computePersistDir(schema, interval), "merged")
);
MMappedIndex index = IndexIO.mapDir(mergedFile);
......@@ -421,10 +431,10 @@ public class RealtimePlumberSchool implements PlumberSchool
}
persistExecutor.execute(
new Runnable()
new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource()))
{
@Override
public void run()
public void doRun()
{
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs));
......
......@@ -45,9 +45,7 @@ public class Sink implements Iterable<FireHydrant>
{
private static final Logger log = new Logger(Sink.class);
private volatile int swapCount = 0;
private volatile FireHydrant currIndex;
private volatile boolean hasSwapped = false;
private final Interval interval;
private final Schema schema;
......@@ -82,7 +80,6 @@ public class Sink implements Iterable<FireHydrant>
}
this.hydrants.addAll(hydrants);
swapCount = hydrants.size();
makeNewCurrIndex(interval.getStartMillis(), schema);
}
......@@ -109,7 +106,6 @@ public class Sink implements Iterable<FireHydrant>
*/
public FireHydrant swap()
{
hasSwapped = true;
return makeNewCurrIndex(interval.getStartMillis(), schema);
}
......@@ -149,16 +145,15 @@ public class Sink implements Iterable<FireHydrant>
);
FireHydrant old;
if (currIndex == null) { // Only happens on initialization...
if (currIndex == null) { // Only happens on initialization, cannot synchronize on null
old = currIndex;
currIndex = new FireHydrant(newIndex, swapCount);
currIndex = new FireHydrant(newIndex, hydrants.size());
hydrants.add(currIndex);
} else {
synchronized (currIndex) {
old = currIndex;
currIndex = new FireHydrant(newIndex, swapCount);
currIndex = new FireHydrant(newIndex, hydrants.size());
hydrants.add(currIndex);
++swapCount;
}
}
......
......@@ -62,6 +62,11 @@ public class MMappedIndexStorageAdapter extends BaseStorageAdapter
this.index = index;
}
public MMappedIndex getIndex()
{
return index;
}
@Override
public String getSegmentIdentifier()
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册