From 1bc408282176363e6d98b3969be12810dfcded72 Mon Sep 17 00:00:00 2001 From: "yi.liang" Date: Fri, 27 Mar 2020 00:50:27 -0500 Subject: [PATCH] fix elasticsearch-5.x-plugin when use es6.x TransportClient error (#4517) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * up ui * up ui * fix elasticsearch-5.x-plugin on es6.x TransportClient error, Found class org.elasticsearch.common.transport.TransportAddress, but interface was expected up elasticsearch-6.x-plugin support TransportClient * fix elasticsearch-5.x-plugin on es6.x TransportClient error, Found class org.elasticsearch.common.transport.TransportAddress, but interface was expected up elasticsearch-6.x-plugin support TransportClient * fix elasticsearch-5.x-plugin on es6.x TransportClient error, Found class org.elasticsearch.common.transport.TransportAddress, but interface was expected up elasticsearch-6.x-plugin support TransportClient * const string witnesses add some unit test recommended to change ; to , * const string witnesses add some unit test recommended to change ; to , * up ES6.X test-plugin * up es6.x test-plugin * up es6.x test-plugin * up es6.x test-plugin * add DeleteIndexRequest * up es6.x test-plugin * remove TransportCaseController.java * fix TransportActionNodeProxyExecuteMethodsInterceptorTest Co-authored-by: yi.liang Co-authored-by: 吴晟 Wu Sheng Co-authored-by: 梁懿 Co-authored-by: aderm <394102339@qq.com> --- .../apm/agent/core/conf/Config.java | 2 + .../plugin/elasticsearch/v5/Constants.java | 4 +- .../define/GenericActionInstrumentation.java | 7 + ...ListenableActionFutureInstrumentation.java | 7 + ...ansportActionNodeProxyInstrumentation.java | 7 + ...portClientNodesServiceInstrumentation.java | 7 + .../TransportProxyClientInstrumentation.java | 7 + .../v6/TransportAddressCache.java | 61 +++++ .../v6/TransportClientEnhanceInfo.java | 48 ++++ .../AdapterActionFutureInstrumentation.java | 76 ++++++ ...ansportActionNodeProxyInstrumentation.java | 90 +++++++ ...portClientNodesServiceInstrumentation.java | 109 ++++++++ .../TransportServiceInstrumentation.java | 73 ++++++ ...tionFutureActionGetMethodsInterceptor.java | 167 ++++++++++++ .../v6/interceptor/Constants.java | 16 ++ ...ionNodeProxyExecuteMethodsInterceptor.java | 160 +++++++++++ ...ransportClientNodesServiceInterceptor.java | 117 +++++++++ .../TransportServiceConInterceptor.java | 40 +++ .../src/main/resources/skywalking-plugin.def | 4 + ...FutureActionGetMethodsInterceptorTest.java | 142 ++++++++++ ...odeProxyExecuteMethodsInterceptorTest.java | 248 ++++++++++++++++++ .../TransportAddressCacheTest.java | 54 ++++ .../TransportServiceConInterceptorTest.java | 82 ++++++ .../java-agent/Supported-list.md | 3 +- .../config/expectedData.yaml | 136 +++++++++- .../elasticsearch-6.x-scenario/pom.xml | 8 +- .../RestHighLevelClientCase.java | 210 +++++++++++++++ .../elasticsearch/TransportClientCase.java | 101 +++++++ .../config/TransportClientConfig.java | 72 +++++ .../controller/CaseController.java | 176 +------------ 30 files changed, 2063 insertions(+), 171 deletions(-) create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportAddressCache.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportClientEnhanceInfo.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/AdapterActionFutureInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportActionNodeProxyInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportClientNodesServiceInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportServiceInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportClientNodesServiceInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptorTest.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptorTest.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportAddressCacheTest.java create mode 100644 apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptorTest.java create mode 100644 test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/RestHighLevelClientCase.java create mode 100644 test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/TransportClientCase.java create mode 100644 test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/config/TransportClientConfig.java diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index e518175e3c..726aa4af4c 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -256,6 +256,8 @@ public class Config { * If true, trace all the DSL(Domain Specific Language) in ElasticSearch access, default is false. */ public static boolean TRACE_DSL = false; + + public static int ELASTICSEARCH_DSL_LENGTH_THRESHOLD = 1024; } public static class Customize { diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/Constants.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/Constants.java index dad1fb8ad7..eed12f3881 100644 --- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/Constants.java +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/Constants.java @@ -21,7 +21,9 @@ package org.apache.skywalking.apm.plugin.elasticsearch.v5; import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag; import org.apache.skywalking.apm.agent.core.context.tag.Tags; -class Constants { +public class Constants { + + public static final String INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS = "org.elasticsearch.common.transport.InetSocketTransportAddress"; static final String DB_TYPE = "Elasticsearch"; diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/GenericActionInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/GenericActionInstrumentation.java index ec4a957bea..cc1adf3312 100644 --- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/GenericActionInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/GenericActionInstrumentation.java @@ -24,6 +24,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine; import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants; import static net.bytebuddy.matcher.ElementMatchers.any; import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; @@ -60,4 +61,10 @@ public class GenericActionInstrumentation extends ClassEnhancePluginDefine { protected ClassMatch enhanceClass() { return byHierarchyMatch(new String[] {"org.elasticsearch.action.GenericAction"}); } + + @Override + protected String[] witnessClasses() { + return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS}; + } + } diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java index a653f811ac..a97befadf5 100644 --- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java @@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine; import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants; import static net.bytebuddy.matcher.ElementMatchers.named; import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; @@ -67,4 +68,10 @@ public class PlainListenableActionFutureInstrumentation extends ClassEnhancePlug protected ClassMatch enhanceClass() { return byName("org.elasticsearch.action.support.PlainListenableActionFuture"); } + + @Override + protected String[] witnessClasses() { + return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS}; + } + } diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java index d98b45764b..55526f643e 100644 --- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java @@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine; import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants; import static net.bytebuddy.matcher.ElementMatchers.any; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -83,4 +84,10 @@ public class TransportActionNodeProxyInstrumentation extends ClassEnhancePluginD protected ClassMatch enhanceClass() { return byName(ENHANC_CLASS); } + + @Override + protected String[] witnessClasses() { + return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS}; + } + } diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java index df1027398a..6866e696ca 100644 --- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java @@ -23,6 +23,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterc import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants; import static net.bytebuddy.matcher.ElementMatchers.named; import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; @@ -80,4 +81,10 @@ public class TransportClientNodesServiceInstrumentation extends ClassInstanceMet protected ClassMatch enhanceClass() { return byName(ENHANCE_CLASS); } + + @Override + protected String[] witnessClasses() { + return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS}; + } + } diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java index 55e6e91f8a..d4c0c3f676 100644 --- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java @@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants; import static net.bytebuddy.matcher.ElementMatchers.any; import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; @@ -64,4 +65,10 @@ public class TransportProxyClientInstrumentation extends ClassInstanceMethodsEnh protected ClassMatch enhanceClass() { return byName("org.elasticsearch.client.transport.TransportProxyClient"); } + + @Override + protected String[] witnessClasses() { + return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS}; + } + } diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportAddressCache.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportAddressCache.java new file mode 100644 index 0000000000..e61287c25b --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportAddressCache.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6; + +import org.elasticsearch.common.transport.TransportAddress; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class TransportAddressCache { + + private List transportAddresses = new ArrayList(); + private String transportAddressesStr = ""; + + public synchronized void addDiscoveryNode(TransportAddress... transportAddress) { + transportAddresses.addAll(Arrays.asList(transportAddress)); + transportAddressesStr = format(); + } + + public synchronized void removeDiscoveryNode(TransportAddress transportAddress) { + List nodesBuilder = new ArrayList(); + + for (TransportAddress otherNode : transportAddresses) { + if (!otherNode.getAddress().equals(transportAddress.getAddress())) { + nodesBuilder.add(otherNode); + } + } + + transportAddresses = nodesBuilder; + transportAddressesStr = format(); + } + + private String format() { + return String.join("," + , transportAddresses.stream() + .map(x -> String.format("%s:%s", x.getAddress(), x.getPort())) + .collect(Collectors.toList()) + ); + } + + public String transportAddress() { + return transportAddressesStr; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportClientEnhanceInfo.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportClientEnhanceInfo.java new file mode 100644 index 0000000000..fde111d57c --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportClientEnhanceInfo.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6; + +public class TransportClientEnhanceInfo { + /** + * elasticsearch cluster name + */ + private String clusterName; + + private TransportAddressCache transportAddressCache; + + public String transportAddresses() { + return transportAddressCache.transportAddress(); + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public TransportAddressCache getTransportAddressCache() { + return transportAddressCache; + } + + public void setTransportAddressCache(TransportAddressCache transportAddressCache) { + this.transportAddressCache = transportAddressCache; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/AdapterActionFutureInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/AdapterActionFutureInstrumentation.java new file mode 100644 index 0000000000..af6c92bf7d --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/AdapterActionFutureInstrumentation.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class AdapterActionFutureInstrumentation extends ClassEnhancePluginDefine { + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("actionGet"); + } + + @Override + public String getMethodsInterceptor() { + return "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.AdapterActionFutureActionGetMethodsInterceptor"; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override + public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() { + return new StaticMethodsInterceptPoint[0]; + } + + @Override + protected ClassMatch enhanceClass() { + return byName("org.elasticsearch.action.support.AdapterActionFuture"); + } + + @Override + protected String[] witnessClasses() { + return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES}; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportActionNodeProxyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportActionNodeProxyInstrumentation.java new file mode 100644 index 0000000000..0caf481de7 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportActionNodeProxyInstrumentation.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants; + +import static net.bytebuddy.matcher.ElementMatchers.any; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class TransportActionNodeProxyInstrumentation extends ClassEnhancePluginDefine { + + public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.TransportActionNodeProxyExecuteMethodsInterceptor"; + public static final String ENHANC_CLASS = "org.elasticsearch.action.TransportActionNodeProxy"; + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[]{ + new ConstructorInterceptPoint() { + @Override public ElementMatcher getConstructorMatcher() { + return any(); + } + + @Override public String getConstructorInterceptor() { + return INTERCEPTOR_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("execute"); + } + + @Override + public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override + public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() { + return new StaticMethodsInterceptPoint[0]; + } + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANC_CLASS); + } + + @Override + protected String[] witnessClasses() { + return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES}; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportClientNodesServiceInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportClientNodesServiceInstrumentation.java new file mode 100644 index 0000000000..35040ea62c --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportClientNodesServiceInstrumentation.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class TransportClientNodesServiceInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.TransportClientNodesServiceInterceptor"; + public static final String ADD_TRANSPORT_ADDRESSES_INTERCEPTOR = INTERCEPTOR_CLASS + "$AddTransportAddressesInterceptor"; + public static final String REMOVE_TRANSPORT_ADDRESS_INTERCEPTOR = INTERCEPTOR_CLASS + "$RemoveTransportAddressInterceptor"; + public static final String EXECUTE_INTERCEPTOR = INTERCEPTOR_CLASS + "$ExecuteInterceptor"; + + public static final String ENHANCE_CLASS = "org.elasticsearch.client.transport.TransportClientNodesService"; + + @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[] { + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArgumentWithType(1, "org.elasticsearch.transport.TransportService"); + } + + @Override + public String getConstructorInterceptor() { + return INTERCEPTOR_CLASS; + } + } + }; + } + + @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named("addTransportAddresses"); + } + + @Override public String getMethodsInterceptor() { + return ADD_TRANSPORT_ADDRESSES_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named("removeTransportAddress"); + } + + @Override public String getMethodsInterceptor() { + return REMOVE_TRANSPORT_ADDRESS_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named("execute"); + } + + @Override public String getMethodsInterceptor() { + return EXECUTE_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } + + @Override + protected String[] witnessClasses() { + return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES}; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportServiceInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportServiceInstrumentation.java new file mode 100644 index 0000000000..bd7c0ec98c --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportServiceInstrumentation.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants; + +import static net.bytebuddy.matcher.ElementMatchers.any; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class TransportServiceInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.TransportServiceConInterceptor"; + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[] { + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return any(); + } + + @Override + public String getConstructorInterceptor() { + return ENHANCE_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[0]; + } + + @Override + public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() { + return new StaticMethodsInterceptPoint[0]; + } + + @Override + protected ClassMatch enhanceClass() { + return byName("org.elasticsearch.transport.TransportService"); + } + + @Override + protected String[] witnessClasses() { + return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES}; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptor.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptor.java new file mode 100644 index 0000000000..7a15809c75 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptor.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor; + +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.util.StringUtil; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateResponse; + +import java.lang.reflect.Method; + +import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.ELASTICSEARCH_DSL_LENGTH_THRESHOLD; +import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL; + +public class AdapterActionFutureActionGetMethodsInterceptor implements InstanceMethodsAroundInterceptor { + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + + if (!isTrace(objInst)) { + return; + } + + AbstractSpan span = ContextManager.createLocalSpan(Constants.DB_TYPE + "/" + Constants.BASE_FUTURE_METHOD); + span.setComponent(ComponentsDefine.TRANSPORT_CLIENT); + Tags.DB_TYPE.set(span, Constants.DB_TYPE); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Object ret) throws Throwable { + + if (!isTrace(objInst)) { + return ret; + } + + AbstractSpan span = ContextManager.activeSpan(); + parseResponseInfo((ActionResponse) ret, span); + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } + + private boolean isTrace(EnhancedInstance objInst) { + + return objInst.getSkyWalkingDynamicField() != null && (boolean) objInst.getSkyWalkingDynamicField(); + + } + + private void parseResponseInfo(ActionResponse response, AbstractSpan span) { + // search response + if (response instanceof SearchResponse) { + parseSearchResponse((SearchResponse) response, span); + return; + } + // bulk response + if (response instanceof BulkResponse) { + parseBulkResponse((BulkResponse) response, span); + return; + } + // get response + if (response instanceof GetResponse) { + parseGetResponse((GetResponse) response, span); + return; + } + // index response + if (response instanceof IndexResponse) { + parseIndexResponse((IndexResponse) response, span); + return; + } + // update response + if (response instanceof UpdateResponse) { + parseUpdateResponse((UpdateResponse) response, span); + return; + } + // delete response + if (response instanceof DeleteResponse) { + parseDeleteResponse((DeleteResponse) response, span); + return; + } + } + + private void parseSearchResponse(SearchResponse searchResponse, AbstractSpan span) { + span.tag(Constants.ES_TOOK_MILLIS, Long.toString(searchResponse.getTook().getMillis())); + span.tag(Constants.ES_TOTAL_HITS, Long.toString(searchResponse.getHits().getTotalHits())); + if (TRACE_DSL) { + String tagValue = searchResponse.toString(); + tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue; + Tags.DB_STATEMENT.set(span, tagValue); + } + } + + private void parseBulkResponse(BulkResponse bulkResponse, AbstractSpan span) { + span.tag(Constants.ES_TOOK_MILLIS, Long.toString(bulkResponse.getTook().getMillis())); + span.tag(Constants.ES_INGEST_TOOK_MILLIS, Long.toString(bulkResponse.getIngestTookInMillis())); + if (TRACE_DSL) { + String tagValue = bulkResponse.toString(); + tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue; + Tags.DB_STATEMENT.set(span, tagValue); + } + } + + private void parseGetResponse(GetResponse getResponse, AbstractSpan span) { + if (TRACE_DSL) { + String tagValue = getResponse.toString(); + tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue; + Tags.DB_STATEMENT.set(span, tagValue); + } + } + + private void parseIndexResponse(IndexResponse indexResponse, AbstractSpan span) { + if (TRACE_DSL) { + String tagValue = indexResponse.toString(); + tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue; + Tags.DB_STATEMENT.set(span, tagValue); + } + } + + private void parseUpdateResponse(UpdateResponse updateResponse, AbstractSpan span) { + if (TRACE_DSL) { + String tagValue = updateResponse.toString(); + tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue; + Tags.DB_STATEMENT.set(span, tagValue); + } + } + + private void parseDeleteResponse(DeleteResponse deleteResponse, AbstractSpan span) { + if (TRACE_DSL) { + String tagValue = deleteResponse.toString(); + tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue; + Tags.DB_STATEMENT.set(span, tagValue); + } + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/Constants.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/Constants.java index 0c3c3f6f72..d8dee690e2 100644 --- a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/Constants.java +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/Constants.java @@ -18,6 +18,9 @@ package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor; +import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; + public class Constants { //interceptor class public static final String REST_HIGH_LEVEL_CLIENT_CON_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.RestHighLevelClientConInterceptor"; @@ -33,6 +36,9 @@ public class Constants { public static final String CLUSTER_CLIENT_GET_SETTINGS_METHODS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.ClusterClientGetSettingsMethodsInterceptor"; public static final String CLUSTER_CLIENT_PUT_SETTINGS_METHODS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.ClusterClientPutSettingsMethodsInterceptor"; + //witnessClasses + public static final String TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES = "org.elasticsearch.transport.TaskTransportChannel"; + //es operator name public static final String CREATE_OPERATOR_NAME = "Elasticsearch/CreateRequest"; public static final String DELETE_OPERATOR_NAME = "Elasticsearch/DeleteRequest"; @@ -45,4 +51,14 @@ public class Constants { public static final String CLUSTER_PUT_SETTINGS_NAME = "Elasticsearch/PutSettings"; public static final String DB_TYPE = "Elasticsearch"; + + public static final String BASE_FUTURE_METHOD = "actionGet"; + + //tags + public static final AbstractTag ES_NODE = Tags.ofKey("node.address"); + public static final AbstractTag ES_INDEX = Tags.ofKey("es.indices"); + public static final AbstractTag ES_TYPE = Tags.ofKey("es.types"); + public static final AbstractTag ES_TOOK_MILLIS = Tags.ofKey("es.took_millis"); + public static final AbstractTag ES_TOTAL_HITS = Tags.ofKey("es.total_hits"); + public static final AbstractTag ES_INGEST_TOOK_MILLIS = Tags.ofKey("es.ingest_took_millis"); } diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptor.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptor.java new file mode 100644 index 0000000000..af9d1e6131 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptor.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor; + +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo; +import org.apache.skywalking.apm.util.StringUtil; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; + +import java.lang.reflect.Method; + +import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL; + +public class TransportActionNodeProxyExecuteMethodsInterceptor implements InstanceConstructorInterceptor, InstanceMethodsAroundInterceptor { + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + + TransportClientEnhanceInfo enhanceInfo = (TransportClientEnhanceInfo) objInst.getSkyWalkingDynamicField(); + ActionRequest request = (ActionRequest) allArguments[1]; + String opType = request.getClass().getSimpleName(); + String operationName = Constants.DB_TYPE + "/" + opType; + AbstractSpan span = ContextManager.createExitSpan(operationName, enhanceInfo.transportAddresses()); + span.setComponent(ComponentsDefine.TRANSPORT_CLIENT); + Tags.DB_TYPE.set(span, Constants.DB_TYPE); + Tags.DB_INSTANCE.set(span, enhanceInfo.getClusterName()); + span.tag(Constants.ES_NODE, ((DiscoveryNode) allArguments[0]).getAddress().toString()); + parseRequestInfo(request, span); + + SpanLayer.asDB(span); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } + + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { + EnhancedInstance actions = (EnhancedInstance) allArguments[2]; + objInst.setSkyWalkingDynamicField(actions.getSkyWalkingDynamicField()); + } + + private void parseRequestInfo(ActionRequest request, AbstractSpan span) { + // search request + if (request instanceof SearchRequest) { + parseSearchRequest((SearchRequest) request, span); + return; + } + // get request + if (request instanceof GetRequest) { + parseGetRequest((GetRequest) request, span); + return; + } + // index request + if (request instanceof IndexRequest) { + parseIndexRequest((IndexRequest) request, span); + return; + } + // update request + if (request instanceof UpdateRequest) { + parseUpdateRequest((UpdateRequest) request, span); + return; + } + // delete request + if (request instanceof DeleteRequest) { + parseDeleteRequest((DeleteRequest) request, span); + return; + } + // delete index request + if (request instanceof DeleteIndexRequest) { + parseDeleteIndexRequest((DeleteIndexRequest) request, span); + return; + } + } + + private void parseSearchRequest(SearchRequest searchRequest, AbstractSpan span) { + span.tag(Constants.ES_INDEX, StringUtil.join(',', searchRequest.indices())); + span.tag(Constants.ES_TYPE, StringUtil.join(',', searchRequest.types())); + if (TRACE_DSL) { + Tags.DB_STATEMENT.set(span, searchRequest.toString()); + } + } + + private void parseGetRequest(GetRequest getRequest, AbstractSpan span) { + span.tag(Constants.ES_INDEX, getRequest.index()); + span.tag(Constants.ES_TYPE, getRequest.type()); + if (TRACE_DSL) { + Tags.DB_STATEMENT.set(span, getRequest.toString()); + } + } + + private void parseIndexRequest(IndexRequest indexRequest, AbstractSpan span) { + span.tag(Constants.ES_INDEX, indexRequest.index()); + span.tag(Constants.ES_TYPE, indexRequest.type()); + if (TRACE_DSL) { + Tags.DB_STATEMENT.set(span, indexRequest.toString()); + } + } + + private void parseUpdateRequest(UpdateRequest updateRequest, AbstractSpan span) { + span.tag(Constants.ES_INDEX, updateRequest.index()); + span.tag(Constants.ES_TYPE, updateRequest.type()); + if (TRACE_DSL) { + Tags.DB_STATEMENT.set(span, updateRequest.toString()); + } + } + + private void parseDeleteRequest(DeleteRequest deleteRequest, AbstractSpan span) { + span.tag(Constants.ES_INDEX, deleteRequest.index()); + span.tag(Constants.ES_TYPE, deleteRequest.type()); + if (TRACE_DSL) { + Tags.DB_STATEMENT.set(span, deleteRequest.toString()); + } + } + + private void parseDeleteIndexRequest(DeleteIndexRequest deleteIndexRequest, AbstractSpan span) { + span.tag(Constants.ES_INDEX, String.join(",", deleteIndexRequest.indices())); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportClientNodesServiceInterceptor.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportClientNodesServiceInterceptor.java new file mode 100644 index 0000000000..5c71fc59c3 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportClientNodesServiceInterceptor.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportAddressCache; +import org.elasticsearch.action.support.AdapterActionFuture; +import org.elasticsearch.common.transport.TransportAddress; + +import java.lang.reflect.Method; + +public class TransportClientNodesServiceInterceptor implements InstanceConstructorInterceptor { + + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { + + EnhancedInstance actions = (EnhancedInstance) allArguments[1]; + objInst.setSkyWalkingDynamicField(actions.getSkyWalkingDynamicField()); + } + + public static class AddTransportAddressesInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + TransportClientEnhanceInfo transportClientEnhanceInfo = (TransportClientEnhanceInfo) objInst.getSkyWalkingDynamicField(); + TransportAddressCache transportAddressCache = transportClientEnhanceInfo.getTransportAddressCache(); + if (transportAddressCache == null) { + transportAddressCache = new TransportAddressCache(); + transportClientEnhanceInfo.setTransportAddressCache(transportAddressCache); + } + transportAddressCache.addDiscoveryNode((TransportAddress[]) allArguments[0]); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + + } + } + + public static class RemoveTransportAddressInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + TransportClientEnhanceInfo transportClientEnhanceInfo = (TransportClientEnhanceInfo) objInst.getSkyWalkingDynamicField(); + TransportAddressCache transportAddressCache = transportClientEnhanceInfo.getTransportAddressCache(); + if (transportAddressCache == null) { + transportAddressCache = new TransportAddressCache(); + transportClientEnhanceInfo.setTransportAddressCache(transportAddressCache); + } + transportAddressCache.removeDiscoveryNode((TransportAddress) allArguments[0]); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + + } + } + + public static class ExecuteInterceptor implements InstanceMethodsAroundInterceptor { + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + + // tracking AdapterActionFuture.actionGet + if (allArguments.length >= 2 && allArguments[1] instanceof AdapterActionFuture) { + AdapterActionFuture actionFuture = (AdapterActionFuture) allArguments[1]; + ((EnhancedInstance) actionFuture).setSkyWalkingDynamicField(true); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + return null; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + + } + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptor.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptor.java new file mode 100644 index 0000000000..e4d0da50ea --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportAddressCache; +import org.elasticsearch.common.settings.Settings; + +public class TransportServiceConInterceptor implements InstanceConstructorInterceptor { + + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { + Settings settings = (Settings) allArguments[0]; + String clusterName = settings.get("cluster.name"); + + TransportClientEnhanceInfo transportClientEnhanceInfo = new TransportClientEnhanceInfo(); + transportClientEnhanceInfo.setClusterName(clusterName); + transportClientEnhanceInfo.setTransportAddressCache(new TransportAddressCache()); + + objInst.setSkyWalkingDynamicField(transportClientEnhanceInfo); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/resources/skywalking-plugin.def index 9752c2205c..0a40517606 100644 --- a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/resources/skywalking-plugin.def +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/resources/skywalking-plugin.def @@ -17,3 +17,7 @@ elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.RestHighLevelClientInstrumentation elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.IndicesClientInstrumentation elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.ClusterClientInstrumentation +elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.AdapterActionFutureInstrumentation +elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.TransportServiceInstrumentation +elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.TransportActionNodeProxyInstrumentation +elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.TransportClientNodesServiceInstrumentation diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptorTest.java new file mode 100644 index 0000000000..3754c41bd8 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptorTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor; + +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.LocalSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.context.util.TagValuePair; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.helper.SpanHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHits; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import java.util.List; + +import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL; +import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.TRANSPORT_CLIENT; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class AdapterActionFutureActionGetMethodsInterceptorTest { + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + @Mock + private EnhancedInstance enhancedInstance; + + @Mock + private SearchResponse searchResponse; + + @Mock + private BulkResponse bulkItemResponses; + + private SearchHits searchHits; + + @Mock + private AdapterActionFutureActionGetMethodsInterceptor interceptor; + + @Before + public void setUp() { + + searchHits = new SearchHits(null, 309L, 0); + + when(searchResponse.getTook()).thenReturn(TimeValue.timeValueMillis(2020)); + when(searchResponse.getHits()).thenReturn(searchHits); + + when(bulkItemResponses.getTook()).thenReturn(TimeValue.timeValueMillis(2020)); + when(bulkItemResponses.getIngestTookInMillis()).thenReturn(1416L); + + when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(true); + + interceptor = new AdapterActionFutureActionGetMethodsInterceptor(); + } + + @Test + public void testMethodsAround() throws Throwable { + TRACE_DSL = true; + interceptor.beforeMethod(enhancedInstance, null, null, null, null); + interceptor.afterMethod(enhancedInstance, null, null, null, searchResponse); + + List traceSegmentList = segmentStorage.getTraceSegments(); + Assert.assertThat(traceSegmentList.size(), is(1)); + TraceSegment traceSegment = traceSegmentList.get(0); + + AbstractTracingSpan getSpan = SegmentHelper.getSpans(traceSegment).get(0); + assertGetSpan(getSpan, searchResponse); + } + + @Test + public void testMethodsAround2() throws Throwable { + TRACE_DSL = true; + interceptor.beforeMethod(enhancedInstance, null, null, null, null); + interceptor.afterMethod(enhancedInstance, null, null, null, bulkItemResponses); + + List traceSegmentList = segmentStorage.getTraceSegments(); + Assert.assertThat(traceSegmentList.size(), is(1)); + TraceSegment traceSegment = traceSegmentList.get(0); + + AbstractTracingSpan getSpan = SegmentHelper.getSpans(traceSegment).get(0); + assertGetSpan(getSpan, bulkItemResponses); + } + + private void assertGetSpan(AbstractTracingSpan getSpan, Object ret) { + assertThat(getSpan instanceof LocalSpan, is(true)); + + LocalSpan span = (LocalSpan) getSpan; + assertThat(span.getOperationName(), is("Elasticsearch/actionGet")); + assertThat(SpanHelper.getComponentId(span), is(TRANSPORT_CLIENT.getId())); + + List tags = SpanHelper.getTags(span); + assertThat(tags.size(), is(4)); + if (ret instanceof SearchResponse) { + assertThat(tags.get(0).getValue(), is("Elasticsearch")); + assertThat(tags.get(1).getValue(), is("2020")); + assertThat(tags.get(2).getValue(), is("309")); + } else if (ret instanceof BulkResponse) { + assertThat(tags.get(0).getValue(), is("Elasticsearch")); + assertThat(tags.get(1).getValue(), is("2020")); + assertThat(tags.get(2).getValue(), is("1416")); + } + + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptorTest.java new file mode 100644 index 0000000000..68c1f28c59 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptorTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor; + +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.context.util.TagValuePair; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.helper.SpanHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.transport.TransportAddress; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import java.net.InetSocketAddress; +import java.util.List; + +import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL; +import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.TRANSPORT_CLIENT; +import static org.junit.Assert.assertThat; +import static org.hamcrest.CoreMatchers.is; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class TransportActionNodeProxyExecuteMethodsInterceptorTest { + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + @Mock + private EnhancedInstance enhancedInstance; + + @Mock + private DiscoveryNode discoveryNode; + + @Mock + private GetRequest getRequest; + + @Mock + private IndexRequest indexRequest; + + @Mock + private UpdateRequest updateRequest; + + @Mock + private DeleteRequest deleteRequest; + + @Mock + private DeleteIndexRequest deleteIndexRequest; + + @Mock + private TransportClientEnhanceInfo enhanceInfo; + + private TransportActionNodeProxyExecuteMethodsInterceptor interceptor; + + @Before + public void setUp() { + + InetSocketAddress inetSocketAddress = new InetSocketAddress("122.122.122.122", 9300); + TransportAddress transportAddress = new TransportAddress(inetSocketAddress); + when(discoveryNode.getAddress()).thenReturn(transportAddress); + + when(enhanceInfo.transportAddresses()).thenReturn("122.122.122.122:9300"); + when(enhanceInfo.getClusterName()).thenReturn("skywalking-es"); + when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo); + + when(getRequest.index()).thenReturn("endpoint"); + when(getRequest.type()).thenReturn("getType"); + + when(indexRequest.index()).thenReturn("endpoint"); + when(indexRequest.type()).thenReturn("indexType"); + + when(updateRequest.index()).thenReturn("endpoint"); + when(updateRequest.type()).thenReturn("updateType"); + + when(deleteRequest.index()).thenReturn("endpoint"); + when(deleteRequest.type()).thenReturn("deleteType"); + + when(deleteIndexRequest.indices()).thenReturn(new String[]{"endpoint"}); + + interceptor = new TransportActionNodeProxyExecuteMethodsInterceptor(); + } + + @Test + public void testConstruct() { + + final EnhancedInstance objInst1 = new EnhancedInstance() { + private Object object = null; + + @Override + public Object getSkyWalkingDynamicField() { + return object; + } + + @Override + public void setSkyWalkingDynamicField(Object value) { + this.object = value; + } + }; + + final EnhancedInstance objInst2 = new EnhancedInstance() { + private Object object = null; + + @Override + public Object getSkyWalkingDynamicField() { + return object; + } + + @Override + public void setSkyWalkingDynamicField(Object value) { + this.object = value; + } + }; + + objInst1.setSkyWalkingDynamicField(123); + Object[] allArguments = new Object[]{null, null, objInst1}; + + interceptor.onConstruct(objInst2, allArguments); + assertThat(objInst1.getSkyWalkingDynamicField(), is(objInst2.getSkyWalkingDynamicField())); + } + + @Test + public void testGetRequest() throws Throwable { + + AbstractTracingSpan getSpan = getSpan(getRequest); + assertGetSpan(getSpan, getRequest); + } + + @Test + public void testIndexRequest() throws Throwable { + + AbstractTracingSpan getSpan = getSpan(indexRequest); + assertGetSpan(getSpan, indexRequest); + } + + @Test + public void testUpdateRequest() throws Throwable { + + AbstractTracingSpan getSpan = getSpan(updateRequest); + assertGetSpan(getSpan, updateRequest); + } + + @Test + public void testDeleteRequest() throws Throwable { + + AbstractTracingSpan getSpan = getSpan(deleteRequest); + assertGetSpan(getSpan, deleteRequest); + } + + @Test + public void testDeleteIndexRequest() throws Throwable { + + AbstractTracingSpan getSpan = getSpan(deleteIndexRequest); + assertGetSpan(getSpan, deleteIndexRequest); + } + + private AbstractTracingSpan getSpan(ActionRequest actionRequest) throws Throwable { + TRACE_DSL = true; + Object[] allArguments = new Object[]{discoveryNode, actionRequest}; + + interceptor.beforeMethod(enhancedInstance, null, allArguments, null, null); + interceptor.afterMethod(enhancedInstance, null, allArguments, null, null); + + List traceSegmentList = segmentStorage.getTraceSegments(); + Assert.assertThat(traceSegmentList.size(), is(1)); + TraceSegment traceSegment = traceSegmentList.get(0); + return SegmentHelper.getSpans(traceSegment).get(0); + + } + + private void assertGetSpan(AbstractTracingSpan getSpan, Object ret) { + assertThat(getSpan instanceof ExitSpan, is(true)); + + ExitSpan span = (ExitSpan) getSpan; + assertThat(SpanHelper.getComponentId(span), is(TRANSPORT_CLIENT.getId())); + + List tags = SpanHelper.getTags(span); + assertThat(tags.get(0).getValue(), is("Elasticsearch")); + assertThat(tags.get(1).getValue(), is("skywalking-es")); + assertThat(tags.get(2).getValue(), is("122.122.122.122:9300")); + if (ret instanceof SearchRequest) { + assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/SearchRequest")); + assertThat(tags.get(3).getValue(), is("endpoint")); + assertThat(tags.get(4).getValue(), is("searchType")); + } else if (ret instanceof GetRequest) { + assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/GetRequest")); + assertThat(tags.get(3).getValue(), is("endpoint")); + assertThat(tags.get(4).getValue(), is("getType")); + } else if (ret instanceof IndexRequest) { + assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/IndexRequest")); + assertThat(tags.get(3).getValue(), is("endpoint")); + assertThat(tags.get(4).getValue(), is("indexType")); + } else if (ret instanceof UpdateRequest) { + assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/UpdateRequest")); + assertThat(tags.get(3).getValue(), is("endpoint")); + assertThat(tags.get(4).getValue(), is("updateType")); + } else if (ret instanceof DeleteRequest) { + assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/DeleteRequest")); + assertThat(tags.get(3).getValue(), is("endpoint")); + assertThat(tags.get(4).getValue(), is("deleteType")); + } else if (ret instanceof DeleteIndexRequest) { + assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/DeleteIndexRequest")); + assertThat(tags.get(3).getValue(), is("endpoint")); + } + + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportAddressCacheTest.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportAddressCacheTest.java new file mode 100644 index 0000000000..ed61c3fb02 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportAddressCacheTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor; + +import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportAddressCache; +import org.elasticsearch.common.transport.TransportAddress; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class TransportAddressCacheTest { + + private TransportAddressCache transportAddressCache; + + @Before + public void setUp() { + transportAddressCache = new TransportAddressCache(); + } + + @Test + public void transportAddressTest() + throws UnknownHostException { + + transportAddressCache.addDiscoveryNode( + new TransportAddress(InetAddress.getByName("172.1.1.1"), 9300), + new TransportAddress(InetAddress.getByName("172.1.1.2"), 9200), + new TransportAddress(InetAddress.getByName("172.1.1.3"), 9100) + ); + + assertThat(transportAddressCache.transportAddress(), is("172.1.1.1:9300,172.1.1.2:9200,172.1.1.3:9100")); + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptorTest.java new file mode 100644 index 0000000000..96346825a2 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptorTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo; +import org.elasticsearch.common.settings.Settings; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +@PrepareForTest(value = { + Settings.class +}) +public class TransportServiceConInterceptorTest { + + @Mock + private Settings settings; + + private Object[] allArguments; + + private TransportServiceConInterceptor transportServiceConInterceptor; + + @Before + public void setUp() { + when(settings.get("cluster.name")).thenReturn("my.es.cluster"); + + allArguments = new Object[]{settings}; + } + + @Test + public void testConstruct() { + + final EnhancedInstance objInst = new EnhancedInstance() { + private Object object = null; + + @Override + public Object getSkyWalkingDynamicField() { + return object; + } + + @Override + public void setSkyWalkingDynamicField(Object value) { + this.object = value; + } + }; + + transportServiceConInterceptor = new TransportServiceConInterceptor(); + transportServiceConInterceptor.onConstruct(objInst, allArguments); + + assertThat(objInst.getSkyWalkingDynamicField() instanceof TransportClientEnhanceInfo, is(true)); + assertThat(((TransportClientEnhanceInfo) (objInst.getSkyWalkingDynamicField())).getClusterName(), is("my.es.cluster")); + } + +} diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md index 1468fe3dd3..735f185401 100644 --- a/docs/en/setup/service-agent/java-agent/Supported-list.md +++ b/docs/en/setup/service-agent/java-agent/Supported-list.md @@ -59,7 +59,8 @@ * [Spymemcached](https://github.com/couchbase/spymemcached) 2.x * [Xmemcached](https://github.com/killme2008/xmemcached) 2.x * [Elasticsearch](https://github.com/elastic/elasticsearch) - * [transport-client](https://github.com/elastic/elasticsearch/tree/master/client/transport) 5.2.x-5.6.x + * [transport-client](https://github.com/elastic/elasticsearch/tree/v5.2.0/client/transport) 5.2.x-5.6.x + * [transport-client](https://github.com/elastic/elasticsearch/tree/v6.7.1/client/transport) 6.7.1-6.8.4 * [rest-high-level-client](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.7/index.html) 6.7.1-6.8.4 * [rest-high-level-client](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.0/java-rest-high.html) 7.0.0-7.5.2 * [Solr](https://github.com/apache/lucene-solr/) diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/config/expectedData.yaml b/test/plugin/scenarios/elasticsearch-6.x-scenario/config/expectedData.yaml index 9a2c16b272..745377f5a1 100644 --- a/test/plugin/scenarios/elasticsearch-6.x-scenario/config/expectedData.yaml +++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/config/expectedData.yaml @@ -128,6 +128,140 @@ segmentItems: tags: - {key: db.type, value: Elasticsearch} - {key: db.instance, value: not null} + - operationName: Elasticsearch/IndexRequest + operationId: 0 + parentSpanId: 0 + spanId: 7 + spanLayer: Database + startTime: nq 0 + endTime: nq 0 + componentId: 48 + componentName: '' + isError: false + spanType: Exit + peer: not null + peerId: 0 + tags: + - {key: db.type, value: Elasticsearch} + - {key: db.instance, value: not null} + - {key: node.address, value: not null} + - {key: es.indices, value: not null} + - {key: es.types, value: not null} + - {key: db.statement, value: not null} + - operationName: Elasticsearch/actionGet + operationId: 0 + parentSpanId: 0 + spanId: 8 + spanLayer: null + startTime: nq 0 + endTime: nq 0 + componentId: 48 + componentName: '' + isError: false + spanType: Local + peer: null + peerId: 0 + tags: + - {key: db.type, value: Elasticsearch} + - {key: db.statement, value: not null} + - operationName: Elasticsearch/GetRequest + operationId: 0 + parentSpanId: 0 + spanId: 9 + spanLayer: Database + startTime: nq 0 + endTime: nq 0 + componentId: 48 + componentName: '' + isError: false + spanType: Exit + peer: not null + peerId: 0 + tags: + - {key: db.type, value: Elasticsearch} + - {key: db.instance, value: not null} + - {key: node.address, value: not null} + - {key: es.indices, value: not null} + - {key: es.types, value: not null} + - {key: db.statement, value: not null} + - operationName: Elasticsearch/SearchRequest + operationId: 0 + parentSpanId: 0 + spanId: 10 + spanLayer: Database + startTime: nq 0 + endTime: nq 0 + componentId: 48 + componentName: '' + isError: false + spanType: Exit + peer: not null + peerId: 0 + tags: + - {key: db.type, value: Elasticsearch} + - {key: db.instance, value: not null} + - {key: node.address, value: not null} + - {key: es.indices, value: not null} + - {key: es.types, value: not null} + - {key: db.statement, value: not null} + - operationName: Elasticsearch/UpdateRequest + operationId: 0 + parentSpanId: 0 + spanId: 11 + spanLayer: Database + startTime: nq 0 + endTime: nq 0 + componentId: 48 + componentName: '' + isError: false + spanType: Exit + peer: not null + peerId: 0 + tags: + - {key: db.type, value: Elasticsearch} + - {key: db.instance, value: not null} + - {key: node.address, value: not null} + - {key: es.indices, value: not null} + - {key: es.types, value: not null} + - {key: db.statement, value: not null} + - operationName: Elasticsearch/DeleteRequest + operationId: 0 + parentSpanId: 0 + spanId: 12 + spanLayer: Database + startTime: nq 0 + endTime: nq 0 + componentId: 48 + componentName: '' + isError: false + spanType: Exit + peer: not null + peerId: 0 + tags: + - {key: db.type, value: Elasticsearch} + - {key: db.instance, value: not null} + - {key: node.address, value: not null} + - {key: es.indices, value: not null} + - {key: es.types, value: not null} + - {key: db.statement, value: not null} + - operationName: Elasticsearch/DeleteIndexRequest + operationId: 0 + parentSpanId: 0 + spanId: 13 + spanLayer: Database + startTime: nq 0 + endTime: nq 0 + componentId: 48 + componentName: '' + isError: false + spanType: Exit + peer: not null + peerId: 0 + tags: + - {key: db.type, value: Elasticsearch} + - {key: db.instance, value: not null} + - {key: node.address, value: not null} + - {key: es.indices, value: not null} - operationName: /elasticsearch-case/case/elasticsearch operationId: 0 parentSpanId: -1 @@ -143,4 +277,4 @@ segmentItems: peerId: 0 tags: - {key: url, value: 'http://localhost:8080/elasticsearch-case/case/elasticsearch'} - - {key: http.method, value: GET} + - {key: http.method, value: GET} \ No newline at end of file diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/pom.xml b/test/plugin/scenarios/elasticsearch-6.x-scenario/pom.xml index f1fb3377b9..eb08d29f44 100644 --- a/test/plugin/scenarios/elasticsearch-6.x-scenario/pom.xml +++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/pom.xml @@ -31,7 +31,7 @@ UTF-8 1.8 - 6.7.1 + 6.8.6 ${test.framework.version} 2.1.4.RELEASE @@ -56,6 +56,12 @@ spring-boot-starter-web ${spring-boot.version} + + + org.elasticsearch.client + transport + ${test.framework.version} + org.elasticsearch.client diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/RestHighLevelClientCase.java b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/RestHighLevelClientCase.java new file mode 100644 index 0000000000..008261b010 --- /dev/null +++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/RestHighLevelClientCase.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.testcase.elasticsearch; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; + +import static java.util.Collections.singletonMap; + +@Component +public class RestHighLevelClientCase { + + private static Logger logger = LogManager.getLogger(RestHighLevelClientCase.class); + + @Autowired + private RestHighLevelClient client; + + public boolean healthcheck() throws Exception { + ClusterHealthRequest request = new ClusterHealthRequest(); + request.timeout(TimeValue.timeValueSeconds(10)); + request.waitForStatus(ClusterHealthStatus.GREEN); + + ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); + if (response.isTimedOut()) { + String message = "elastic search node start fail!"; + logger.error(message); + throw new RuntimeException(message); + } + return true; + } + + public boolean elasticsearch() throws Exception { + String indexName = UUID.randomUUID().toString(); + try { + //create + createIndex(client, indexName); + // index + index(client, indexName); + + client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT); + + //get + get(client, indexName); + // search + search(client, indexName); + // update + update(client, indexName); + // delete + delete(client, indexName); + } finally { + if (null != client) { + client.close(); + } + } + return true; + } + + private void createIndex(RestHighLevelClient client, String indexName) throws IOException { + CreateIndexRequest request = new CreateIndexRequest(indexName); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + { + builder.startObject("properties"); + { + builder.startObject("author"); + { + builder.field("type", "keyword"); + } + builder.endObject(); + builder.startObject("title"); + { + builder.field("type", "keyword"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + request.mapping(builder); + + request.settings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)); + + CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT); + if (createIndexResponse.isAcknowledged() == false) { + String message = "elasticsearch create index fail."; + logger.error(message); + throw new RuntimeException(message); + } + } + + private void index(RestHighLevelClient client, String indexName) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + { + builder.field("author", "Marker"); + builder.field("title", "Java programing."); + } + builder.endObject(); + IndexRequest indexRequest = new IndexRequest(indexName, "_doc", "1").source(builder); + + IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); + if (indexResponse.status().getStatus() >= 400) { + String message = "elasticsearch index data fail."; + logger.error(message); + throw new RuntimeException(message); + } + } + + private void get(RestHighLevelClient client, String indexName) throws IOException { + GetRequest getRequest = new GetRequest(indexName, "_doc", "1"); + GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT); + + if (!getResponse.isExists()) { + String message = "elasticsearch get data fail."; + logger.error(message); + throw new RuntimeException(message); + } + } + + private void update(RestHighLevelClient client, String indexName) throws IOException { + UpdateRequest request = new UpdateRequest(indexName, "_doc", "1"); + Map parameters = singletonMap("title", "c++ programing."); + Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.title = params.title", parameters); + request.script(inline); + + UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); + if (updateResponse.getVersion() != 2) { + String message = "elasticsearch update data fail."; + logger.error(message); + throw new RuntimeException(message); + } + } + + private void delete(RestHighLevelClient client, String indexName) throws IOException { + DeleteIndexRequest request = new DeleteIndexRequest(indexName); + AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT); + if (!deleteIndexResponse.isAcknowledged()) { + String message = "elasticsearch delete index fail."; + logger.error(message); + throw new RuntimeException(message); + } + } + + private void search(RestHighLevelClient client, String indexName) throws IOException { + + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.query(QueryBuilders.termQuery("author", "Marker")); + sourceBuilder.from(0); + sourceBuilder.size(10); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(indexName); + searchRequest.source(sourceBuilder); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + if (!(searchResponse.getHits().totalHits > 0)) { + String message = "elasticsearch search data fail."; + logger.error(message); + throw new RuntimeException(message); + } + } +} diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/TransportClientCase.java b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/TransportClientCase.java new file mode 100644 index 0000000000..9e2e6b5b0f --- /dev/null +++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/TransportClientCase.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.testcase.elasticsearch; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.UUID; + +@Component +public class TransportClientCase { + + private static Logger logger = LogManager.getLogger(TransportClientCase.class); + + @Autowired + private TransportClient client; + + public boolean elasticsearch() throws Exception { + String indexName = UUID.randomUUID().toString(); + try { + // create + index(client, indexName); + // get + get(client, indexName); + // search + search(client, indexName); + // update + update(client, indexName); + // delete + delete(client, indexName); + // remove index + client.admin().indices().prepareDelete(indexName).execute(); + } finally { + if (null != client) { + client.close(); + } + } + return true; + } + + private void index(Client client, String indexName) throws IOException { + try { + client.prepareIndex(indexName, "test", "1") + .setSource(XContentFactory.jsonBuilder() + .startObject() + .field("name", "mysql innodb") + .field("price", "0") + .field("language", "chinese") + .endObject()) + .get(); + } catch (IOException e) { + logger.error("index document error.", e); + throw e; + } + } + + private void get(Client client, String indexName) { + client.prepareGet().setIndex(indexName).setId("1").execute(); + } + + private void update(Client client, String indexName) throws IOException { + try { + client.prepareUpdate(indexName, "test", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("price", "9.9").endObject()) + .execute(); + } catch (IOException e) { + logger.error("update document error.", e); + throw e; + } + } + + private void delete(Client client, String indexName) { + client.prepareDelete(indexName, "test", "1").execute(); + } + + private void search(Client client, String indexName) { + client.prepareSearch(indexName).setTypes("test").setSize(10).execute(); + } +} diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/config/TransportClientConfig.java b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/config/TransportClientConfig.java new file mode 100644 index 0000000000..727a3edb19 --- /dev/null +++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/config/TransportClientConfig.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.testcase.elasticsearch.config; + +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.net.InetAddress; + +@Configuration +public class TransportClientConfig { + + @Value("${elasticsearch.server}") + private String elasticsearchHost; + + public final static Integer PORT = 9300; //端口 + + @Bean + public TransportClient getESClientConnection() + throws Exception { + + TransportClient client = null; + Settings settings = Settings.builder() + .put("cluster.name", "docker-node") + .put("client.transport.sniff", false) + .build(); + + client = new PreBuiltTransportClient(settings); + for (TransportAddress i : parseEsHost()) { + client.addTransportAddress(i); + } + return client; + } + + private TransportAddress[] parseEsHost() + throws Exception { + TransportAddress[] transportAddresses = null; + if (!elasticsearchHost.isEmpty()) { + String[] hostIp = elasticsearchHost.split(","); + transportAddresses = new TransportAddress[hostIp.length]; + + for (int i = 0; i < hostIp.length; ++i) { + String[] hostIpItem = hostIp[i].split(":"); + String ip = hostIpItem[0].trim(); + String port = hostIpItem[1].trim(); + transportAddresses[i] = new TransportAddress(InetAddress.getByName(ip), PORT); + } + } + return transportAddresses; + } +} diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java index d015a07f15..e3928c7341 100644 --- a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java +++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java @@ -18,44 +18,15 @@ package org.apache.skywalking.apm.testcase.elasticsearch.controller; -import java.io.IOException; -import java.util.Map; -import java.util.UUID; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.indices.CreateIndexRequest; -import org.elasticsearch.client.indices.CreateIndexResponse; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptType; -import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.apache.skywalking.apm.testcase.elasticsearch.RestHighLevelClientCase; +import org.apache.skywalking.apm.testcase.elasticsearch.TransportClientCase; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import static java.util.Collections.singletonMap; - @RestController @RequestMapping("/elasticsearch-case/case") public class CaseController { @@ -63,153 +34,24 @@ public class CaseController { private static Logger logger = LogManager.getLogger(CaseController.class); @Autowired - private RestHighLevelClient client; + private RestHighLevelClientCase restHighLevelClientCase; + + @Autowired + private TransportClientCase transportClientCase; @GetMapping("/healthcheck") public String healthcheck() throws Exception { - ClusterHealthRequest request = new ClusterHealthRequest(); - request.timeout(TimeValue.timeValueSeconds(10)); - request.waitForStatus(ClusterHealthStatus.GREEN); - - ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); - if (response.isTimedOut()) { - String message = "elastic search node start fail!"; - logger.error(message); - throw new RuntimeException(message); - } + restHighLevelClientCase.healthcheck(); return "Success"; } @GetMapping("/elasticsearch") public String elasticsearch() throws Exception { - String indexName = UUID.randomUUID().toString(); - try { - //create - createIndex(client, indexName); - // index - index(client, indexName); - - client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT); + restHighLevelClientCase.elasticsearch(); + transportClientCase.elasticsearch(); - //get - get(client, indexName); - // search - search(client, indexName); - // update - update(client, indexName); - // delete - delete(client, indexName); - } finally { - if (null != client) { - client.close(); - } - } return "Success"; } - private void createIndex(RestHighLevelClient client, String indexName) throws IOException { - CreateIndexRequest request = new CreateIndexRequest(indexName); - - XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.startObject(); - { - builder.startObject("properties"); - { - builder.startObject("author"); - { - builder.field("type", "keyword"); - } - builder.endObject(); - builder.startObject("title"); - { - builder.field("type", "keyword"); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - request.mapping(builder); - - request.settings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)); - - CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT); - if (createIndexResponse.isAcknowledged() == false) { - String message = "elasticsearch create index fail."; - logger.error(message); - throw new RuntimeException(message); - } - } - - private void index(RestHighLevelClient client, String indexName) throws IOException { - XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.startObject(); - { - builder.field("author", "Marker"); - builder.field("title", "Java programing."); - } - builder.endObject(); - IndexRequest indexRequest = new IndexRequest(indexName, "_doc", "1").source(builder); - - IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); - if (indexResponse.status().getStatus() >= 400) { - String message = "elasticsearch index data fail."; - logger.error(message); - throw new RuntimeException(message); - } - } - - private void get(RestHighLevelClient client, String indexName) throws IOException { - GetRequest getRequest = new GetRequest(indexName, "_doc", "1"); - GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT); - - if (!getResponse.isExists()) { - String message = "elasticsearch get data fail."; - logger.error(message); - throw new RuntimeException(message); - } - } - - private void update(RestHighLevelClient client, String indexName) throws IOException { - UpdateRequest request = new UpdateRequest(indexName, "_doc", "1"); - Map parameters = singletonMap("title", "c++ programing."); - Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.title = params.title", parameters); - request.script(inline); - - UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); - if (updateResponse.getVersion() != 2) { - String message = "elasticsearch update data fail."; - logger.error(message); - throw new RuntimeException(message); - } - } - - private void delete(RestHighLevelClient client, String indexName) throws IOException { - DeleteIndexRequest request = new DeleteIndexRequest(indexName); - AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT); - if (!deleteIndexResponse.isAcknowledged()) { - String message = "elasticsearch delete index fail."; - logger.error(message); - throw new RuntimeException(message); - } - } - - private void search(RestHighLevelClient client, String indexName) throws IOException { - - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - sourceBuilder.query(QueryBuilders.termQuery("author", "Marker")); - sourceBuilder.from(0); - sourceBuilder.size(10); - - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(indexName); - searchRequest.source(sourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - if (!(searchResponse.getHits().totalHits > 0)) { - String message = "elasticsearch search data fail."; - logger.error(message); - throw new RuntimeException(message); - } - } } -- GitLab