提交 bc4bc277 编写于 作者: F fjy

Merge pull request #447 from metamx/fix-configmanager

Move db specific things from ConfigManager into DbConnector
......@@ -23,7 +23,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
......@@ -31,16 +30,8 @@ import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import org.joda.time.Duration;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
......@@ -57,38 +48,25 @@ public class ConfigManager
private final Object lock = new Object();
private boolean started = false;
private final IDBI dbi;
private final DbConnector dbConnector;
private final Supplier<ConfigManagerConfig> config;
private final ScheduledExecutorService exec;
private final ConcurrentMap<String, ConfigHolder> watchedConfigs;
private final String selectStatement;
private final String insertStatement;
private final String configTable;
private volatile ConfigManager.PollingCallable poller;
@Inject
public ConfigManager(IDBI dbi, Supplier<DbTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
public ConfigManager(DbConnector dbConnector, Supplier<DbTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
{
this.dbi = dbi;
this.dbConnector = dbConnector;
this.config = config;
this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");
this.watchedConfigs = Maps.newConcurrentMap();
final String configTable = dbTables.get().getConfigTable();
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable);
this.insertStatement = String.format(
DbConnector.isPostgreSQL(dbi) ?
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" +
" INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;" :
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
configTable
);
this.configTable = dbTables.get().getConfigTable();
}
@LifecycleStart
......@@ -127,7 +105,7 @@ public class ConfigManager
{
for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) {
try {
if (entry.getValue().swapIfNew(lookup(entry.getKey()))) {
if (entry.getValue().swapIfNew(dbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
log.info("New value for key[%s] seen.", entry.getKey());
}
}
......@@ -159,7 +137,7 @@ public class ConfigManager
// Multiple of these callables can be submitted at the same time, but the callables themselves
// are executed serially, so double check that it hasn't already been populated.
if (!watchedConfigs.containsKey(key)) {
byte[] value = lookup(key);
byte[] value = dbConnector.lookup(configTable, "name", "payload", key);
ConfigHolder<T> holder = new ConfigHolder<T>(value, serde);
watchedConfigs.put(key, holder);
}
......@@ -187,45 +165,10 @@ public class ConfigManager
return holder.getReference();
}
public byte[] lookup(final String key)
{
return dbi.withHandle(
new HandleCallback<byte[]>()
{
@Override
public byte[] withHandle(Handle handle) throws Exception
{
List<byte[]> matched = handle.createQuery(selectStatement)
.bind("name", key)
.map(
new ResultSetMapper<byte[]>()
{
@Override
public byte[] map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
return r.getBytes("payload");
}
}
).list();
if (matched.isEmpty()) {
return null;
}
if (matched.size() > 1) {
throw new ISE("Error! More than one matching entry[%d] found for [%s]?!", matched.size(), key);
}
return matched.get(0);
}
}
);
}
public <T> boolean set(final String key, final ConfigSerde<T> serde, final T obj)
{
if (obj == null) {
if (obj == null || !started) {
return false;
}
......@@ -238,20 +181,7 @@ public class ConfigManager
@Override
public Boolean call() throws Exception
{
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(insertStatement)
.bind("name", key)
.bind("payload", newBytes)
.execute();
return null;
}
}
);
dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
final ConfigHolder configHolder = watchedConfigs.get(key);
if (configHolder != null) {
......
......@@ -21,14 +21,18 @@ package io.druid.db;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import org.apache.commons.dbcp.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
......@@ -39,58 +43,61 @@ public class DbConnector
{
private static final Logger log = new Logger(DbConnector.class);
public static void createSegmentTable(final IDBI dbi, final String segmentTableName)
public static void createSegmentTable(final IDBI dbi, final String segmentTableName, boolean isPostgreSQL)
{
createTable(
dbi,
segmentTableName,
String.format(
isPostgreSQL(dbi) ?
isPostgreSQL ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
segmentTableName
)
),
isPostgreSQL
);
}
public static void createRuleTable(final IDBI dbi, final String ruleTableName)
public static void createRuleTable(final IDBI dbi, final String ruleTableName, boolean isPostgreSQL)
{
createTable(
dbi,
ruleTableName,
String.format(
isPostgreSQL(dbi) ?
isPostgreSQL ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
ruleTableName
)
),
isPostgreSQL
);
}
public static void createConfigTable(final IDBI dbi, final String configTableName)
public static void createConfigTable(final IDBI dbi, final String configTableName, boolean isPostgreSQL)
{
createTable(
dbi,
configTableName,
String.format(
isPostgreSQL(dbi) ?
isPostgreSQL ?
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))":
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
configTableName
)
),
isPostgreSQL
);
}
public static void createTaskTable(final IDBI dbi, final String taskTableName)
public static void createTaskTable(final IDBI dbi, final String taskTableName, boolean isPostgreSQL)
{
createTable(
dbi,
taskTableName,
String.format(
isPostgreSQL(dbi) ?
isPostgreSQL ?
"CREATE TABLE %1$s (\n"
+ " id varchar(255) NOT NULL,\n"
+ " created_date TEXT NOT NULL,\n"
......@@ -112,17 +119,18 @@ public class DbConnector
+ " KEY (active, created_date(100))\n"
+ ")",
taskTableName
)
),
isPostgreSQL
);
}
public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName)
public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName, boolean isPostgreSQL)
{
createTable(
dbi,
taskLogsTableName,
String.format(
isPostgreSQL(dbi) ?
isPostgreSQL ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
......@@ -138,17 +146,18 @@ public class DbConnector
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLogsTableName
)
),
isPostgreSQL
);
}
public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName)
public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName, boolean isPostgreSQL)
{
createTable(
dbi,
taskLocksTableName,
String.format(
isPostgreSQL(dbi) ?
isPostgreSQL ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
......@@ -164,14 +173,16 @@ public class DbConnector
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLocksTableName
)
),
isPostgreSQL
);
}
public static void createTable(
final IDBI dbi,
final String tableName,
final String sql
final String sql,
final boolean isPostgreSQL
)
{
try {
......@@ -182,7 +193,7 @@ public class DbConnector
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table;
if ( isPostgreSQL(dbi) ) {
if ( isPostgreSQL ) {
table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName));
} else {
table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
......@@ -205,6 +216,84 @@ public class DbConnector
}
}
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws SQLException
{
final String insertOrUpdateStatement = String.format(
isPostgreSQL ?
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;" :
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName, keyColumn, valueColumn
);
return dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(insertOrUpdateStatement)
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
}
);
}
public byte[] lookup(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
)
{
final String selectStatement = String.format("SELECT %s FROM %s WHERE %s = :key", valueColumn, tableName, keyColumn);
return dbi.withHandle(
new HandleCallback<byte[]>()
{
@Override
public byte[] withHandle(Handle handle) throws Exception
{
List<byte[]> matched = handle.createQuery(selectStatement)
.bind("key", key)
.map(
new ResultSetMapper<byte[]>()
{
@Override
public byte[] map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
return r.getBytes(valueColumn);
}
}
).list();
if (matched.isEmpty()) {
return null;
}
if (matched.size() > 1) {
throw new ISE("Error! More than one matching entry[%d] found for [%s]?!", matched.size(), key);
}
return matched.get(0);
}
}
);
}
public static Boolean isPostgreSQL(final IDBI dbi)
{
return dbi.withHandle(
......@@ -219,7 +308,7 @@ public class DbConnector
);
}
public static Boolean isPostgreSQL(final Handle handle) throws SQLException
protected static Boolean isPostgreSQL(final Handle handle) throws SQLException
{
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
}
......@@ -227,6 +316,7 @@ public class DbConnector
private final Supplier<DbConnectorConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi;
private boolean isPostgreSQL = false;
@Inject
public DbConnector(Supplier<DbConnectorConfig> config, Supplier<DbTablesConfig> dbTables)
......@@ -242,6 +332,11 @@ public class DbConnector
return dbi;
}
public boolean isPostgreSQL()
{
return isPostgreSQL;
}
private DataSource getDatasource()
{
DbConnectorConfig connectorConfig = config.get();
......@@ -249,7 +344,9 @@ public class DbConnector
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername(connectorConfig.getUser());
dataSource.setPassword(connectorConfig.getPassword());
dataSource.setUrl(connectorConfig.getConnectURI());
String uri = connectorConfig.getConnectURI();
isPostgreSQL = uri.startsWith("jdbc:postgresql");
dataSource.setUrl(uri);
if (connectorConfig.isUseValidationQuery()) {
dataSource.setValidationQuery(connectorConfig.getValidationQuery());
......@@ -262,21 +359,21 @@ public class DbConnector
public void createSegmentTable()
{
if (config.get().isCreateTables()) {
createSegmentTable(dbi, dbTables.get().getSegmentsTable());
createSegmentTable(dbi, dbTables.get().getSegmentsTable(), isPostgreSQL);
}
}
public void createRulesTable()
{
if (config.get().isCreateTables()) {
createRuleTable(dbi, dbTables.get().getRulesTable());
createRuleTable(dbi, dbTables.get().getRulesTable(), isPostgreSQL);
}
}
public void createConfigTable()
{
if (config.get().isCreateTables()) {
createConfigTable(dbi, dbTables.get().getConfigTable());
createConfigTable(dbi, dbTables.get().getConfigTable(), isPostgreSQL);
}
}
......@@ -284,9 +381,9 @@ public class DbConnector
{
if (config.get().isCreateTables()) {
final DbTablesConfig dbTablesConfig = dbTables.get();
createTaskTable(dbi, dbTablesConfig.getTasksTable());
createTaskLogTable(dbi, dbTablesConfig.getTaskLogTable());
createTaskLockTable(dbi, dbTablesConfig.getTaskLockTable());
createTaskTable(dbi, dbTablesConfig.getTasksTable(), isPostgreSQL);
createTaskLogTable(dbi, dbTablesConfig.getTaskLogTable(), isPostgreSQL);
createTaskLockTable(dbi, dbTablesConfig.getTaskLockTable(), isPostgreSQL);
}
}
}
......@@ -66,6 +66,6 @@ public class JacksonConfigManagerModule implements Module
}
);
return new ConfigManager(dbConnector.getDBI(), dbTables, config);
return new ConfigManager(dbConnector, dbTables, config);
}
}
......@@ -60,24 +60,24 @@ public class IndexerDBCoordinator
private final ObjectMapper jsonMapper;
private final DbTablesConfig dbTables;
private final IDBI dbi;
private final DbConnector dbConnector;
@Inject
public IndexerDBCoordinator(
ObjectMapper jsonMapper,
DbTablesConfig dbTables,
IDBI dbi
DbConnector dbConnector
)
{
this.jsonMapper = jsonMapper;
this.dbTables = dbTables;
this.dbi = dbi;
this.dbConnector = dbConnector;
}
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException
{
final VersionedIntervalTimeline<String, DataSegment> timeline = dbi.withHandle(
final VersionedIntervalTimeline<String, DataSegment> timeline = dbConnector.getDBI().withHandle(
new HandleCallback<VersionedIntervalTimeline<String, DataSegment>>()
{
@Override
......@@ -142,7 +142,7 @@ public class IndexerDBCoordinator
*/
public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException
{
return dbi.inTransaction(
return dbConnector.getDBI().inTransaction(
new TransactionCallback<Set<DataSegment>>()
{
@Override
......@@ -180,7 +180,7 @@ public class IndexerDBCoordinator
try {
handle.createStatement(
String.format(
DbConnector.isPostgreSQL(handle) ?
dbConnector.isPostgreSQL() ?
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)":
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
......@@ -200,8 +200,6 @@ public class IndexerDBCoordinator
.execute();
log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch(SQLException e) {
throw new IOException(e);
} catch(Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
......@@ -234,7 +232,7 @@ public class IndexerDBCoordinator
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
{
dbi.inTransaction(
dbConnector.getDBI().inTransaction(
new TransactionCallback<Void>()
{
@Override
......@@ -252,7 +250,7 @@ public class IndexerDBCoordinator
public void deleteSegments(final Set<DataSegment> segments) throws IOException
{
dbi.inTransaction(
dbConnector.getDBI().inTransaction(
new TransactionCallback<Void>()
{
@Override
......@@ -295,7 +293,7 @@ public class IndexerDBCoordinator
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{
List<DataSegment> matchingSegments = dbi.withHandle(
List<DataSegment> matchingSegments = dbConnector.getDBI().withHandle(
new HandleCallback<List<DataSegment>>()
{
@Override
......@@ -303,7 +301,7 @@ public class IndexerDBCoordinator
{
return handle.createQuery(
String.format(
DbConnector.isPostgreSQL(handle)?
dbConnector.isPostgreSQL() ?
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = false",
dbTables.getSegmentsTable()
......
......@@ -74,13 +74,15 @@ public class DatabaseRuleManager
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> existing = handle.select(
String.format(
"SELECT id from %s where datasource='%s';",
ruleTable,
defaultTier
List<Map<String, Object>> existing = handle
.createQuery(
String.format(
"SELECT id from %s where datasource=:dataSource;",
ruleTable
)
)
);
.bind("dataSource", defaultTier)
.list();
if (!existing.isEmpty()) {
return null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册