提交 6c88659c 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

[OAP Server] Register lock implementation. (#1528)

* Use elasticsearch's document version to implement a lock for register source id.

* Return false when could not lock the document.
上级 d4590fd0
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.source.Scope;
/**
* @author peng-yongsheng
*/
public interface IRegisterLockDAO extends DAO {
boolean tryLock(Scope scope, int timeout);
void releaseLock(Scope scope);
}
......@@ -32,6 +32,6 @@ public class StorageModule extends ModuleDefine {
}
@Override public Class[] services() {
return new Class[] {IBatchDAO.class, IPersistenceDAO.class};
return new Class[] {IBatchDAO.class, IPersistenceDAO.class, IRegisterLockDAO.class};
}
}
......@@ -33,6 +33,7 @@ import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.common.settings.Settings;
......@@ -92,8 +93,7 @@ public class ElasticSearchClient implements Client {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settings);
request.mapping(TYPE, mappingBuilder);
CreateIndexResponse response;
response = client.indices().create(request);
CreateIndexResponse response = client.indices().create(request);
logger.info("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
......@@ -128,6 +128,27 @@ public class ElasticSearchClient implements Client {
return client.get(request);
}
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
IndexRequest request = prepareInsert(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request);
}
public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
indexName = formatIndexName(indexName);
UpdateRequest request = prepareUpdate(indexName, id, source);
request.version(version);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request);
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
indexName = formatIndexName(indexName);
UpdateRequest request = prepareUpdate(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request);
}
public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) {
indexName = formatIndexName(indexName);
return new IndexRequest(indexName, TYPE, id).source(source);
......
......@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.library.client.NameSpace;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
import org.slf4j.*;
/**
......@@ -63,6 +64,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(IPersistenceDAO.class, new PersistenceEsDAO(elasticSearchClient, nameSpace));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
}
@Override
......@@ -73,6 +75,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
installer.install(elasticSearchClient);
RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient);
lockInstaller.install();
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class RegisterLockDAOImpl extends EsDAO implements IRegisterLockDAO {
private static final Logger logger = LoggerFactory.getLogger(RegisterLockDAOImpl.class);
public RegisterLockDAOImpl(ElasticSearchClient client) {
super(client);
}
@Override public boolean tryLock(Scope scope, int timeout) {
String id = String.valueOf(scope.ordinal());
try {
GetResponse response = getClient().get(RegisterLockIndex.NAME, id);
if (response.isExists()) {
long expire = response.getField(RegisterLockIndex.COLUMN_EXPIRE).getValue();
boolean lockable = response.getField(RegisterLockIndex.COLUMN_LOCKABLE).getValue();
long version = response.getVersion();
if (lockable) {
lock(id, timeout, version);
} else if (System.currentTimeMillis() > expire) {
lock(id, timeout, version);
} else {
TimeUnit.SECONDS.sleep(1);
return false;
}
}
} catch (Throwable t) {
logger.warn("Try to lock the row with the id {} failure, error message: {}", id, t.getMessage());
return false;
}
return true;
}
private void lock(String id, int timeout, long version) throws IOException {
XContentBuilder source = XContentFactory.jsonBuilder().startObject();
source.field(RegisterLockIndex.COLUMN_EXPIRE, System.currentTimeMillis() + timeout);
source.field(RegisterLockIndex.COLUMN_LOCKABLE, false);
source.endObject();
getClient().forceUpdate(RegisterLockIndex.NAME, id, source, version);
}
@Override public void releaseLock(Scope scope) {
String id = String.valueOf(scope.ordinal());
try {
XContentBuilder source = XContentFactory.jsonBuilder().startObject();
source.field(RegisterLockIndex.COLUMN_LOCKABLE, true);
source.endObject();
getClient().forceUpdate(RegisterLockIndex.NAME, id, source);
} catch (Throwable t) {
logger.error("Release lock failure.");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
/**
* @author peng-yongsheng
*/
public class RegisterLockIndex {
public static final String NAME = "register_lock";
public static final String COLUMN_EXPIRE = "expire";
public static final String COLUMN_LOCKABLE = "lockable";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class RegisterLockInstaller {
private static final Logger logger = LoggerFactory.getLogger(RegisterLockInstaller.class);
private final ElasticSearchClient client;
public RegisterLockInstaller(ElasticSearchClient client) {
this.client = client;
}
public void install() throws StorageException {
try {
if (!client.isExistsIndex(RegisterLockIndex.NAME)) {
createIndex();
}
putIfAbsent(Scope.Endpoint.ordinal());
putIfAbsent(Scope.ServiceInstance.ordinal());
putIfAbsent(Scope.Service.ordinal());
} catch (IOException e) {
throw new StorageException(e.getMessage());
}
}
private void createIndex() throws IOException {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", "1s")
.build();
XContentBuilder source = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(RegisterLockIndex.COLUMN_EXPIRE)
.field("type", "long")
.endObject()
.startObject(RegisterLockIndex.COLUMN_LOCKABLE)
.field("type", "boolean")
.endObject()
.endObject()
.endObject();
client.createIndex(RegisterLockIndex.NAME, settings, source);
}
private void putIfAbsent(int scopeId) throws IOException {
GetResponse response = client.get(RegisterLockIndex.NAME, String.valueOf(scopeId));
if (!response.isExists()) {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field(RegisterLockIndex.COLUMN_EXPIRE, Long.MIN_VALUE);
builder.field(RegisterLockIndex.COLUMN_LOCKABLE, true);
builder.endObject();
client.forceInsert(RegisterLockIndex.NAME, String.valueOf(scopeId), builder);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册