提交 2e0104e5 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

Register lock refactor (#2195)

* Refactor register and reduce the lock time.

* Refactor es id generation mechanism.

* Wrong commit place.
上级 c704ee0f
......@@ -78,7 +78,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
}
} else {
int sequence;
if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) {
if ((sequence = registerLockDAO.getId(scope, registerSource)) != Const.NONE) {
try {
dbSource = registerDAO.get(modelName, source.id());
if (Objects.nonNull(dbSource)) {
......@@ -91,8 +91,6 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
} finally {
registerLockDAO.releaseLock(scope);
}
} else {
logger.info("{} inventory register try lock and increment sequence failure.", scope.name());
......
......@@ -18,14 +18,22 @@
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
/**
* @author peng-yongsheng
* Entity register and ID generator.
*
* @author peng-yongsheng, wusheng
*/
public interface IRegisterLockDAO extends DAO {
int tryLockAndIncrement(Scope scope);
void releaseLock(Scope scope);
/**
* This method is also executed by one thread in each oap instance, but in cluster environment, it could be executed
* in concurrent way, so no `sync` in method level, but the implementation must make sure the return id is unique no
* matter the cluster size.
*
* @param scope for the id. IDs at different scopes could be same, but unique in same scope.
* @return Unique ID.
*/
int getId(Scope scope, RegisterSource registerSource);
}
......@@ -66,7 +66,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient, 10 * 60 * 1000));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient));
......
......@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......@@ -37,14 +37,11 @@ public class RegisterLockDAOImpl extends EsDAO implements IRegisterLockDAO {
private static final Logger logger = LoggerFactory.getLogger(RegisterLockDAOImpl.class);
private final int timeout;
public RegisterLockDAOImpl(ElasticSearchClient client, int timeout) {
public RegisterLockDAOImpl(ElasticSearchClient client) {
super(client);
this.timeout = timeout;
}
@Override public int tryLockAndIncrement(Scope scope) {
@Override public int getId(Scope scope, RegisterSource registerSource) {
String id = String.valueOf(scope.ordinal());
int sequence = Const.NONE;
......@@ -53,19 +50,12 @@ public class RegisterLockDAOImpl extends EsDAO implements IRegisterLockDAO {
if (response.isExists()) {
Map<String, Object> source = response.getSource();
long expire = ((Number)source.get(RegisterLockIndex.COLUMN_EXPIRE)).longValue();
boolean lockable = (boolean)source.get(RegisterLockIndex.COLUMN_LOCKABLE);
sequence = ((Number)source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
long version = response.getVersion();
sequence++;
if (lockable || System.currentTimeMillis() > expire) {
lock(id, sequence, timeout, version);
} else {
TimeUnit.SECONDS.sleep(1);
return Const.NONE;
}
lock(id, sequence, version);
}
} catch (Throwable t) {
logger.warn("Try to lock the row with the id {} failure, error message: {}", id, t.getMessage());
......@@ -74,27 +64,13 @@ public class RegisterLockDAOImpl extends EsDAO implements IRegisterLockDAO {
return sequence;
}
private void lock(String id, int sequence, int timeout, long version) throws IOException {
private void lock(String id, int sequence, long version) throws IOException {
XContentBuilder source = XContentFactory.jsonBuilder().startObject();
source.field(RegisterLockIndex.COLUMN_EXPIRE, System.currentTimeMillis() + timeout);
source.field(RegisterLockIndex.COLUMN_LOCKABLE, false);
source.field(RegisterLockIndex.COLUMN_SEQUENCE, sequence);
source.endObject();
getClient().forceUpdate(RegisterLockIndex.NAME, id, source, version);
}
}
@Override public void releaseLock(Scope scope) {
String id = String.valueOf(scope.ordinal());
try {
XContentBuilder source = XContentFactory.jsonBuilder().startObject();
source.field(RegisterLockIndex.COLUMN_LOCKABLE, true);
source.endObject();
getClient().forceUpdate(RegisterLockIndex.NAME, id, source);
} catch (Throwable t) {
logger.error("{} inventory release lock failure.", scope.name(), t);
}
}
}
......@@ -24,7 +24,5 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
public class RegisterLockIndex {
public static final String NAME = "register_lock";
public static final String COLUMN_EXPIRE = "expire";
public static final String COLUMN_LOCKABLE = "lockable";
public static final String COLUMN_SEQUENCE = "sequence";
}
......@@ -78,12 +78,6 @@ public class RegisterLockInstaller {
XContentBuilder source = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(RegisterLockIndex.COLUMN_EXPIRE)
.field("type", "long")
.endObject()
.startObject(RegisterLockIndex.COLUMN_LOCKABLE)
.field("type", "boolean")
.endObject()
.startObject(RegisterLockIndex.COLUMN_SEQUENCE)
.field("type", "integer")
.endObject()
......@@ -97,8 +91,6 @@ public class RegisterLockInstaller {
GetResponse response = client.get(RegisterLockIndex.NAME, String.valueOf(scopeId));
if (!response.isExists()) {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field(RegisterLockIndex.COLUMN_EXPIRE, Long.MIN_VALUE);
builder.field(RegisterLockIndex.COLUMN_LOCKABLE, true);
builder.field(RegisterLockIndex.COLUMN_SEQUENCE, 1);
builder.endObject();
......
......@@ -19,8 +19,8 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
......@@ -37,51 +37,26 @@ public class H2RegisterLockDAO implements IRegisterLockDAO {
private static final Logger logger = LoggerFactory.getLogger(H2RegisterLockDAO.class);
private JDBCHikariCPClient h2Client;
private Map<Scope, Connection> onLockingConnection;
public H2RegisterLockDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
onLockingConnection = new HashMap<>();
}
void init(Scope scope) {
if (!onLockingConnection.containsKey(scope)) {
onLockingConnection.put(scope, null);
}
}
@Override public int tryLockAndIncrement(Scope scope) {
if (onLockingConnection.containsKey(scope)) {
try {
Connection connection = h2Client.getTransactionConnection();
onLockingConnection.put(scope, connection);
ResultSet resultSet = h2Client.executeQuery(connection, "select sequence from " + H2RegisterLockInstaller.LOCK_TABLE_NAME + " where id = " + scope.ordinal() + " for update");
while (resultSet.next()) {
int sequence = resultSet.getInt("sequence");
sequence++;
h2Client.execute(connection, "update " + H2RegisterLockInstaller.LOCK_TABLE_NAME + " set sequence = " + sequence + " where id = " + scope.ordinal());
return sequence;
}
} catch (JDBCClientException | SQLException e) {
logger.error("try inventory register lock for scope id={} name={} failure.", scope.ordinal(), scope.name());
logger.error("tryLock error", e);
return Const.NONE;
}
}
return Const.NONE;
}
@Override public void releaseLock(Scope scope) {
Connection connection = onLockingConnection.get(scope);
if (connection != null) {
try {
@Override public int getId(Scope scope, RegisterSource registerSource) {
try (Connection connection = h2Client.getTransactionConnection()) {
ResultSet resultSet = h2Client.executeQuery(connection, "select sequence from " + H2RegisterLockInstaller.LOCK_TABLE_NAME + " where id = " + scope.ordinal() + " for update");
while (resultSet.next()) {
int sequence = resultSet.getInt("sequence");
sequence++;
h2Client.execute(connection, "update " + H2RegisterLockInstaller.LOCK_TABLE_NAME + " set sequence = " + sequence + " where id = " + scope.ordinal());
connection.commit();
connection.close();
} catch (SQLException e) {
logger.error("release lock failure.", e);
} finally {
onLockingConnection.put(scope, null);
return sequence;
}
} catch (JDBCClientException | SQLException e) {
logger.error("try inventory register lock for scope id={} name={} failure.", scope.ordinal(), scope.name());
logger.error("tryLock error", e);
return Const.NONE;
}
return Const.NONE;
}
}
......@@ -61,7 +61,6 @@ public class H2RegisterLockInstaller {
for (Class registerSource : InventoryProcess.INSTANCE.getAllRegisterSources()) {
Scope sourceScope = StorageEntityAnnotationUtils.getSourceScope(registerSource);
dao.init(sourceScope);
putIfAbsent(h2Client, connection, sourceScope.ordinal(), sourceScope.name());
}
} catch (JDBCClientException | SQLException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册