diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java new file mode 100644 index 0000000000000000000000000000000000000000..9c238f6ad6f9b776405ea192c85be781b6ec516b --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.storage; + +import org.apache.skywalking.oap.server.core.source.Scope; + +/** + * @author peng-yongsheng + */ +public interface IRegisterLockDAO extends DAO { + boolean tryLock(Scope scope, int timeout); + + void releaseLock(Scope scope); +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java index 559eb5c43b918741d6baa5ba3f3ddd91d40e355f..62eb72d63ff306da84a92ea9a3c910ebb254e373 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java @@ -32,6 +32,6 @@ public class StorageModule extends ModuleDefine { } @Override public Class[] services() { - return new Class[] {IBatchDAO.class, IPersistenceDAO.class}; + return new Class[] {IBatchDAO.class, IPersistenceDAO.class, IRegisterLockDAO.class}; } } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 520e3f005268d43fdd11b59db0a98ea1556f66ae..5df57f604eb1c5bbefe8cf2de458b17aaa64dcde 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.get.*; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.*; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.*; import org.elasticsearch.common.settings.Settings; @@ -92,8 +93,7 @@ public class ElasticSearchClient implements Client { CreateIndexRequest request = new CreateIndexRequest(indexName); request.settings(settings); request.mapping(TYPE, mappingBuilder); - CreateIndexResponse response; - response = client.indices().create(request); + CreateIndexResponse response = client.indices().create(request); logger.info("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } @@ -128,6 +128,27 @@ public class ElasticSearchClient implements Client { return client.get(request); } + public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException { + IndexRequest request = prepareInsert(indexName, id, source); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client.index(request); + } + + public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException { + indexName = formatIndexName(indexName); + UpdateRequest request = prepareUpdate(indexName, id, source); + request.version(version); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client.update(request); + } + + public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException { + indexName = formatIndexName(indexName); + UpdateRequest request = prepareUpdate(indexName, id, source); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client.update(request); + } + public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) { indexName = formatIndexName(indexName); return new IndexRequest(indexName, TYPE, id).source(source); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index b4aa7e8eb847965fc4f313bfb1c536d8363c37fd..12565a1beadb806f050a73c08ecc099c43eef304 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.library.client.NameSpace; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*; import org.slf4j.*; /** @@ -63,6 +64,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests())); this.registerServiceImplementation(IPersistenceDAO.class, new PersistenceEsDAO(elasticSearchClient, nameSpace)); + this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient)); } @Override @@ -73,6 +75,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber()); installer.install(elasticSearchClient); + + RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient); + lockInstaller.install(); } catch (StorageException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..9904cd4fe9b692c9dc3392e349aa0243ac6b93c0 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.skywalking.oap.server.core.source.Scope; +import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO; +import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.common.xcontent.*; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class RegisterLockDAOImpl extends EsDAO implements IRegisterLockDAO { + + private static final Logger logger = LoggerFactory.getLogger(RegisterLockDAOImpl.class); + + public RegisterLockDAOImpl(ElasticSearchClient client) { + super(client); + } + + @Override public boolean tryLock(Scope scope, int timeout) { + String id = String.valueOf(scope.ordinal()); + try { + GetResponse response = getClient().get(RegisterLockIndex.NAME, id); + if (response.isExists()) { + long expire = response.getField(RegisterLockIndex.COLUMN_EXPIRE).getValue(); + boolean lockable = response.getField(RegisterLockIndex.COLUMN_LOCKABLE).getValue(); + long version = response.getVersion(); + + if (lockable) { + lock(id, timeout, version); + } else if (System.currentTimeMillis() > expire) { + lock(id, timeout, version); + } else { + TimeUnit.SECONDS.sleep(1); + return false; + } + } + } catch (Throwable t) { + logger.warn("Try to lock the row with the id {} failure, error message: {}", id, t.getMessage()); + return false; + } + return true; + } + + private void lock(String id, int timeout, long version) throws IOException { + XContentBuilder source = XContentFactory.jsonBuilder().startObject(); + source.field(RegisterLockIndex.COLUMN_EXPIRE, System.currentTimeMillis() + timeout); + source.field(RegisterLockIndex.COLUMN_LOCKABLE, false); + source.endObject(); + + getClient().forceUpdate(RegisterLockIndex.NAME, id, source, version); + } + + @Override public void releaseLock(Scope scope) { + String id = String.valueOf(scope.ordinal()); + + try { + XContentBuilder source = XContentFactory.jsonBuilder().startObject(); + source.field(RegisterLockIndex.COLUMN_LOCKABLE, true); + source.endObject(); + + getClient().forceUpdate(RegisterLockIndex.NAME, id, source); + } catch (Throwable t) { + logger.error("Release lock failure."); + } + } +} diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java new file mode 100644 index 0000000000000000000000000000000000000000..ee196189b0a707683667a3e3f28f9df9017026f0 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock; + +/** + * @author peng-yongsheng + */ +public class RegisterLockIndex { + + public static final String NAME = "register_lock"; + public static final String COLUMN_EXPIRE = "expire"; + public static final String COLUMN_LOCKABLE = "lockable"; +} diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java new file mode 100644 index 0000000000000000000000000000000000000000..dc1be5bd62fc156dfd70e5300e55ac777a45e0a6 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock; + +import java.io.IOException; +import org.apache.skywalking.oap.server.core.source.Scope; +import org.apache.skywalking.oap.server.core.storage.StorageException; +import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.*; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class RegisterLockInstaller { + + private static final Logger logger = LoggerFactory.getLogger(RegisterLockInstaller.class); + + private final ElasticSearchClient client; + + public RegisterLockInstaller(ElasticSearchClient client) { + this.client = client; + } + + public void install() throws StorageException { + try { + if (!client.isExistsIndex(RegisterLockIndex.NAME)) { + createIndex(); + } + putIfAbsent(Scope.Endpoint.ordinal()); + putIfAbsent(Scope.ServiceInstance.ordinal()); + putIfAbsent(Scope.Service.ordinal()); + } catch (IOException e) { + throw new StorageException(e.getMessage()); + } + } + + private void createIndex() throws IOException { + Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", "1s") + .build(); + + XContentBuilder source = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject(RegisterLockIndex.COLUMN_EXPIRE) + .field("type", "long") + .endObject() + .startObject(RegisterLockIndex.COLUMN_LOCKABLE) + .field("type", "boolean") + .endObject() + .endObject() + .endObject(); + + client.createIndex(RegisterLockIndex.NAME, settings, source); + } + + private void putIfAbsent(int scopeId) throws IOException { + GetResponse response = client.get(RegisterLockIndex.NAME, String.valueOf(scopeId)); + if (!response.isExists()) { + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + builder.field(RegisterLockIndex.COLUMN_EXPIRE, Long.MIN_VALUE); + builder.field(RegisterLockIndex.COLUMN_LOCKABLE, true); + builder.endObject(); + + client.forceInsert(RegisterLockIndex.NAME, String.valueOf(scopeId), builder); + } + } +}