Commit 3617ac17 authored by cheddar

1) Eliminate ExecutorMain and have it run using the new Main!

Parent 269997dc
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.BatchServerInventoryView;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.client.SingleServerInventoryView;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.coordination.SingleDataSegmentAnnouncer;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.guice.JsonConfigurator;
import com.metamx.druid.http.log.NoopRequestLogger;
import com.metamx.druid.http.log.RequestLogger;
import com.metamx.druid.initialization.BatchDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;
import org.joda.time.Duration;
import org.skife.config.ConfigurationObjectFactory;
import javax.validation.Validation;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public abstract class QueryableNode<T extends QueryableNode> extends RegisteringNode
{
private final Logger log;
private final Lifecycle lifecycle;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private final String nodeType;
private final JsonConfigurator jsonConfigurator;
private DruidServerMetadata druidServerMetadata = null;
private ServiceEmitter emitter = null;
private List<Monitor> monitors = null;
private Server server = null;
private CuratorFramework curator = null;
private DataSegmentAnnouncer announcer = null;
private ZkPathsConfig zkPaths = null;
private ScheduledExecutorFactory scheduledExecutorFactory = null;
private RequestLogger requestLogger = null;
private ServerInventoryView serverInventoryView = null;
private ServerView serverView = null;
private InventoryView inventoryView = null;
private boolean initialized = false;
public QueryableNode(
String nodeType,
Logger log,
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
super(Arrays.asList(jsonMapper, smileMapper));
this.log = log;
this.configFactory = configFactory;
this.props = props;
this.jsonMapper = jsonMapper;
this.lifecycle = lifecycle;
this.smileMapper = smileMapper;
Preconditions.checkNotNull(props, "props");
Preconditions.checkNotNull(lifecycle, "lifecycle");
Preconditions.checkNotNull(jsonMapper, "jsonMapper");
Preconditions.checkNotNull(smileMapper, "smileMapper");
Preconditions.checkNotNull(configFactory, "configFactory");
this.jsonConfigurator = new JsonConfigurator(jsonMapper, Validation.buildDefaultValidatorFactory().getValidator());
Preconditions.checkState(smileMapper.getJsonFactory() instanceof SmileFactory, "smileMapper should use smile.");
this.nodeType = nodeType;
}
@SuppressWarnings("unchecked")
public T setDruidServerMetadata(DruidServerMetadata druidServerMetadata)
{
checkFieldNotSetAndSet("druidServerMetadata", druidServerMetadata);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setCuratorFramework(CuratorFramework curator)
{
checkFieldNotSetAndSet("curator", curator);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setAnnouncer(DataSegmentAnnouncer announcer)
{
checkFieldNotSetAndSet("announcer", announcer);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setEmitter(ServiceEmitter emitter)
{
checkFieldNotSetAndSet("emitter", emitter);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setMonitors(List<Monitor> monitors)
{
checkFieldNotSetAndSet("monitors", monitors);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setServer(Server server)
{
checkFieldNotSetAndSet("server", server);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setZkPaths(ZkPathsConfig zkPaths)
{
checkFieldNotSetAndSet("zkPaths", zkPaths);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setScheduledExecutorFactory(ScheduledExecutorFactory factory)
{
checkFieldNotSetAndSet("scheduledExecutorFactory", factory);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setRequestLogger(RequestLogger requestLogger)
{
checkFieldNotSetAndSet("requestLogger", requestLogger);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setInventoryView(InventoryView inventoryView)
{
checkFieldNotSetAndSet("inventoryView", inventoryView);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setServerView(ServerView serverView)
{
checkFieldNotSetAndSet("serverView", serverView);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerJacksonSubtype(Class<?>... clazzes)
{
jsonMapper.registerSubtypes(clazzes);
smileMapper.registerSubtypes(clazzes);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerJacksonSubtype(NamedType... namedTypes)
{
jsonMapper.registerSubtypes(namedTypes);
smileMapper.registerSubtypes(namedTypes);
return (T) this;
}
public Lifecycle getLifecycle()
{
return lifecycle;
}
public ObjectMapper getJsonMapper()
{
return jsonMapper;
}
public ObjectMapper getSmileMapper()
{
return smileMapper;
}
public Properties getProps()
{
return props;
}
public ConfigurationObjectFactory getConfigFactory()
{
return configFactory;
}
public JsonConfigurator getJsonConfigurator()
{
return jsonConfigurator;
}
public DruidServerMetadata getDruidServerMetadata()
{
initializeDruidServerMetadata();
return druidServerMetadata;
}
public CuratorFramework getCuratorFramework()
{
initializeCuratorFramework();
return curator;
}
public DataSegmentAnnouncer getAnnouncer()
{
initializeAnnouncer();
return announcer;
}
public ServiceEmitter getEmitter()
{
initializeEmitter();
return emitter;
}
public List<Monitor> getMonitors()
{
initializeMonitors();
return monitors;
}
public Server getServer()
{
initializeServer();
return server;
}
public ZkPathsConfig getZkPaths()
{
initializeZkPaths();
return zkPaths;
}
public ScheduledExecutorFactory getScheduledExecutorFactory()
{
initializeScheduledExecutorFactory();
return scheduledExecutorFactory;
}
public RequestLogger getRequestLogger()
{
initializeRequestLogger();
return requestLogger;
}
public ServerView getServerView()
{
initializeServerView();
return serverView;
}
public InventoryView getInventoryView()
{
initializeInventoryView();
return inventoryView;
}
private void initializeDruidServerMetadata()
{
if (druidServerMetadata == null) {
final DruidServerConfig serverConfig = getConfigFactory().build(DruidServerConfig.class);
setDruidServerMetadata(
new DruidServerMetadata(
null, // TODO: serverConfig.getServerName(),
null, // TODO: serverConfig.getHost(),
serverConfig.getMaxSize(),
nodeType,
serverConfig.getTier()
)
);
}
}
private void initializeServerView()
{
if (serverView == null) {
initializeServerInventoryView();
serverView = serverInventoryView;
}
}
private void initializeInventoryView()
{
if (inventoryView == null) {
initializeServerInventoryView();
inventoryView = serverInventoryView;
}
}
private void initializeServerInventoryView()
{
if (serverInventoryView == null) {
final ExecutorService exec = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
);
final ServerInventoryViewConfig serverInventoryViewConfig = getConfigFactory().build(ServerInventoryViewConfig.class);
final String announcerType = serverInventoryViewConfig.getAnnouncerType();
if ("legacy".equalsIgnoreCase(announcerType)) {
serverInventoryView = new SingleServerInventoryView(
getZkPaths(),
getCuratorFramework(),
getJsonMapper()
);
} else if ("batch".equalsIgnoreCase(announcerType)) {
serverInventoryView = new BatchServerInventoryView(
getZkPaths(),
getCuratorFramework(),
getJsonMapper()
);
} else {
throw new IAE("Unknown type %s", announcerType);
}
lifecycle.addManagedInstance(serverInventoryView);
}
}
private void initializeRequestLogger()
{
if (requestLogger == null) {
try {
final String loggingType = props.getProperty("druid.request.logging.type");
if ("emitter".equals(loggingType)) {
setRequestLogger(
Initialization.makeEmittingRequestLogger(
getProps(),
getEmitter()
)
);
} else if ("file".equalsIgnoreCase(loggingType)) {
setRequestLogger(
Initialization.makeFileRequestLogger(
getJsonMapper(),
getScheduledExecutorFactory(),
getProps()
)
);
} else {
setRequestLogger(new NoopRequestLogger());
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
lifecycle.addManagedInstance(requestLogger);
}
}
private void initializeZkPaths()
{
if (zkPaths == null) {
setZkPaths(getConfigFactory().build(ZkPathsConfig.class));
}
}
private void initializeScheduledExecutorFactory()
{
if (scheduledExecutorFactory == null) {
setScheduledExecutorFactory(ScheduledExecutors.createFactory(getLifecycle()));
}
}
private void initializeCuratorFramework()
{
if (curator == null) {
try {
setCuratorFramework(Initialization.makeCuratorFramework(configFactory.build(CuratorConfig.class), lifecycle));
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
private void initializeAnnouncer()
{
if (announcer == null) {
final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
lifecycle.addManagedInstance(announcer);
final BatchDataSegmentAnnouncerConfig config = getConfigFactory().build(BatchDataSegmentAnnouncerConfig.class);
final String announcerType = "legacy";
final DataSegmentAnnouncer dataSegmentAnnouncer;
if ("batch".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new BatchDataSegmentAnnouncer(
getDruidServerMetadata(),
config,
getZkPaths(),
announcer,
getJsonMapper()
);
} else if ("legacy".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<DataSegmentAnnouncer>asList(
new BatchDataSegmentAnnouncer(
getDruidServerMetadata(),
config,
getZkPaths(),
announcer,
getJsonMapper()
),
new SingleDataSegmentAnnouncer(
getDruidServerMetadata(),
getZkPaths(),
announcer,
getJsonMapper()
)
)
);
} else {
throw new ISE("Unknown announcer type [%s]", announcerType);
}
setAnnouncer(dataSegmentAnnouncer);
lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
}
}
private void initializeServer()
{
if (server == null) {
setServer(Initialization.makeJettyServer(null, configFactory.build(ServerConfig.class))); // TODO: eliminate
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Jetty");
server.start();
}
@Override
public void stop()
{
log.info("Stopping Jetty");
try {
server.stop();
}
catch (Exception e) {
log.error(e, "Exception thrown while stopping Jetty");
}
}
}
);
}
}
private void initializeMonitors()
{
if (monitors == null) {
List<Monitor> theMonitors = Lists.newArrayList();
theMonitors.add(new JvmMonitor());
if (Boolean.parseBoolean(props.getProperty("druid.monitoring.monitorSystem", "false"))) {
theMonitors.add(new SysMonitor());
}
setMonitors(theMonitors);
}
}
private void initializeEmitter()
{
if (emitter == null) {
final HttpClientConfig.Builder configBuilder = HttpClientConfig.builder().withNumConnections(1);
final String emitterTimeoutDuration = props.getProperty("druid.emitter.timeOut");
if (emitterTimeoutDuration != null) {
configBuilder.withReadTimeout(new Duration(emitterTimeoutDuration));
}
final HttpClient httpClient = HttpClientInit.createClient(configBuilder.build(), lifecycle);
setEmitter(
new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, jsonMapper, lifecycle)
)
);
}
EmittingLogger.registerEmitter(emitter);
}
protected void init() throws Exception
{
doInit();
initialized = true;
}
protected abstract void doInit() throws Exception;
@LifecycleStart
public synchronized void start() throws Exception
{
if (!initialized) {
init();
}
lifecycle.start();
}
@LifecycleStop
public synchronized void stop()
{
lifecycle.stop();
}
protected ScheduledExecutorService startMonitoring(List<Monitor> monitors)
{
final ScheduledExecutorService globalScheduledExec = getScheduledExecutorFactory().create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
getConfigFactory().build(MonitorSchedulerConfig.class),
globalScheduledExec,
getEmitter(),
monitors
);
getLifecycle().addManagedInstance(monitorScheduler);
return globalScheduledExec;
}
protected void checkFieldNotSetAndSet(String fieldName, Object value)
{
Class<?> theClazz = this.getClass();
while (theClazz != null && theClazz != Object.class) {
try {
final Field field = theClazz.getDeclaredField(fieldName);
field.setAccessible(true);
Preconditions.checkState(field.get(this) == null, "Cannot set %s once it has already been set.", fieldName);
field.set(this, value);
return;
}
catch (NoSuchFieldException e) {
// Perhaps it is inherited?
theClazz = theClazz.getSuperclass();
}
catch (IllegalAccessException e) {
throw Throwables.propagate(e);
}
}
throw new ISE("Unknown field[%s] on class[%s]", fieldName, this.getClass());
}
}
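Every setter above funnels through checkFieldNotSetAndSet, which walks up the class hierarchy by reflection so that fields declared on QueryableNode can still be guarded when the setter is invoked on a subclass. A minimal usage sketch; the node instance and values here are hypothetical:

// Any concrete QueryableNode subclass behaves this way (names hypothetical).
node.setZkPaths(zkPaths);      // first call: field is null, value is stored
node.setZkPaths(otherZkPaths); // second call fails fast with
                               // IllegalStateException: "Cannot set zkPaths once it has already been set."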
......@@ -22,9 +22,6 @@ package com.metamx.druid;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.index.v1.serde.Registererer;
import java.util.Arrays;
import java.util.List;
/**
*/
public class RegisteringNode
......@@ -43,17 +40,4 @@ public class RegisteringNode
}
private static boolean doneRegister = false;
private final List<ObjectMapper> mappers;
public RegisteringNode(List<ObjectMapper> mappers)
{
this.mappers = mappers;
}
public RegisteringNode registerHandlers(Registererer... registererers)
{
registerHandlers(Arrays.asList(registererers), mappers);
return this;
}
}
......@@ -4,10 +4,12 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.guice.ConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.initialization.Initialization;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import java.io.IOException;
......@@ -15,6 +17,8 @@ import java.io.IOException;
*/
public class CuratorModule implements Module
{
private static final Logger log = new Logger(CuratorModule.class);
@Override
public void configure(Binder binder)
{
......@@ -24,6 +28,33 @@ public class CuratorModule implements Module
@Provides @LazySingleton
public CuratorFramework makeCurator(CuratorConfig config, Lifecycle lifecycle) throws IOException
{
return Initialization.makeCuratorFramework(config, lifecycle);
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
.connectString(config.getZkHosts())
.sessionTimeoutMs(config.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
.compressionProvider(new PotentiallyGzippedCompressionProvider(config.enableCompression()))
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Curator");
framework.start();
}
@Override
public void stop()
{
log.info("Stopping Curator");
framework.close();
}
}
);
return framework;
}
}
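A hedged sketch of how this provider is consumed; the exact module list and the binding of CuratorConfig are assumptions, not shown in this commit:

// Assumed wiring: config and lifecycle bindings come from other modules.
final Injector injector = Guice.createInjector(new CuratorModule() /* , config modules... */);
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
final CuratorFramework curator = injector.getInstance(CuratorFramework.class); // built lazily
lifecycle.start(); // runs the handler above: logs "Starting Curator", calls framework.start()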
......@@ -325,7 +325,7 @@ public class Announcer
}
if (subPaths.remove(pathAndNode.getNode()) == null) {
throw new IAE("Path[%s] not announced, cannot unannounce.", path);
log.info("Asked to unnanounce path[%s] that is not announced. Was it registered multiple times?.", path);
}
try {
......
......@@ -3,6 +3,7 @@ package com.metamx.druid.curator.discovery;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
......@@ -17,9 +18,9 @@ import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import javax.annotation.Nullable;
import java.lang.annotation.Annotation;
......@@ -179,6 +180,35 @@ public class DiscoveryModule implements Module
Lifecycle lifecycle
) throws Exception
{
return Initialization.makeServiceDiscoveryClient(curator, config.get(), lifecycle);
final ServiceDiscovery<Void> serviceDiscovery =
ServiceDiscoveryBuilder.builder(Void.class)
.basePath(config.get().getPath())
.client(curator)
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceDiscovery.start();
}
@Override
public void stop()
{
try {
serviceDiscovery.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
},
Lifecycle.Stage.LAST
);
return serviceDiscovery;
}
}
......@@ -13,6 +13,7 @@ import com.metamx.druid.query.group.GroupByQueryQueryToolChest;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.search.SearchQueryConfig;
import com.metamx.druid.query.search.SearchQueryQueryToolChest;
import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
import com.metamx.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest;
......@@ -47,5 +48,6 @@ public class QueryToolChestModule implements Module
}
JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class);
}
}
......@@ -20,343 +20,28 @@
package com.metamx.druid.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.guice.DruidGuiceExtensions;
import com.metamx.druid.guice.DruidSecondaryModule;
import com.metamx.druid.guice.annotations.Json;
import com.metamx.druid.guice.annotations.Smile;
import com.metamx.druid.http.log.EmittingRequestLogger;
import com.metamx.druid.http.log.FileRequestLogger;
import com.metamx.druid.http.log.RequestLogger;
import com.metamx.druid.jackson.JacksonModule;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.zookeeper.data.Stat;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.skife.config.ConfigurationObjectFactory;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
*/
public class Initialization
{
private static final Logger log = new Logger(Initialization.class);
private static final String PROPERTIES_FILE = "runtime.properties";
private static final Properties zkProps = new Properties();
private static final Properties fileProps = new Properties(zkProps);
private static Properties props = null;
/**
* Load properties.
* Properties are layered:
* <p/>
* # stored in zookeeper
* # runtime.properties file,
* # cmdLine -D
* <p/>
* command line overrides runtime.properties which overrides zookeeper
* <p/>
* Idempotent. Thread-safe. Properties are only loaded once.
* If property druid.zk.service.host is not set then do not load properties from zookeeper.
*
* @return Properties ready to use.
*/
public synchronized static Properties loadProperties()
{
if (props != null) {
return props;
}
// Note that zookeeper coordinates must be either in cmdLine or in runtime.properties
Properties sp = System.getProperties();
Properties tmp_props = new Properties(fileProps); // the head of the 3 level Properties chain
tmp_props.putAll(sp);
final InputStream stream = ClassLoader.getSystemResourceAsStream(PROPERTIES_FILE);
if (stream == null) {
log.info("%s not found on classpath, relying only on system properties and zookeeper.", PROPERTIES_FILE);
} else {
log.info("Loading properties from %s", PROPERTIES_FILE);
try {
try {
fileProps.load(stream);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
finally {
Closeables.closeQuietly(stream);
}
}
// log properties from file; stringPropertyNames() would normally cascade down into the sub Properties objects, but
// zkProps (the parent level) is empty at this point so it will only log properties from runtime.properties
for (String prop : fileProps.stringPropertyNames()) {
log.info("Loaded(runtime.properties) Property[%s] as [%s]", prop, fileProps.getProperty(prop));
}
final String zkHostsProperty = "druid.zk.service.host";
if (tmp_props.getProperty(zkHostsProperty) != null) {
final ConfigurationObjectFactory factory = Config.createFactory(tmp_props);
ZkPathsConfig config;
try {
config = factory.build(ZkPathsConfig.class);
}
catch (IllegalArgumentException e) {
log.warn(e, "Unable to build ZkPathsConfig. Cannot load properties from ZK.");
config = null;
}
if (config != null) {
Lifecycle lifecycle = new Lifecycle();
try {
CuratorFramework curator = makeCuratorFramework(factory.build(CuratorConfig.class), lifecycle);
lifecycle.start();
final Stat stat = curator.checkExists().forPath(config.getPropertiesPath());
if (stat != null) {
final byte[] data = curator.getData().forPath(config.getPropertiesPath());
zkProps.load(new InputStreamReader(new ByteArrayInputStream(data), Charsets.UTF_8));
}
// log properties from zk
for (String prop : zkProps.stringPropertyNames()) {
log.info("Loaded(zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
lifecycle.stop();
}
}
} else {
log.warn("property[%s] not set, skipping ZK-specified properties.", zkHostsProperty);
}
props = tmp_props;
return props;
}
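The three-level chain documented above relies on java.util.Properties defaults: a Properties built with a parent falls through to that parent on getProperty. A minimal sketch of the precedence, with hypothetical values:

Properties zkDefaults = new Properties();              // lowest precedence
Properties fileDefaults = new Properties(zkDefaults);  // falls back to ZK values
Properties merged = new Properties(fileDefaults);      // head of the chain
merged.putAll(System.getProperties());                 // -D flags always win

zkDefaults.setProperty("druid.host", "from-zk");
fileDefaults.setProperty("druid.host", "from-file");
merged.getProperty("druid.host");                      // "from-file": runtime.properties overrides ZK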
public static Server makeJettyServer(DruidNode node, ServerConfig config)
{
final QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setMinThreads(config.getNumThreads());
threadPool.setMaxThreads(config.getNumThreads());
final Server server = new Server();
server.setThreadPool(threadPool);
SelectChannelConnector connector = new SelectChannelConnector();
connector.setPort(node.getPort());
connector.setMaxIdleTime(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
connector.setStatsOn(true);
server.setConnectors(new Connector[]{connector});
return server;
}
public static CuratorFramework makeCuratorFramework(
CuratorConfig curatorConfig,
Lifecycle lifecycle
) throws IOException
{
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
.connectString(curatorConfig.getZkHosts())
.sessionTimeoutMs(curatorConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
.compressionProvider(new PotentiallyGzippedCompressionProvider(curatorConfig.enableCompression()))
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Curator");
framework.start();
}
@Override
public void stop()
{
log.info("Stopping Curator");
framework.close();
}
}
);
return framework;
}
public static ServiceDiscovery<Void> makeServiceDiscoveryClient(
CuratorFramework discoveryClient,
CuratorDiscoveryConfig config,
Lifecycle lifecycle
)
throws Exception
{
final ServiceDiscovery<Void> serviceDiscovery =
ServiceDiscoveryBuilder.builder(Void.class)
.basePath(config.getPath())
.client(discoveryClient)
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceDiscovery.start();
}
@Override
public void stop()
{
try {
serviceDiscovery.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
},
Lifecycle.Stage.LAST
);
return serviceDiscovery;
}
public static void announceDefaultService(
final DruidNode nodeConfig,
final ServiceAnnouncer serviceAnnouncer,
final Lifecycle lifecycle
) throws Exception
{
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceAnnouncer.announce(nodeConfig);
}
@Override
public void stop()
{
try {
serviceAnnouncer.unannounce(nodeConfig);
}
catch (Exception e) {
log.warn(e, "Failed to unannouce default service[%s]", nodeConfig.getServiceName());
}
}
}
);
}
public static ServiceProvider makeServiceProvider(
String serviceName,
ServiceDiscovery serviceDiscovery,
Lifecycle lifecycle
)
{
final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder()
.serviceName(serviceName)
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceProvider.start();
}
@Override
public void stop()
{
try {
serviceProvider.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
return serviceProvider;
}
public static RequestLogger makeFileRequestLogger(
ObjectMapper objectMapper,
ScheduledExecutorFactory factory,
Properties props
) throws IOException
{
return new FileRequestLogger(
objectMapper,
factory.create(1, "RequestLogger-%s"),
new File(PropUtils.getProperty(props, "druid.request.logging.dir"))
);
}
public static RequestLogger makeEmittingRequestLogger(Properties props, ServiceEmitter emitter)
{
return new EmittingRequestLogger(emitter, PropUtils.getProperty(props, "druid.request.logging.feed"));
}
public static Injector makeInjector(final Object... modules)
{
final Injector baseInjector = Guice.createInjector(
......
......@@ -4,6 +4,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
import com.google.inject.ConfigurationException;
import com.google.inject.Inject;
......@@ -25,7 +26,10 @@ import com.sun.jersey.api.core.ResourceConfig;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.spi.container.servlet.WebConfig;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import javax.servlet.ServletException;
import java.util.List;
......@@ -113,7 +117,7 @@ public class JettyServerModule extends JerseyServletModule
@Provides @LazySingleton
public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config)
{
final Server server = Initialization.makeJettyServer(node, config);
final Server server = makeJettyServer(node, config);
try {
initializer.initialize(server, injector);
}
......@@ -145,4 +149,23 @@ public class JettyServerModule extends JerseyServletModule
);
return server;
}
private static Server makeJettyServer(@Self DruidNode node, ServerConfig config)
{
final QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setMinThreads(config.getNumThreads());
threadPool.setMaxThreads(config.getNumThreads());
final Server server = new Server();
server.setThreadPool(threadPool);
SelectChannelConnector connector = new SelectChannelConnector();
connector.setPort(node.getPort());
connector.setMaxIdleTime(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
connector.setStatsOn(true);
server.setConnectors(new Connector[]{connector});
return server;
}
}
......@@ -19,23 +19,18 @@
package com.metamx.druid.initialization;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.skife.config.ConfigurationObjectFactory;
import java.io.ByteArrayInputStream;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
/**
......@@ -54,21 +49,23 @@ public class PropertiesModule implements Module
@Override
public void configure(Binder binder)
{
final Properties zkProps = new Properties();
final Properties fileProps = new Properties(zkProps);
// Note that zookeeper coordinates must be either in cmdLine or in runtime.properties
Properties sp = System.getProperties();
Properties tmp_props = new Properties(fileProps); // the head of the 3 level Properties chain
tmp_props.putAll(sp);
final Properties fileProps = new Properties();
Properties systemProps = System.getProperties();
Properties props = new Properties(fileProps);
props.putAll(systemProps);
InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
try {
if (stream == null) {
File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
if (workingDirectoryFile.exists()) {
stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile));
}
}
final InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
if (stream == null) {
log.info("%s not found on classpath, relying only on system properties and zookeeper.", propertiesFile);
} else {
log.info("Loading properties from %s", propertiesFile);
try {
if (stream != null) {
log.info("Loading properties from %s", propertiesFile);
try {
fileProps.load(stream);
}
......@@ -76,60 +73,14 @@ public class PropertiesModule implements Module
throw Throwables.propagate(e);
}
}
finally {
Closeables.closeQuietly(stream);
}
}
// log properties from file; stringPropertyNames() would normally cascade down into the sub Properties objects, but
// zkProps (the parent level) is empty at this point so it will only log properties from runtime.properties
for (String prop : fileProps.stringPropertyNames()) {
log.info("Loaded(runtime.properties) Property[%s] as [%s]", prop, fileProps.getProperty(prop));
catch (FileNotFoundException e) {
log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
}
final String zkHostsProperty = "druid.zk.service.host";
if (tmp_props.getProperty(zkHostsProperty) != null) {
final ConfigurationObjectFactory factory = Config.createFactory(tmp_props);
ZkPathsConfig config;
try {
config = factory.build(ZkPathsConfig.class);
}
catch (IllegalArgumentException e) {
log.warn(e, "Unable to build ZkPathsConfig. Cannot load properties from ZK.");
config = null;
}
if (config != null) {
Lifecycle lifecycle = new Lifecycle();
try {
CuratorFramework curator = Initialization.makeCuratorFramework(factory.build(CuratorConfig.class), lifecycle);
lifecycle.start();
final Stat stat = curator.checkExists().forPath(config.getPropertiesPath());
if (stat != null) {
final byte[] data = curator.getData().forPath(config.getPropertiesPath());
zkProps.load(new InputStreamReader(new ByteArrayInputStream(data), Charsets.UTF_8));
}
// log properties from zk
for (String prop : zkProps.stringPropertyNames()) {
log.info("Loaded(zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
lifecycle.stop();
}
}
} else {
log.warn("property[%s] not set, skipping ZK-specified properties.", zkHostsProperty);
finally {
Closeables.closeQuietly(stream);
}
binder.bind(Properties.class).toInstance(tmp_props);
binder.bind(Properties.class).toInstance(props);
}
}
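The rewritten module drops the ZK layer entirely: it looks for the named resource on the classpath first, then falls back to a file named by the druid.properties.file system property. A one-line sketch with a hypothetical path:

// Equivalent to launching the JVM with -Ddruid.properties.file=/etc/druid/runtime.properties
System.setProperty("druid.properties.file", "/etc/druid/runtime.properties");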
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
......@@ -17,54 +17,22 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.worker.http;
package com.metamx.druid.query.search;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.indexing.coordinator.ForkingTaskRunner;
import com.metamx.emitter.service.ServiceEmitter;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.inject.Singleton;
import javax.validation.constraints.Min;
/**
*/
public class WorkerServletModule extends JerseyServletModule
public class SearchQueryConfig
{
private final ObjectMapper jsonMapper;
private final ServiceEmitter emitter;
private final ForkingTaskRunner forkingTaskRunner;
@JsonProperty
@Min(1)
private int maxSearchLimit = 1000;
public WorkerServletModule(
ObjectMapper jsonMapper,
ServiceEmitter emitter,
ForkingTaskRunner forkingTaskRunner
)
public int getMaxSearchLimit()
{
this.jsonMapper = jsonMapper;
this.emitter = emitter;
this.forkingTaskRunner = forkingTaskRunner;
}
@Override
protected void configureServlets()
{
bind(WorkerResource.class);
bind(ObjectMapper.class).toInstance(jsonMapper);
bind(ServiceEmitter.class).toInstance(emitter);
bind(ForkingTaskRunner.class).toInstance(forkingTaskRunner);
serve("/*").with(GuiceContainer.class);
}
@Provides
@Singleton
public JacksonJsonProvider getJacksonJsonProvider()
{
final JacksonJsonProvider provider = new JacksonJsonProvider();
provider.setMapper(jsonMapper);
return provider;
return maxSearchLimit;
}
}
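Together with the JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class) line added to QueryToolChestModule above, a runtime property now feeds this class. A hedged sketch, assuming an injector built with that module:

// Assuming runtime.properties contains: druid.query.search.maxSearchLimit=5000 (hypothetical value)
final SearchQueryConfig searchConfig = injector.getInstance(SearchQueryConfig.class);
searchConfig.getMaxSearchLimit(); // 5000; falls back to the default of 1000 when the property is unset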
......@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.MergeSequence;
......@@ -37,7 +38,6 @@ import com.metamx.druid.Query;
import com.metamx.druid.ResultGranularTimestampComparator;
import com.metamx.druid.SearchBinaryFn;
import com.metamx.druid.collect.OrderedMergeSequence;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.IntervalChunkingQueryRunner;
import com.metamx.druid.query.MetricManipulationFn;
......@@ -48,9 +48,7 @@ import com.metamx.druid.query.filter.DimFilter;
import com.metamx.druid.result.BySegmentSearchResultValue;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.SearchResultValue;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
......@@ -60,7 +58,6 @@ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
......@@ -70,22 +67,18 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
private static final byte SEARCH_QUERY = 0x2;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<SearchResultValue>> TYPE_REFERENCE = new TypeReference<Result<SearchResultValue>>()
{
};
private static final int maxSearchLimit;
private static final TypeReference<Result<SearchResultValue>> TYPE_REFERENCE = new TypeReference<Result<SearchResultValue>>(){};
static {
// I dislike this static loading of properties, but it's the only mechanism available right now.
Properties props = Initialization.loadProperties();
maxSearchLimit = PropUtils.getPropertyAsInt(props, "com.metamx.query.search.maxSearchLimit", 1000);
}
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>(){};
private final SearchQueryConfig config;
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
@Inject
public SearchQueryQueryToolChest(
SearchQueryConfig config
)
{
};
this.config = config;
}
@Override
public QueryRunner<Result<SearchResultValue>> mergeResults(QueryRunner<Result<SearchResultValue>> runner)
......@@ -259,15 +252,23 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(QueryRunner<Result<SearchResultValue>> runner)
{
return new SearchThresholdAdjustingQueryRunner(
new IntervalChunkingQueryRunner<Result<SearchResultValue>>(runner, Period.months(1))
new IntervalChunkingQueryRunner<Result<SearchResultValue>>(runner, Period.months(1)),
config
);
}
private static class SearchThresholdAdjustingQueryRunner implements QueryRunner<Result<SearchResultValue>>
{
private final QueryRunner<Result<SearchResultValue>> runner;
public SearchThresholdAdjustingQueryRunner(QueryRunner<Result<SearchResultValue>> runner) {this.runner = runner;}
private final SearchQueryConfig config;
public SearchThresholdAdjustingQueryRunner(
QueryRunner<Result<SearchResultValue>> runner,
SearchQueryConfig config
) {
this.runner = runner;
this.config = config;
}
@Override
public Sequence<Result<SearchResultValue>> run(Query<Result<SearchResultValue>> input)
......@@ -277,14 +278,14 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
final SearchQuery query = (SearchQuery) input;
if (query.getLimit() < maxSearchLimit) {
if (query.getLimit() < config.getMaxSearchLimit()) {
return runner.run(query);
}
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
return Sequences.map(
runner.run(query.withLimit(maxSearchLimit)),
runner.run(query.withLimit(config.getMaxSearchLimit())),
new Function<Result<SearchResultValue>, Result<SearchResultValue>>()
{
@Override
......
......@@ -32,6 +32,10 @@ import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.metamx.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import com.metamx.druid.indexing.common.index.NoopChatHandlerProvider;
import com.metamx.druid.indexing.coordinator.TaskRunner;
import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.executor.ExecutorLifecycle;
import com.metamx.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.S3DataSegmentKiller;
......@@ -39,6 +43,15 @@ import com.metamx.druid.loading.S3DataSegmentKiller;
*/
public class PeonModule implements Module
{
private final ExecutorLifecycleConfig config;
public PeonModule(
ExecutorLifecycleConfig config
)
{
this.config = config;
}
@Override
public void configure(Binder binder)
{
......@@ -63,5 +76,11 @@ public class PeonModule implements Module
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class);
binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
binder.bind(ExecutorLifecycleConfig.class).toInstance(config);
binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class).in(LazySingleton.class);
binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);
}
}
......@@ -32,7 +32,6 @@ import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.actions.LockListAction;
import com.metamx.druid.indexing.common.actions.SegmentListUnusedAction;
import com.metamx.druid.indexing.common.actions.SegmentNukeAction;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
......@@ -51,13 +50,7 @@ public class KillTask extends AbstractTask
)
{
super(
id != null ? id : String.format(
"kill_%s_%s_%s_%s",
dataSource,
interval.getStart(),
interval.getEnd(),
new DateTime().toString()
),
TaskUtils.makeId(id, "kill", dataSource, interval),
dataSource,
interval
);
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
......@@ -17,41 +17,47 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
package com.metamx.druid.indexing.common.task;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
/**
*/
public class StatusServlet extends HttpServlet
public class NoopTask extends AbstractTask
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
ByteArrayOutputStream retVal = new ByteArrayOutputStream();
PrintWriter out = new PrintWriter(new OutputStreamWriter(retVal));
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory();
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
private static final Logger log = new Logger(NoopTask.class);
out.printf("Max Memory:\t%,18d\t%1$d%n", maxMemory);
out.printf("Total Memory:\t%,18d\t%1$d%n", totalMemory);
out.printf("Free Memory:\t%,18d\t%1$d%n", freeMemory);
out.printf("Used Memory:\t%,18d\t%1$d%n", totalMemory - freeMemory);
@JsonCreator
public NoopTask(
@JsonProperty("id") String id,
@JsonProperty("interval") Interval interval
)
{
super(
id == null ? String.format("noop_%s", new DateTime()) : id,
"none",
interval == null ? new Interval(Period.days(1), new DateTime()) : interval
);
}
out.flush();
@Override
public String getType()
{
return "noop";
}
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType("text/plain");
resp.getOutputStream().write(retVal.toByteArray());
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
log.info("Running noop task[%s]", getId());
Thread.sleep(2500);
return TaskStatus.success(getId());
}
}
......@@ -55,6 +55,7 @@ import org.joda.time.Interval;
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
@JsonSubTypes.Type(name = "noop", value = NoopTask.class),
@JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class),
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class)
})
......
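Since both constructor arguments of NoopTask are optional, the registration above lets a minimal JSON spec round-trip through Jackson. A sketch; jsonMapper and toolbox are assumed to be in scope:

// The "noop" type tag selects NoopTask; null id and interval trigger the defaults above.
final Task task = jsonMapper.readValue("{\"type\": \"noop\"}", Task.class);
task.run(toolbox); // logs, sleeps ~2.5 seconds, returns TaskStatus.success(task.getId())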
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
......@@ -17,57 +17,24 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.worker.executor;
package com.metamx.druid.indexing.common.task;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.log.LogLevelAdjuster;
import java.io.File;
import java.util.Arrays;
import java.util.Iterator;
import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
*/
public class ExecutorMain
public class TaskUtils
{
private static final Logger log = new Logger(ExecutorMain.class);
public static void main(String[] args) throws Exception
public static String makeId(String id, final String typeName, String dataSource, Interval interval)
{
LogLevelAdjuster.register();
if (args.length != 2) {
log.info("Usage: ExecutorMain <task.json> <status.json>");
System.exit(2);
}
Iterator<String> arguments = Arrays.asList(args).iterator();
final String taskJsonFile = arguments.next();
final String statusJsonFile = arguments.next();
final ExecutorNode node = ExecutorNode.builder()
.build(
System.getProperty("druid.executor.nodeType", "indexer-executor"),
new ExecutorLifecycleFactory(
new File(taskJsonFile),
new File(statusJsonFile),
System.in
)
);
final Lifecycle lifecycle = new Lifecycle();
lifecycle.addManagedInstance(node);
try {
lifecycle.start();
node.join();
lifecycle.stop();
}
catch (Throwable t) {
log.info(t, "Throwable caught at startup, committing seppuku");
System.exit(2);
}
return id != null ? id : String.format(
"%s_%s_%s_%s_%s",
typeName,
dataSource,
interval.getStart(),
interval.getEnd(),
new DateTime().toString()
);
}
}
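An illustrative call with hypothetical datasource and interval; a null id produces a generated name of the form type_dataSource_start_end_creationTime:

final Interval interval = new Interval("2013-01-01/2013-01-02");
TaskUtils.makeId(null, "kill", "wikipedia", interval);
// e.g. "kill_wikipedia_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_<now>"
TaskUtils.makeId("my-task-id", "kill", "wikipedia", interval); // non-null ids pass through unchanged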
......@@ -45,7 +45,6 @@ import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.tasklogs.TaskLogPusher;
import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer;
import com.metamx.druid.indexing.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
......@@ -65,12 +64,13 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/**
* Runs tasks in separate processes using {@link ExecutorMain}.
* Runs tasks in separate processes using the "internal peon" verb.
*/
public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
{
private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
private static final Splitter whiteSpaceSplitter = Splitter.on(CharMatcher.WHITESPACE).omitEmptyStrings();
private final ForkingTaskRunnerConfig config;
private final Properties props;
......@@ -145,11 +145,15 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
if (taskInfo == null) {
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
}
if (taskInfo.processHolder != null) {
throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId());
log.makeAlert("WTF?! TaskInfo already has a processHolder")
.addData("task", task.getId())
.emit();
throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
}
final List<String> command = Lists.newArrayList();
......@@ -160,12 +164,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
command.add("-cp");
command.add(config.getClasspath());
Iterables.addAll(
command,
Splitter.on(CharMatcher.WHITESPACE)
.omitEmptyStrings()
.split(config.getJavaOpts())
);
Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
for (String propName : props.stringPropertyNames()) {
for (String allowedPrefix : config.getAllowedPrefixes()) {
......@@ -194,15 +193,16 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
}
String nodeType = task.getNodeType();
if (nodeType != null) {
command.add(String.format("-Ddruid.executor.nodeType=%s", nodeType));
}
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
command.add(config.getMainClass());
command.add("io.druid.cli.Main");
command.add("internal");
command.add("peon");
String nodeType = task.getNodeType();
if (nodeType != null) {
command.add(String.format("--nodeType %s", nodeType));
}
command.add(taskFile.toString());
command.add(statusFile.toString());
......
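This hunk is the point of the commit: instead of launching a configurable mainClass (previously ExecutorMain), the forked child always runs the io.druid.cli.Main entry point with the internal peon verb. Roughly the command that gets assembled, with hypothetical classpath, host, port, and file paths:

// All concrete values below are hypothetical; the shape mirrors the list built above.
final List<String> command = ImmutableList.of(
    "java", "-cp", "/opt/druid/lib/*",
    "-Ddruid.host=10.0.0.1", "-Ddruid.port=8081",
    "io.druid.cli.Main", "internal", "peon",
    "--nodeType", "indexer-executor",
    "/tmp/task/task.json", "/tmp/task/status.json"
);
new ProcessBuilder(command).start(); // IOException handling elided in this sketch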
......@@ -34,7 +34,6 @@ import com.metamx.druid.indexing.coordinator.exec.TaskConsumer;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
......@@ -129,7 +128,23 @@ public class TaskMaster
}
);
leaderLifecycle.addManagedInstance(taskQueue);
Initialization.announceDefaultService(node, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceAnnouncer.announce(node);
}
@Override
public void stop()
{
serviceAnnouncer.unannounce(node);
}
}
);
leaderLifecycle.addManagedInstance(taskConsumer);
if (taskRunner instanceof RemoteTaskRunner) {
......
......@@ -2,7 +2,6 @@ package com.metamx.druid.indexing.coordinator.config;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
......@@ -23,6 +22,11 @@ public class ForkingTaskRunnerConfig
@NotNull
private String javaCommand = "java";
/**
* This is intended for setting -X parameters on the underlying java. It is used by first splitting on whitespace,
* so it cannot handle properties that have whitespace in the value. Those properties should be set via a
* druid.indexer.fork.property. property instead.
*/
@JsonProperty
@NotNull
private String javaOpts = "";
......@@ -31,10 +35,6 @@ public class ForkingTaskRunnerConfig
@NotNull
private String classpath = System.getProperty("java.class.path");
@JsonProperty
@NotNull
private String mainClass = ExecutorMain.class.getName();
@JsonProperty
@Min(1024) @Max(65535)
private int startPort = 8080;
......@@ -68,11 +68,6 @@ public class ForkingTaskRunnerConfig
return classpath;
}
public String getMainClass()
{
return mainClass;
}
public int getStartPort()
{
return startPort;
......
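A hedged illustration of the two channels the javadoc above distinguishes; the runner config prefix is an assumption, while druid.indexer.fork.property. is the CHILD_PROPERTY_PREFIX defined in ForkingTaskRunner:

final Properties props = new Properties();
// Whitespace-split JVM flags; no single flag may contain a space (key prefix assumed):
props.setProperty("druid.indexer.runner.javaOpts", "-Xmx2g -XX:+PrintGCDetails");
// Values that do contain whitespace must use the fork-property channel instead (key hypothetical):
props.setProperty("druid.indexer.fork.property.druid.some.key", "a value with spaces");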
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer;
import com.metamx.druid.indexing.coordinator.TaskMaster;
import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.emitter.service.ServiceEmitter;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import javax.inject.Singleton;
/**
*/
public class IndexerCoordinatorServletModule extends JerseyServletModule
{
private final ObjectMapper jsonMapper;
private final IndexerCoordinatorConfig indexerCoordinatorConfig;
private final ServiceEmitter emitter;
private final TaskMaster taskMaster;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager;
public IndexerCoordinatorServletModule(
ObjectMapper jsonMapper,
IndexerCoordinatorConfig indexerCoordinatorConfig,
ServiceEmitter emitter,
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager
)
{
this.jsonMapper = jsonMapper;
this.indexerCoordinatorConfig = indexerCoordinatorConfig;
this.emitter = emitter;
this.taskMaster = taskMaster;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskLogStreamer = taskLogStreamer;
this.configManager = configManager;
}
@Override
protected void configureServlets()
{
bind(IndexerCoordinatorResource.class);
bind(OldIndexerCoordinatorResource.class);
bind(ObjectMapper.class).toInstance(jsonMapper);
bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
bind(ServiceEmitter.class).toInstance(emitter);
bind(TaskMaster.class).toInstance(taskMaster);
bind(TaskStorageQueryAdapter.class).toInstance(taskStorageQueryAdapter);
bind(TaskLogStreamer.class).toInstance(taskLogStreamer);
bind(JacksonConfigManager.class).toInstance(configManager);
serve("/*").with(GuiceContainer.class);
}
@Provides
@Singleton
public JacksonJsonProvider getJacksonJsonProvider()
{
final JacksonJsonProvider provider = new JacksonJsonProvider();
provider.setMapper(jsonMapper);
return provider;
}
}
@@ -5,9 +5,10 @@ import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.TaskRunner;
@@ -17,7 +18,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Encapsulates the lifecycle of a task executor. Loads one task, runs it, writes its status, and all the while monitors its parent process.
@@ -27,37 +27,33 @@ public class ExecutorLifecycle
{
private static final EmittingLogger log = new EmittingLogger(ExecutorLifecycle.class);
private final File taskFile;
private final File statusFile;
private final ExecutorLifecycleConfig config;
private final TaskRunner taskRunner;
private final InputStream parentStream;
private final ObjectMapper jsonMapper;
private final ExecutorService parentMonitorExec = Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat("parent-monitor-%d").setDaemon(true).build()
);
private final ExecutorService parentMonitorExec = Execs.singleThreaded("parent-monitor-%d");
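The new Execs.singleThreaded helper replaces the inlined executor construction above; a minimal sketch of what it presumably does, inferred from the code it replaces (not the actual Execs source):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class Execs
{
  // Sketch only: a single daemon thread named by the given format, matching the
  // Executors.newFixedThreadPool(1, ...) call this commit removes.
  public static ExecutorService singleThreaded(String nameFormat)
  {
    return Executors.newSingleThreadExecutor(
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()
    );
  }
}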
private volatile ListenableFuture<TaskStatus> statusFuture = null;
@Inject
public ExecutorLifecycle(
File taskFile,
File statusFile,
ExecutorLifecycleConfig config,
TaskRunner taskRunner,
InputStream parentStream,
ObjectMapper jsonMapper
)
{
this.taskFile = taskFile;
this.statusFile = statusFile;
this.config = config;
this.taskRunner = taskRunner;
this.parentStream = parentStream;
this.jsonMapper = jsonMapper;
}
@LifecycleStart
public void start()
{
final File taskFile = config.getTaskFile();
final File statusFile = config.getStatusFile();
final InputStream parentStream = config.getParentStream();
final Task task;
try {
...
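The folded body of start() is where parent monitoring happens; a sketch of the idea, assuming the monitor thread blocks on the parent stream and exits the process at EOF (illustrative, not the commit's exact code):

final InputStream parentStream = config.getParentStream();
parentMonitorExec.submit(
    new Runnable()
    {
      @Override
      public void run()
      {
        try {
          // getParentStream() is System.in under the default "stdin" setting;
          // read() returns -1 once the forking parent dies and its pipe closes.
          while (parentStream.read() != -1) {
            // keep draining until EOF
          }
        }
        catch (Exception e) {
          log.error(e, "Failed to read from parent stream");
        }
        log.info("Parent stream closed, exiting!");
        System.exit(1);
      }
    }
);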
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@@ -17,54 +17,72 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
package com.metamx.druid.indexing.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.ISE;
import javax.inject.Singleton;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import java.io.File;
import java.io.InputStream;
/**
*/
public class ClientServletModule extends JerseyServletModule
public class ExecutorLifecycleConfig
{
private final QuerySegmentWalker texasRanger;
private final InventoryView serverInventoryView;
private final ObjectMapper jsonMapper;
@JsonProperty
@NotNull
private File taskFile = null;
public ClientServletModule(
QuerySegmentWalker texasRanger,
InventoryView serverInventoryView,
ObjectMapper jsonMapper
)
@JsonProperty
@NotNull
private File statusFile = null;
@JsonProperty
@Pattern(regexp = "\\{stdin\\}")
private String parentStreamName = "stdin";
public File getTaskFile()
{
this.texasRanger = texasRanger;
this.serverInventoryView = serverInventoryView;
this.jsonMapper = jsonMapper;
return taskFile;
}
@Override
protected void configureServlets()
public ExecutorLifecycleConfig setTaskFile(File taskFile)
{
bind(ClientInfoResource.class);
bind(QuerySegmentWalker.class).toInstance(texasRanger);
bind(InventoryView.class).toInstance(serverInventoryView);
this.taskFile = taskFile;
return this;
}
serve("/*").with(GuiceContainer.class);
public File getStatusFile()
{
return statusFile;
}
@Provides
@Singleton
public JacksonJsonProvider getJacksonJsonProvider()
public ExecutorLifecycleConfig setStatusFile(File statusFile)
{
final JacksonJsonProvider provider = new JacksonJsonProvider();
provider.setMapper(jsonMapper);
return provider;
this.statusFile = statusFile;
return this;
}
public String getParentStreamName()
{
return parentStreamName;
}
public ExecutorLifecycleConfig setParentStreamName(String parentStreamName)
{
this.parentStreamName = parentStreamName;
return this;
}
}
public InputStream getParentStream()
{
if ("stdin".equals(parentStreamName)) {
return System.in;
}
else {
throw new ISE("Unknown stream name[%s]", parentStreamName);
}
}
}
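An illustrative construction of the new config object (file names are assumed, not taken from this commit):

ExecutorLifecycleConfig config = new ExecutorLifecycleConfig()
    .setTaskFile(new File("task.json"))
    .setStatusFile(new File("status.json"));

InputStream parentStream = config.getParentStream(); // System.in under the default "stdin"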
@@ -4,23 +4,22 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.indexing.coordinator.TaskRunner;
import java.io.File;
import java.io.InputStream;
public class ExecutorLifecycleFactory
{
private final File taskFile;
private final File statusFile;
private final InputStream parentStream;
public ExecutorLifecycleFactory(File taskFile, File statusFile, InputStream parentStream)
public ExecutorLifecycleFactory(File taskFile, File statusFile)
{
this.taskFile = taskFile;
this.statusFile = statusFile;
this.parentStream = parentStream;
}
public ExecutorLifecycle build(TaskRunner taskRunner, ObjectMapper jsonMapper)
{
return new ExecutorLifecycle(taskFile, statusFile, taskRunner, parentStream, jsonMapper);
return new ExecutorLifecycle(
new ExecutorLifecycleConfig().setTaskFile(taskFile).setStatusFile(statusFile), taskRunner, jsonMapper
);
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.worker.executor;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.indexing.common.RetryPolicyConfig;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskToolboxFactory;
import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import com.metamx.druid.indexing.common.index.NoopChatHandlerProvider;
import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.indexing.worker.config.WorkerConfig;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceProvider;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.skife.config.ConfigurationObjectFactory;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class ExecutorNode extends BaseServerNode<ExecutorNode>
{
private static final EmittingLogger log = new EmittingLogger(ExecutorNode.class);
public static Builder builder()
{
return new Builder();
}
private final Lifecycle lifecycle;
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private final ExecutorLifecycleFactory executorLifecycleFactory;
private RestS3Service s3Service = null;
private MonitorScheduler monitorScheduler = null;
private HttpClient httpClient = null;
private ServiceEmitter emitter = null;
private TaskConfig taskConfig = null;
private WorkerConfig workerConfig = null;
private DataSegmentPusher segmentPusher = null;
private TaskToolboxFactory taskToolboxFactory = null;
private ServiceDiscovery<Void> serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private ServiceProvider coordinatorServiceProvider = null;
private Server server = null;
private ThreadPoolTaskRunner taskRunner = null;
private ExecutorLifecycle executorLifecycle = null;
private ChatHandlerProvider chatHandlerProvider = null;
public ExecutorNode(
String nodeType,
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory,
ExecutorLifecycleFactory executorLifecycleFactory
)
{
super(nodeType, log, props, lifecycle, jsonMapper, smileMapper, configFactory);
this.lifecycle = lifecycle;
this.props = props;
this.configFactory = configFactory;
this.executorLifecycleFactory = executorLifecycleFactory;
}
@Override
public void doInit() throws Exception
{
initializeHttpClient();
initializeEmitter();
initializeS3Service();
initializeMergerConfig();
initializeServiceDiscovery();
initializeDataSegmentPusher();
initializeMonitorScheduler();
initializeTaskToolbox();
initializeTaskRunner();
initializeChatHandlerProvider();
initializeJacksonInjections();
initializeJacksonSubtypes();
initializeServer();
executorLifecycle = executorLifecycleFactory.build(taskRunner, getJsonMapper());
lifecycle.addManagedInstance(executorLifecycle);
final Injector injector = Guice.createInjector(
new ExecutorServletModule(
getJsonMapper(),
chatHandlerProvider
)
);
final ServletContextHandler root = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", null);
root.addServlet(
new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger())
),
"/druid/v2/*"
);
}
private void initializeMonitorScheduler()
{
if (monitorScheduler == null) {
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
this.monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, ImmutableList.<Monitor>of()
);
lifecycle.addManagedInstance(monitorScheduler);
}
}
@LifecycleStart
public synchronized void start() throws Exception
{
init();
lifecycle.start();
}
@LifecycleStop
public synchronized void stop()
{
lifecycle.stop();
}
public void join()
{
executorLifecycle.join();
}
public ThreadPoolTaskRunner getTaskRunner()
{
return taskRunner;
}
private void initializeServer()
{
if (server == null) {
server = Initialization.makeJettyServer(null, configFactory.build(ServerConfig.class));
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Jetty");
server.start();
}
@Override
public void stop()
{
log.info("Stopping Jetty");
try {
server.stop();
}
catch (Exception e) {
log.error(e, "Exception thrown while stopping Jetty");
}
}
}
);
}
}
private void initializeJacksonInjections()
{
InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", s3Service)
.addValue("segmentPusher", segmentPusher)
.addValue("chatHandlerProvider", chatHandlerProvider);
getJsonMapper().setInjectableValues(injectables);
}
private void initializeJacksonSubtypes()
{
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
}
private void initializeHttpClient()
{
if (httpClient == null) {
httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
);
}
}
private void initializeEmitter()
{
if (emitter == null) {
emitter = new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, getJsonMapper(), lifecycle)
);
}
EmittingLogger.registerEmitter(emitter);
}
private void initializeS3Service() throws S3ServiceException
{
if (s3Service == null) {
s3Service = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
}
}
private void initializeMergerConfig()
{
if (taskConfig == null) {
taskConfig = configFactory.build(TaskConfig.class);
}
if (workerConfig == null) {
workerConfig = configFactory.build(WorkerConfig.class);
}
}
public void initializeDataSegmentPusher()
{
if (segmentPusher == null) {
segmentPusher = ServerInit.getSegmentPusher(props, configFactory, getJsonMapper());
}
}
public void initializeTaskToolbox() throws S3ServiceException
{
if (taskToolboxFactory == null) {
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
taskToolboxFactory = new TaskToolboxFactory(
taskConfig,
new RemoteTaskActionClientFactory(
httpClient,
new IndexingServiceSelector(coordinatorServiceProvider),
new RetryPolicyFactory(
configFactory.buildWithReplacements(
RetryPolicyConfig.class,
ImmutableMap.of("base_path", "druid.worker.taskActionClient")
)
),
getJsonMapper()
),
emitter,
s3Service,
segmentPusher,
dataSegmentKiller,
getAnnouncer(),
getServerView(),
getConglomerate(),
monitorScheduler,
getJsonMapper()
);
}
}
public void initializeServiceDiscovery() throws Exception
{
final CuratorConfig config = configFactory.build(CuratorConfig.class);
if (serviceDiscovery == null) {
final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(config, lifecycle);
CuratorDiscoveryConfig discoveryConfig = getJsonConfigurator()
.configurate(getProps(), "druid.discovery.curator", CuratorDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
serviceDiscoveryCuratorFramework, discoveryConfig, lifecycle
);
}
if (serviceAnnouncer == null) {
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery);
}
if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider(
workerConfig.getOverlordService(),
serviceDiscovery,
lifecycle
);
}
}
public void initializeTaskRunner()
{
if (taskRunner == null) {
this.taskRunner = lifecycle.addManagedInstance(new ThreadPoolTaskRunner(taskToolboxFactory));
}
}
public void initializeChatHandlerProvider()
{
if (chatHandlerProvider == null) {
final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class);
if (config.isPublishDiscovery()) {
this.chatHandlerProvider = new EventReceivingChatHandlerProvider(null, serviceAnnouncer); // TODO: eliminate
} else {
log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
this.chatHandlerProvider = new NoopChatHandlerProvider();
}
}
}
public static class Builder
{
private ObjectMapper jsonMapper = null;
private ObjectMapper smileMapper = null;
private Lifecycle lifecycle = null;
private Properties props = null;
private ConfigurationObjectFactory configFactory = null;
public Builder withMapper(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
return this;
}
public Builder withLifecycle(Lifecycle lifecycle)
{
this.lifecycle = lifecycle;
return this;
}
public Builder withProps(Properties props)
{
this.props = props;
return this;
}
public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
{
this.configFactory = configFactory;
return this;
}
public ExecutorNode build(String nodeType, ExecutorLifecycleFactory executorLifecycleFactory)
{
if (jsonMapper == null && smileMapper == null) {
jsonMapper = new DefaultObjectMapper();
smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper);
} else if (jsonMapper == null || smileMapper == null) {
throw new ISE(
"Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.",
jsonMapper,
smileMapper
);
}
if (lifecycle == null) {
lifecycle = new Lifecycle();
}
if (props == null) {
props = Initialization.loadProperties();
}
if (configFactory == null) {
configFactory = Config.createFactory(props);
}
return new ExecutorNode(
nodeType,
props,
lifecycle,
jsonMapper,
smileMapper,
configFactory,
executorLifecycleFactory
);
}
}
}
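An illustrative way to drive the builder (arguments are assumptions, not taken from this commit):

ExecutorNode node = ExecutorNode.builder()
    .build(
        "indexer-executor",
        new ExecutorLifecycleFactory(new File("task.json"), new File("status.json"))
    );
node.start(); // runs init() and starts the lifecycle (declares throws Exception)
node.join();  // blocks until the single task's ExecutorLifecycle finishes
node.stop();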
package com.metamx.druid.indexing.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import javax.inject.Singleton;
public class ExecutorServletModule extends JerseyServletModule
{
private final ObjectMapper jsonMapper;
private final ChatHandlerProvider receivers;
public ExecutorServletModule(
ObjectMapper jsonMapper,
ChatHandlerProvider receivers
)
{
this.jsonMapper = jsonMapper;
this.receivers = receivers;
}
@Override
protected void configureServlets()
{
bind(ChatHandlerResource.class);
bind(ObjectMapper.class).toInstance(jsonMapper);
bind(ChatHandlerProvider.class).toInstance(receivers);
serve("/*").with(GuiceContainer.class);
}
@Provides
@Singleton
public JacksonJsonProvider getJacksonJsonProvider()
{
final JacksonJsonProvider provider = new JacksonJsonProvider();
provider.setMapper(jsonMapper);
return provider;
}
}
{ "type": "noop" }
\ No newline at end of file
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import org.skife.config.ConfigurationObjectFactory;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
/**
*/
public abstract class BaseServerNode<T extends QueryableNode> extends QueryableNode<T>
{
private final Map<Class<? extends Query>, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap();
private DruidProcessingConfig processingConfig = null;
private QueryRunnerFactoryConglomerate conglomerate = null;
private StupidPool<ByteBuffer> computeScratchPool = null;
public BaseServerNode(
String nodeType,
Logger log,
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
super(nodeType, log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public QueryRunnerFactoryConglomerate getConglomerate()
{
initializeQueryRunnerFactoryConglomerate();
return conglomerate;
}
public StupidPool<ByteBuffer> getComputeScratchPool()
{
initializeComputeScratchPool();
return computeScratchPool;
}
public DruidProcessingConfig getProcessingConfig()
{
initializeProcessingConfig();
return processingConfig;
}
@SuppressWarnings("unchecked")
public T setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
{
checkFieldNotSetAndSet("conglomerate", conglomerate);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setComputeScratchPool(StupidPool<ByteBuffer> computeScratchPool)
{
checkFieldNotSetAndSet("computeScratchPool", computeScratchPool);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setProcessingConfig(DruidProcessingConfig processingConfig)
{
checkFieldNotSetAndSet("processingConfig", processingConfig);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerQueryRunnerFactory(Class<? extends Query> queryClazz, QueryRunnerFactory factory)
{
Preconditions.checkState(
conglomerate == null,
"Registering a QueryRunnerFactory only works when a separate conglomerate is not specified."
);
Preconditions.checkState(
!additionalFactories.containsKey(queryClazz), "Registered factory for class[%s] multiple times", queryClazz
);
additionalFactories.put(queryClazz, factory);
return (T) this;
}
private void initializeComputeScratchPool()
{
if (computeScratchPool == null) {
setComputeScratchPool(ServerInit.makeComputeScratchPool(getProcessingConfig()));
}
}
private void initializeQueryRunnerFactoryConglomerate()
{
if (conglomerate == null) {
final Map<Class<? extends Query>, QueryRunnerFactory> factories = ServerInit.initDefaultQueryTypes(
getConfigFactory(), getComputeScratchPool()
);
for (Map.Entry<Class<? extends Query>, QueryRunnerFactory> entry : additionalFactories.entrySet()) {
factories.put(entry.getKey(), entry.getValue());
}
setConglomerate(new DefaultQueryRunnerFactoryConglomerate(factories));
}
}
private void initializeProcessingConfig()
{
if (processingConfig == null) {
setProcessingConfig(
getConfigFactory().buildWithReplacements(
DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
);
}
}
}
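Since registerQueryRunnerFactory refuses to run once the conglomerate exists, registrations must precede the first getConglomerate() call. An illustrative ordering, as a sketch against some concrete node instance:

node.registerQueryRunnerFactory(SearchQuery.class, new SearchQueryRunnerFactory());
QueryRunnerFactoryConglomerate conglomerate = node.getConglomerate(); // freezes the registry
// Any further registerQueryRunnerFactory(...) call would now fail the first Preconditions check.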
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.google.inject.util.Providers;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.master.DruidMaster;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import javax.inject.Singleton;
/**
*/
public class MasterServletModule extends JerseyServletModule
{
private final InventoryView serverInventoryView;
private final DatabaseSegmentManager segmentInventoryManager;
private final DatabaseRuleManager databaseRuleManager;
private final DruidMaster master;
private final ObjectMapper jsonMapper;
private final IndexingServiceClient indexingServiceClient;
public MasterServletModule(
InventoryView serverInventoryView,
DatabaseSegmentManager segmentInventoryManager,
DatabaseRuleManager databaseRuleManager,
DruidMaster master,
ObjectMapper jsonMapper,
IndexingServiceClient indexingServiceClient
)
{
this.serverInventoryView = serverInventoryView;
this.segmentInventoryManager = segmentInventoryManager;
this.databaseRuleManager = databaseRuleManager;
this.master = master;
this.jsonMapper = jsonMapper;
this.indexingServiceClient = indexingServiceClient;
}
@Override
protected void configureServlets()
{
bind(InfoResource.class);
bind(MasterResource.class);
bind(InventoryView.class).toInstance(serverInventoryView);
bind(DatabaseSegmentManager.class).toInstance(segmentInventoryManager);
bind(DatabaseRuleManager.class).toInstance(databaseRuleManager);
bind(DruidMaster.class).toInstance(master);
if (indexingServiceClient == null) {
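// Guice rejects toInstance(null), so Providers.of(null) is the standard way to bind an explicit null.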
bind(IndexingServiceClient.class).toProvider(Providers.<IndexingServiceClient>of(null));
}
else {
bind(IndexingServiceClient.class).toInstance(indexingServiceClient);
}
serve("/*").with(GuiceContainer.class);
}
@Provides
@Singleton
public JacksonJsonProvider getJacksonJsonProvider()
{
final JacksonJsonProvider provider = new JacksonJsonProvider();
provider.setMapper(jsonMapper);
return provider;
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.Query;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.HdfsDataSegmentPusher;
import com.metamx.druid.loading.HdfsDataSegmentPusherConfig;
import com.metamx.druid.loading.LocalDataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusherConfig;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPusher;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.GroupByQueryConfig;
import com.metamx.druid.query.group.GroupByQueryEngine;
import com.metamx.druid.query.group.GroupByQueryQueryToolChest;
import com.metamx.druid.query.group.GroupByQueryRunnerFactory;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.search.SearchQueryRunnerFactory;
import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.utils.PropUtils;
import org.apache.hadoop.conf.Configuration;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.skife.config.ConfigurationObjectFactory;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class ServerInit
{
private static Logger log = new Logger(ServerInit.class);
public static StupidPool<ByteBuffer> makeComputeScratchPool(DruidProcessingConfig config)
{
try {
Class<?> vmClass = Class.forName("sun.misc.VM");
Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null);
if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) {
log.info("Cannot determine maxDirectMemory from[%s]", maxDirectMemoryObj);
} else {
long maxDirectMemory = ((Number) maxDirectMemoryObj).longValue();
final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1);
if (maxDirectMemory < memoryNeeded) {
throw new ISE(
"Not enough direct memory. Please adjust -XX:MaxDirectMemorySize or druid.computation.buffer.size: "
+ "maxDirectMemory[%,d], memoryNeeded[%,d], druid.computation.buffer.size[%,d], numThreads[%,d]",
maxDirectMemory, memoryNeeded, config.intermediateComputeSizeBytes(), config.getNumThreads()
);
}
}
}
catch (ClassNotFoundException e) {
log.info("No VM class, cannot do memory check.");
}
catch (NoSuchMethodException e) {
log.info("VM.maxDirectMemory doesn't exist, cannot do memory check.");
}
catch (InvocationTargetException e) {
log.warn(e, "static method shouldn't throw this");
}
catch (IllegalAccessException e) {
log.warn(e, "public method, shouldn't throw this");
}
return new ComputeScratchPool(config.intermediateComputeSizeBytes());
}
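A worked instance of the sizing rule above (numbers are illustrative): with druid.computation.buffer.size = 1,073,741,824 (1GB) and druid.processing.numThreads = 7, memoryNeeded = 1,073,741,824 * (7 + 1) = 8,589,934,592 bytes, so the JVM must be started with at least -XX:MaxDirectMemorySize=8g or the ISE above is thrown.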
// TODO: Get rid of this method
public static Map<Class<? extends Query>, QueryRunnerFactory> initDefaultQueryTypes(
ConfigurationObjectFactory configFactory,
StupidPool<ByteBuffer> computationBufferPool
)
{
Map<Class<? extends Query>, QueryRunnerFactory> queryRunners = Maps.newLinkedHashMap();
queryRunners.put(TimeseriesQuery.class, new TimeseriesQueryRunnerFactory());
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(new GroupByQueryConfig());
queryRunners.put(
GroupByQuery.class,
new GroupByQueryRunnerFactory(
new GroupByQueryEngine(configSupplier, computationBufferPool),
configSupplier,
new GroupByQueryQueryToolChest(configSupplier)
)
);
queryRunners.put(SearchQuery.class, new SearchQueryRunnerFactory());
queryRunners.put(TimeBoundaryQuery.class, new TimeBoundaryQueryRunnerFactory());
queryRunners.put(SegmentMetadataQuery.class, new SegmentMetadataQueryRunnerFactory());
return queryRunners;
}
public static DataSegmentPusher getSegmentPusher(
final Properties props,
final ConfigurationObjectFactory configFactory,
final ObjectMapper jsonMapper
)
{
if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper);
}
else if (Boolean.parseBoolean(props.getProperty("druid.pusher.cassandra", "false"))) {
final CassandraDataSegmentConfig config = configFactory.build(CassandraDataSegmentConfig.class);
return new CassandraDataSegmentPusher(config, jsonMapper);
}
else if (Boolean.parseBoolean(props.getProperty("druid.pusher.hdfs", "false"))) {
final HdfsDataSegmentPusherConfig config = configFactory.build(HdfsDataSegmentPusherConfig.class);
return new HdfsDataSegmentPusher(config, new Configuration(), jsonMapper);
}
else {
final RestS3Service s3Client;
try {
s3Client = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
}
catch (S3ServiceException e) {
throw Throwables.propagate(e);
}
return new S3DataSegmentPusher(s3Client, configFactory.build(S3DataSegmentPusherConfig.class), jsonMapper);
}
}
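Illustrative property-driven selection (a sketch; the property values and the surrounding configFactory/jsonMapper variables are assumptions):

Properties props = new Properties();
props.setProperty("druid.pusher.hdfs", "true"); // flags are checked in order: local, cassandra, hdfs
DataSegmentPusher pusher = ServerInit.getSegmentPusher(props, configFactory, jsonMapper);
// With no flag set, the S3 branch runs and requires the
// com.metamx.aws.accessKey / com.metamx.aws.secretKey properties.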
private static class ComputeScratchPool extends StupidPool<ByteBuffer>
{
private static final Logger log = new Logger(ComputeScratchPool.class);
public ComputeScratchPool(final int computationBufferSize)
{
super(
new Supplier<ByteBuffer>()
{
final AtomicLong count = new AtomicLong(0);
@Override
public ByteBuffer get()
{
log.info(
"Allocating new computeScratchPool[%,d] of size[%,d]", count.getAndIncrement(), computationBufferSize
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
);
}
}
}
@@ -19,7 +19,9 @@
package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.inject.Injector;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.curator.discovery.DiscoveryModule;
@@ -39,11 +41,19 @@ import com.metamx.druid.guice.StorageNodeModule;
import com.metamx.druid.http.StatusResource;
import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.executor.ChatHandlerResource;
import com.metamx.druid.indexing.worker.executor.ExecutorLifecycle;
import com.metamx.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.metrics.MetricsModule;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.airlift.command.Option;
import java.io.File;
import java.util.List;
/**
*/
@@ -52,20 +62,18 @@ import io.airlift.command.Command;
description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "
+ "This should rarely, if ever, be used directly."
)
public class CliPeon extends ServerRunnable
public class CliPeon implements Runnable
{
private static final Logger log = new Logger(CliPeon.class);
@Arguments(description = "task.json status.json", required = true)
public List<String> taskAndStatusFile;
public CliPeon()
{
super(log);
}
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
public String nodeType = "indexer-executor";
private static final Logger log = new Logger(CliPeon.class);
@Override
protected Injector getInjector()
{
// TODO: make it take and run a task
return Initialization.makeInjector(
new LifecycleModule(),
EmitterModule.class,
@@ -78,7 +86,7 @@ public class CliPeon extends ServerRunnable
.addResource(ChatHandlerResource.class),
new DiscoveryModule(),
new ServerViewModule(),
new StorageNodeModule("real-time"),
new StorageNodeModule(nodeType),
new DataSegmentPusherModule(),
new AnnouncerModule(),
new DruidProcessingModule(),
@@ -86,7 +94,35 @@ public class CliPeon extends ServerRunnable
new QueryRunnerFactoryModule(),
new IndexingServiceDiscoveryModule(),
new AWSModule(),
new PeonModule()
new PeonModule(
new ExecutorLifecycleConfig()
.setTaskFile(new File(taskAndStatusFile.get(0)))
.setStatusFile(new File(taskAndStatusFile.get(1)))
)
);
}
@Override
public void run()
{
try {
LogLevelAdjuster.register();
final Injector injector = getInjector();
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
try {
lifecycle.start();
injector.getInstance(ExecutorLifecycle.class).join();
lifecycle.stop();
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
@@ -40,8 +40,8 @@ public class Main
.withDescription("Run one of the Druid server types.")
.withDefaultCommand(Help.class)
.withCommands(
CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class,
CliOverlord.class, CliMiddleManager.class, CliPeon.class
CliCoordinator.class, CliHistorical.class, CliBroker.class,
CliRealtime.class, CliOverlord.class, CliMiddleManager.class
);
builder.withGroup("example")
@@ -49,6 +49,11 @@ public class Main
.withDefaultCommand(Help.class)
.withCommands(CliRealtimeExample.class);
builder.withGroup("internal")
.withDescription("Processes that Druid runs \"internally\", you should rarely use these directly")
.withDefaultCommand(Help.class)
.withCommands(CliPeon.class);
final Cli<Runnable> cli = builder.build();
try {
cli.parse(args).run();
...
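With CliPeon registered under the new "internal" group, the forked peon process would presumably be launched along these lines (the command name and classpath are assumptions, not shown in this diff):

java -cp <classpath> io.druid.cli.Main internal peon task.json status.json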
@@ -40,6 +40,5 @@ public abstract class ServerRunnable implements Runnable
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}