未验证 提交 1bc40828 编写于 作者: Indifer's avatar Indifer 提交者: GitHub

fix elasticsearch-5.x-plugin when use es6.x TransportClient error (#4517)

* up ui

* up ui

* fix elasticsearch-5.x-plugin on es6.x TransportClient error, Found class org.elasticsearch.common.transport.TransportAddress, but interface was expected
up elasticsearch-6.x-plugin support TransportClient

* fix elasticsearch-5.x-plugin on es6.x TransportClient error, Found class org.elasticsearch.common.transport.TransportAddress, but interface was expected
up elasticsearch-6.x-plugin support TransportClient

* fix elasticsearch-5.x-plugin on es6.x TransportClient error, Found class org.elasticsearch.common.transport.TransportAddress, but interface was expected
up elasticsearch-6.x-plugin support TransportClient

* const string witnesses
add some unit test
recommended to change ; to ,

* const string witnesses
add some unit test
recommended to change ; to ,

* up ES6.X test-plugin

* up es6.x test-plugin

* up es6.x test-plugin

* up es6.x test-plugin

* add DeleteIndexRequest

* up es6.x test-plugin

* remove TransportCaseController.java

* fix TransportActionNodeProxyExecuteMethodsInterceptorTest
Co-authored-by: Nyi.liang <yi.liang@zhangmen.cn>
Co-authored-by: wu-sheng's avatar吴晟 Wu Sheng <wu.sheng@foxmail.com>
Co-authored-by: N梁懿 <liangyi@xforceplus.com>
Co-authored-by: Naderm <394102339@qq.com>
上级 20309744
......@@ -256,6 +256,8 @@ public class Config {
* If true, trace all the DSL(Domain Specific Language) in ElasticSearch access, default is false.
*/
public static boolean TRACE_DSL = false;
public static int ELASTICSEARCH_DSL_LENGTH_THRESHOLD = 1024;
}
public static class Customize {
......
......@@ -21,7 +21,9 @@ package org.apache.skywalking.apm.plugin.elasticsearch.v5;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
class Constants {
public class Constants {
public static final String INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS = "org.elasticsearch.common.transport.InetSocketTransportAddress";
static final String DB_TYPE = "Elasticsearch";
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
......@@ -60,4 +61,10 @@ public class GenericActionInstrumentation extends ClassEnhancePluginDefine {
protected ClassMatch enhanceClass() {
return byHierarchyMatch(new String[] {"org.elasticsearch.action.GenericAction"});
}
@Override
protected String[] witnessClasses() {
return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
}
}
......@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
......@@ -67,4 +68,10 @@ public class PlainListenableActionFutureInstrumentation extends ClassEnhancePlug
protected ClassMatch enhanceClass() {
return byName("org.elasticsearch.action.support.PlainListenableActionFuture");
}
@Override
protected String[] witnessClasses() {
return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
}
}
......@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static net.bytebuddy.matcher.ElementMatchers.named;
......@@ -83,4 +84,10 @@ public class TransportActionNodeProxyInstrumentation extends ClassEnhancePluginD
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
protected String[] witnessClasses() {
return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
}
}
......@@ -23,6 +23,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterc
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
......@@ -80,4 +81,10 @@ public class TransportClientNodesServiceInstrumentation extends ClassInstanceMet
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
@Override
protected String[] witnessClasses() {
return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
}
}
......@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
......@@ -64,4 +65,10 @@ public class TransportProxyClientInstrumentation extends ClassInstanceMethodsEnh
protected ClassMatch enhanceClass() {
return byName("org.elasticsearch.client.transport.TransportProxyClient");
}
@Override
protected String[] witnessClasses() {
return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6;
import org.elasticsearch.common.transport.TransportAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class TransportAddressCache {
private List<TransportAddress> transportAddresses = new ArrayList<TransportAddress>();
private String transportAddressesStr = "";
public synchronized void addDiscoveryNode(TransportAddress... transportAddress) {
transportAddresses.addAll(Arrays.asList(transportAddress));
transportAddressesStr = format();
}
public synchronized void removeDiscoveryNode(TransportAddress transportAddress) {
List<TransportAddress> nodesBuilder = new ArrayList<TransportAddress>();
for (TransportAddress otherNode : transportAddresses) {
if (!otherNode.getAddress().equals(transportAddress.getAddress())) {
nodesBuilder.add(otherNode);
}
}
transportAddresses = nodesBuilder;
transportAddressesStr = format();
}
private String format() {
return String.join(","
, transportAddresses.stream()
.map(x -> String.format("%s:%s", x.getAddress(), x.getPort()))
.collect(Collectors.toList())
);
}
public String transportAddress() {
return transportAddressesStr;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6;
public class TransportClientEnhanceInfo {
/**
* elasticsearch cluster name
*/
private String clusterName;
private TransportAddressCache transportAddressCache;
public String transportAddresses() {
return transportAddressCache.transportAddress();
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public TransportAddressCache getTransportAddressCache() {
return transportAddressCache;
}
public void setTransportAddressCache(TransportAddressCache transportAddressCache) {
this.transportAddressCache = transportAddressCache;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class AdapterActionFutureInstrumentation extends ClassEnhancePluginDefine {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("actionGet");
}
@Override
public String getMethodsInterceptor() {
return "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.AdapterActionFutureActionGetMethodsInterceptor";
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() {
return byName("org.elasticsearch.action.support.AdapterActionFuture");
}
@Override
protected String[] witnessClasses() {
return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class TransportActionNodeProxyInstrumentation extends ClassEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.TransportActionNodeProxyExecuteMethodsInterceptor";
public static final String ENHANC_CLASS = "org.elasticsearch.action.TransportActionNodeProxy";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
@Override public String getConstructorInterceptor() {
return INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("execute");
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
@Override
protected String[] witnessClasses() {
return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class TransportClientNodesServiceInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.TransportClientNodesServiceInterceptor";
public static final String ADD_TRANSPORT_ADDRESSES_INTERCEPTOR = INTERCEPTOR_CLASS + "$AddTransportAddressesInterceptor";
public static final String REMOVE_TRANSPORT_ADDRESS_INTERCEPTOR = INTERCEPTOR_CLASS + "$RemoveTransportAddressInterceptor";
public static final String EXECUTE_INTERCEPTOR = INTERCEPTOR_CLASS + "$ExecuteInterceptor";
public static final String ENHANCE_CLASS = "org.elasticsearch.client.transport.TransportClientNodesService";
@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(1, "org.elasticsearch.transport.TransportService");
}
@Override
public String getConstructorInterceptor() {
return INTERCEPTOR_CLASS;
}
}
};
}
@Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("addTransportAddresses");
}
@Override public String getMethodsInterceptor() {
return ADD_TRANSPORT_ADDRESSES_INTERCEPTOR;
}
@Override public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("removeTransportAddress");
}
@Override public String getMethodsInterceptor() {
return REMOVE_TRANSPORT_ADDRESS_INTERCEPTOR;
}
@Override public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("execute");
}
@Override public String getMethodsInterceptor() {
return EXECUTE_INTERCEPTOR;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
@Override
protected String[] witnessClasses() {
return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class TransportServiceInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.TransportServiceConInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
@Override
public String getConstructorInterceptor() {
return ENHANCE_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() {
return byName("org.elasticsearch.transport.TransportService");
}
@Override
protected String[] witnessClasses() {
return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.util.StringUtil;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateResponse;
import java.lang.reflect.Method;
import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.ELASTICSEARCH_DSL_LENGTH_THRESHOLD;
import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL;
public class AdapterActionFutureActionGetMethodsInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
if (!isTrace(objInst)) {
return;
}
AbstractSpan span = ContextManager.createLocalSpan(Constants.DB_TYPE + "/" + Constants.BASE_FUTURE_METHOD);
span.setComponent(ComponentsDefine.TRANSPORT_CLIENT);
Tags.DB_TYPE.set(span, Constants.DB_TYPE);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
if (!isTrace(objInst)) {
return ret;
}
AbstractSpan span = ContextManager.activeSpan();
parseResponseInfo((ActionResponse) ret, span);
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
private boolean isTrace(EnhancedInstance objInst) {
return objInst.getSkyWalkingDynamicField() != null && (boolean) objInst.getSkyWalkingDynamicField();
}
private void parseResponseInfo(ActionResponse response, AbstractSpan span) {
// search response
if (response instanceof SearchResponse) {
parseSearchResponse((SearchResponse) response, span);
return;
}
// bulk response
if (response instanceof BulkResponse) {
parseBulkResponse((BulkResponse) response, span);
return;
}
// get response
if (response instanceof GetResponse) {
parseGetResponse((GetResponse) response, span);
return;
}
// index response
if (response instanceof IndexResponse) {
parseIndexResponse((IndexResponse) response, span);
return;
}
// update response
if (response instanceof UpdateResponse) {
parseUpdateResponse((UpdateResponse) response, span);
return;
}
// delete response
if (response instanceof DeleteResponse) {
parseDeleteResponse((DeleteResponse) response, span);
return;
}
}
private void parseSearchResponse(SearchResponse searchResponse, AbstractSpan span) {
span.tag(Constants.ES_TOOK_MILLIS, Long.toString(searchResponse.getTook().getMillis()));
span.tag(Constants.ES_TOTAL_HITS, Long.toString(searchResponse.getHits().getTotalHits()));
if (TRACE_DSL) {
String tagValue = searchResponse.toString();
tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue;
Tags.DB_STATEMENT.set(span, tagValue);
}
}
private void parseBulkResponse(BulkResponse bulkResponse, AbstractSpan span) {
span.tag(Constants.ES_TOOK_MILLIS, Long.toString(bulkResponse.getTook().getMillis()));
span.tag(Constants.ES_INGEST_TOOK_MILLIS, Long.toString(bulkResponse.getIngestTookInMillis()));
if (TRACE_DSL) {
String tagValue = bulkResponse.toString();
tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue;
Tags.DB_STATEMENT.set(span, tagValue);
}
}
private void parseGetResponse(GetResponse getResponse, AbstractSpan span) {
if (TRACE_DSL) {
String tagValue = getResponse.toString();
tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue;
Tags.DB_STATEMENT.set(span, tagValue);
}
}
private void parseIndexResponse(IndexResponse indexResponse, AbstractSpan span) {
if (TRACE_DSL) {
String tagValue = indexResponse.toString();
tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue;
Tags.DB_STATEMENT.set(span, tagValue);
}
}
private void parseUpdateResponse(UpdateResponse updateResponse, AbstractSpan span) {
if (TRACE_DSL) {
String tagValue = updateResponse.toString();
tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue;
Tags.DB_STATEMENT.set(span, tagValue);
}
}
private void parseDeleteResponse(DeleteResponse deleteResponse, AbstractSpan span) {
if (TRACE_DSL) {
String tagValue = deleteResponse.toString();
tagValue = ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0 ? StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD) : tagValue;
Tags.DB_STATEMENT.set(span, tagValue);
}
}
}
......@@ -18,6 +18,9 @@
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
public class Constants {
//interceptor class
public static final String REST_HIGH_LEVEL_CLIENT_CON_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.RestHighLevelClientConInterceptor";
......@@ -33,6 +36,9 @@ public class Constants {
public static final String CLUSTER_CLIENT_GET_SETTINGS_METHODS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.ClusterClientGetSettingsMethodsInterceptor";
public static final String CLUSTER_CLIENT_PUT_SETTINGS_METHODS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.ClusterClientPutSettingsMethodsInterceptor";
//witnessClasses
public static final String TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES = "org.elasticsearch.transport.TaskTransportChannel";
//es operator name
public static final String CREATE_OPERATOR_NAME = "Elasticsearch/CreateRequest";
public static final String DELETE_OPERATOR_NAME = "Elasticsearch/DeleteRequest";
......@@ -45,4 +51,14 @@ public class Constants {
public static final String CLUSTER_PUT_SETTINGS_NAME = "Elasticsearch/PutSettings";
public static final String DB_TYPE = "Elasticsearch";
public static final String BASE_FUTURE_METHOD = "actionGet";
//tags
public static final AbstractTag<String> ES_NODE = Tags.ofKey("node.address");
public static final AbstractTag<String> ES_INDEX = Tags.ofKey("es.indices");
public static final AbstractTag<String> ES_TYPE = Tags.ofKey("es.types");
public static final AbstractTag<String> ES_TOOK_MILLIS = Tags.ofKey("es.took_millis");
public static final AbstractTag<String> ES_TOTAL_HITS = Tags.ofKey("es.total_hits");
public static final AbstractTag<String> ES_INGEST_TOOK_MILLIS = Tags.ofKey("es.ingest_took_millis");
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
import org.apache.skywalking.apm.util.StringUtil;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import java.lang.reflect.Method;
import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL;
public class TransportActionNodeProxyExecuteMethodsInterceptor implements InstanceConstructorInterceptor, InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
TransportClientEnhanceInfo enhanceInfo = (TransportClientEnhanceInfo) objInst.getSkyWalkingDynamicField();
ActionRequest request = (ActionRequest) allArguments[1];
String opType = request.getClass().getSimpleName();
String operationName = Constants.DB_TYPE + "/" + opType;
AbstractSpan span = ContextManager.createExitSpan(operationName, enhanceInfo.transportAddresses());
span.setComponent(ComponentsDefine.TRANSPORT_CLIENT);
Tags.DB_TYPE.set(span, Constants.DB_TYPE);
Tags.DB_INSTANCE.set(span, enhanceInfo.getClusterName());
span.tag(Constants.ES_NODE, ((DiscoveryNode) allArguments[0]).getAddress().toString());
parseRequestInfo(request, span);
SpanLayer.asDB(span);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
EnhancedInstance actions = (EnhancedInstance) allArguments[2];
objInst.setSkyWalkingDynamicField(actions.getSkyWalkingDynamicField());
}
private void parseRequestInfo(ActionRequest request, AbstractSpan span) {
// search request
if (request instanceof SearchRequest) {
parseSearchRequest((SearchRequest) request, span);
return;
}
// get request
if (request instanceof GetRequest) {
parseGetRequest((GetRequest) request, span);
return;
}
// index request
if (request instanceof IndexRequest) {
parseIndexRequest((IndexRequest) request, span);
return;
}
// update request
if (request instanceof UpdateRequest) {
parseUpdateRequest((UpdateRequest) request, span);
return;
}
// delete request
if (request instanceof DeleteRequest) {
parseDeleteRequest((DeleteRequest) request, span);
return;
}
// delete index request
if (request instanceof DeleteIndexRequest) {
parseDeleteIndexRequest((DeleteIndexRequest) request, span);
return;
}
}
private void parseSearchRequest(SearchRequest searchRequest, AbstractSpan span) {
span.tag(Constants.ES_INDEX, StringUtil.join(',', searchRequest.indices()));
span.tag(Constants.ES_TYPE, StringUtil.join(',', searchRequest.types()));
if (TRACE_DSL) {
Tags.DB_STATEMENT.set(span, searchRequest.toString());
}
}
private void parseGetRequest(GetRequest getRequest, AbstractSpan span) {
span.tag(Constants.ES_INDEX, getRequest.index());
span.tag(Constants.ES_TYPE, getRequest.type());
if (TRACE_DSL) {
Tags.DB_STATEMENT.set(span, getRequest.toString());
}
}
private void parseIndexRequest(IndexRequest indexRequest, AbstractSpan span) {
span.tag(Constants.ES_INDEX, indexRequest.index());
span.tag(Constants.ES_TYPE, indexRequest.type());
if (TRACE_DSL) {
Tags.DB_STATEMENT.set(span, indexRequest.toString());
}
}
private void parseUpdateRequest(UpdateRequest updateRequest, AbstractSpan span) {
span.tag(Constants.ES_INDEX, updateRequest.index());
span.tag(Constants.ES_TYPE, updateRequest.type());
if (TRACE_DSL) {
Tags.DB_STATEMENT.set(span, updateRequest.toString());
}
}
private void parseDeleteRequest(DeleteRequest deleteRequest, AbstractSpan span) {
span.tag(Constants.ES_INDEX, deleteRequest.index());
span.tag(Constants.ES_TYPE, deleteRequest.type());
if (TRACE_DSL) {
Tags.DB_STATEMENT.set(span, deleteRequest.toString());
}
}
private void parseDeleteIndexRequest(DeleteIndexRequest deleteIndexRequest, AbstractSpan span) {
span.tag(Constants.ES_INDEX, String.join(",", deleteIndexRequest.indices()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportAddressCache;
import org.elasticsearch.action.support.AdapterActionFuture;
import org.elasticsearch.common.transport.TransportAddress;
import java.lang.reflect.Method;
public class TransportClientNodesServiceInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
EnhancedInstance actions = (EnhancedInstance) allArguments[1];
objInst.setSkyWalkingDynamicField(actions.getSkyWalkingDynamicField());
}
public static class AddTransportAddressesInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
TransportClientEnhanceInfo transportClientEnhanceInfo = (TransportClientEnhanceInfo) objInst.getSkyWalkingDynamicField();
TransportAddressCache transportAddressCache = transportClientEnhanceInfo.getTransportAddressCache();
if (transportAddressCache == null) {
transportAddressCache = new TransportAddressCache();
transportClientEnhanceInfo.setTransportAddressCache(transportAddressCache);
}
transportAddressCache.addDiscoveryNode((TransportAddress[]) allArguments[0]);
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
public static class RemoveTransportAddressInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
TransportClientEnhanceInfo transportClientEnhanceInfo = (TransportClientEnhanceInfo) objInst.getSkyWalkingDynamicField();
TransportAddressCache transportAddressCache = transportClientEnhanceInfo.getTransportAddressCache();
if (transportAddressCache == null) {
transportAddressCache = new TransportAddressCache();
transportClientEnhanceInfo.setTransportAddressCache(transportAddressCache);
}
transportAddressCache.removeDiscoveryNode((TransportAddress) allArguments[0]);
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
public static class ExecuteInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
// tracking AdapterActionFuture.actionGet
if (allArguments.length >= 2 && allArguments[1] instanceof AdapterActionFuture) {
AdapterActionFuture actionFuture = (AdapterActionFuture) allArguments[1];
((EnhancedInstance) actionFuture).setSkyWalkingDynamicField(true);
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
return null;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportAddressCache;
import org.elasticsearch.common.settings.Settings;
public class TransportServiceConInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
Settings settings = (Settings) allArguments[0];
String clusterName = settings.get("cluster.name");
TransportClientEnhanceInfo transportClientEnhanceInfo = new TransportClientEnhanceInfo();
transportClientEnhanceInfo.setClusterName(clusterName);
transportClientEnhanceInfo.setTransportAddressCache(new TransportAddressCache());
objInst.setSkyWalkingDynamicField(transportClientEnhanceInfo);
}
}
......@@ -17,3 +17,7 @@
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.RestHighLevelClientInstrumentation
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.IndicesClientInstrumentation
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.ClusterClientInstrumentation
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.AdapterActionFutureInstrumentation
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.TransportServiceInstrumentation
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.TransportActionNodeProxyInstrumentation
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.TransportClientNodesServiceInstrumentation
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.LocalSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.context.util.TagValuePair;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHits;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.util.List;
import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL;
import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.TRANSPORT_CLIENT;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class AdapterActionFutureActionGetMethodsInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
@Mock
private EnhancedInstance enhancedInstance;
@Mock
private SearchResponse searchResponse;
@Mock
private BulkResponse bulkItemResponses;
private SearchHits searchHits;
@Mock
private AdapterActionFutureActionGetMethodsInterceptor interceptor;
@Before
public void setUp() {
searchHits = new SearchHits(null, 309L, 0);
when(searchResponse.getTook()).thenReturn(TimeValue.timeValueMillis(2020));
when(searchResponse.getHits()).thenReturn(searchHits);
when(bulkItemResponses.getTook()).thenReturn(TimeValue.timeValueMillis(2020));
when(bulkItemResponses.getIngestTookInMillis()).thenReturn(1416L);
when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(true);
interceptor = new AdapterActionFutureActionGetMethodsInterceptor();
}
@Test
public void testMethodsAround() throws Throwable {
TRACE_DSL = true;
interceptor.beforeMethod(enhancedInstance, null, null, null, null);
interceptor.afterMethod(enhancedInstance, null, null, null, searchResponse);
List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
Assert.assertThat(traceSegmentList.size(), is(1));
TraceSegment traceSegment = traceSegmentList.get(0);
AbstractTracingSpan getSpan = SegmentHelper.getSpans(traceSegment).get(0);
assertGetSpan(getSpan, searchResponse);
}
@Test
public void testMethodsAround2() throws Throwable {
TRACE_DSL = true;
interceptor.beforeMethod(enhancedInstance, null, null, null, null);
interceptor.afterMethod(enhancedInstance, null, null, null, bulkItemResponses);
List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
Assert.assertThat(traceSegmentList.size(), is(1));
TraceSegment traceSegment = traceSegmentList.get(0);
AbstractTracingSpan getSpan = SegmentHelper.getSpans(traceSegment).get(0);
assertGetSpan(getSpan, bulkItemResponses);
}
private void assertGetSpan(AbstractTracingSpan getSpan, Object ret) {
assertThat(getSpan instanceof LocalSpan, is(true));
LocalSpan span = (LocalSpan) getSpan;
assertThat(span.getOperationName(), is("Elasticsearch/actionGet"));
assertThat(SpanHelper.getComponentId(span), is(TRANSPORT_CLIENT.getId()));
List<TagValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.size(), is(4));
if (ret instanceof SearchResponse) {
assertThat(tags.get(0).getValue(), is("Elasticsearch"));
assertThat(tags.get(1).getValue(), is("2020"));
assertThat(tags.get(2).getValue(), is("309"));
} else if (ret instanceof BulkResponse) {
assertThat(tags.get(0).getValue(), is("Elasticsearch"));
assertThat(tags.get(1).getValue(), is("2020"));
assertThat(tags.get(2).getValue(), is("1416"));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.context.util.TagValuePair;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.net.InetSocketAddress;
import java.util.List;
import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL;
import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.TRANSPORT_CLIENT;
import static org.junit.Assert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class TransportActionNodeProxyExecuteMethodsInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
@Mock
private EnhancedInstance enhancedInstance;
@Mock
private DiscoveryNode discoveryNode;
@Mock
private GetRequest getRequest;
@Mock
private IndexRequest indexRequest;
@Mock
private UpdateRequest updateRequest;
@Mock
private DeleteRequest deleteRequest;
@Mock
private DeleteIndexRequest deleteIndexRequest;
@Mock
private TransportClientEnhanceInfo enhanceInfo;
private TransportActionNodeProxyExecuteMethodsInterceptor interceptor;
@Before
public void setUp() {
InetSocketAddress inetSocketAddress = new InetSocketAddress("122.122.122.122", 9300);
TransportAddress transportAddress = new TransportAddress(inetSocketAddress);
when(discoveryNode.getAddress()).thenReturn(transportAddress);
when(enhanceInfo.transportAddresses()).thenReturn("122.122.122.122:9300");
when(enhanceInfo.getClusterName()).thenReturn("skywalking-es");
when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
when(getRequest.index()).thenReturn("endpoint");
when(getRequest.type()).thenReturn("getType");
when(indexRequest.index()).thenReturn("endpoint");
when(indexRequest.type()).thenReturn("indexType");
when(updateRequest.index()).thenReturn("endpoint");
when(updateRequest.type()).thenReturn("updateType");
when(deleteRequest.index()).thenReturn("endpoint");
when(deleteRequest.type()).thenReturn("deleteType");
when(deleteIndexRequest.indices()).thenReturn(new String[]{"endpoint"});
interceptor = new TransportActionNodeProxyExecuteMethodsInterceptor();
}
@Test
public void testConstruct() {
final EnhancedInstance objInst1 = new EnhancedInstance() {
private Object object = null;
@Override
public Object getSkyWalkingDynamicField() {
return object;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
this.object = value;
}
};
final EnhancedInstance objInst2 = new EnhancedInstance() {
private Object object = null;
@Override
public Object getSkyWalkingDynamicField() {
return object;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
this.object = value;
}
};
objInst1.setSkyWalkingDynamicField(123);
Object[] allArguments = new Object[]{null, null, objInst1};
interceptor.onConstruct(objInst2, allArguments);
assertThat(objInst1.getSkyWalkingDynamicField(), is(objInst2.getSkyWalkingDynamicField()));
}
@Test
public void testGetRequest() throws Throwable {
AbstractTracingSpan getSpan = getSpan(getRequest);
assertGetSpan(getSpan, getRequest);
}
@Test
public void testIndexRequest() throws Throwable {
AbstractTracingSpan getSpan = getSpan(indexRequest);
assertGetSpan(getSpan, indexRequest);
}
@Test
public void testUpdateRequest() throws Throwable {
AbstractTracingSpan getSpan = getSpan(updateRequest);
assertGetSpan(getSpan, updateRequest);
}
@Test
public void testDeleteRequest() throws Throwable {
AbstractTracingSpan getSpan = getSpan(deleteRequest);
assertGetSpan(getSpan, deleteRequest);
}
@Test
public void testDeleteIndexRequest() throws Throwable {
AbstractTracingSpan getSpan = getSpan(deleteIndexRequest);
assertGetSpan(getSpan, deleteIndexRequest);
}
private AbstractTracingSpan getSpan(ActionRequest actionRequest) throws Throwable {
TRACE_DSL = true;
Object[] allArguments = new Object[]{discoveryNode, actionRequest};
interceptor.beforeMethod(enhancedInstance, null, allArguments, null, null);
interceptor.afterMethod(enhancedInstance, null, allArguments, null, null);
List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
Assert.assertThat(traceSegmentList.size(), is(1));
TraceSegment traceSegment = traceSegmentList.get(0);
return SegmentHelper.getSpans(traceSegment).get(0);
}
private void assertGetSpan(AbstractTracingSpan getSpan, Object ret) {
assertThat(getSpan instanceof ExitSpan, is(true));
ExitSpan span = (ExitSpan) getSpan;
assertThat(SpanHelper.getComponentId(span), is(TRANSPORT_CLIENT.getId()));
List<TagValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("Elasticsearch"));
assertThat(tags.get(1).getValue(), is("skywalking-es"));
assertThat(tags.get(2).getValue(), is("122.122.122.122:9300"));
if (ret instanceof SearchRequest) {
assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/SearchRequest"));
assertThat(tags.get(3).getValue(), is("endpoint"));
assertThat(tags.get(4).getValue(), is("searchType"));
} else if (ret instanceof GetRequest) {
assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/GetRequest"));
assertThat(tags.get(3).getValue(), is("endpoint"));
assertThat(tags.get(4).getValue(), is("getType"));
} else if (ret instanceof IndexRequest) {
assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/IndexRequest"));
assertThat(tags.get(3).getValue(), is("endpoint"));
assertThat(tags.get(4).getValue(), is("indexType"));
} else if (ret instanceof UpdateRequest) {
assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/UpdateRequest"));
assertThat(tags.get(3).getValue(), is("endpoint"));
assertThat(tags.get(4).getValue(), is("updateType"));
} else if (ret instanceof DeleteRequest) {
assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/DeleteRequest"));
assertThat(tags.get(3).getValue(), is("endpoint"));
assertThat(tags.get(4).getValue(), is("deleteType"));
} else if (ret instanceof DeleteIndexRequest) {
assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/DeleteIndexRequest"));
assertThat(tags.get(3).getValue(), is("endpoint"));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportAddressCache;
import org.elasticsearch.common.transport.TransportAddress;
import org.junit.Before;
import org.junit.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
public class TransportAddressCacheTest {
private TransportAddressCache transportAddressCache;
@Before
public void setUp() {
transportAddressCache = new TransportAddressCache();
}
@Test
public void transportAddressTest()
throws UnknownHostException {
transportAddressCache.addDiscoveryNode(
new TransportAddress(InetAddress.getByName("172.1.1.1"), 9300),
new TransportAddress(InetAddress.getByName("172.1.1.2"), 9200),
new TransportAddress(InetAddress.getByName("172.1.1.3"), 9100)
);
assertThat(transportAddressCache.transportAddress(), is("172.1.1.1:9300,172.1.1.2:9200,172.1.1.3:9100"));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
import org.elasticsearch.common.settings.Settings;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
@PrepareForTest(value = {
Settings.class
})
public class TransportServiceConInterceptorTest {
@Mock
private Settings settings;
private Object[] allArguments;
private TransportServiceConInterceptor transportServiceConInterceptor;
@Before
public void setUp() {
when(settings.get("cluster.name")).thenReturn("my.es.cluster");
allArguments = new Object[]{settings};
}
@Test
public void testConstruct() {
final EnhancedInstance objInst = new EnhancedInstance() {
private Object object = null;
@Override
public Object getSkyWalkingDynamicField() {
return object;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
this.object = value;
}
};
transportServiceConInterceptor = new TransportServiceConInterceptor();
transportServiceConInterceptor.onConstruct(objInst, allArguments);
assertThat(objInst.getSkyWalkingDynamicField() instanceof TransportClientEnhanceInfo, is(true));
assertThat(((TransportClientEnhanceInfo) (objInst.getSkyWalkingDynamicField())).getClusterName(), is("my.es.cluster"));
}
}
......@@ -59,7 +59,8 @@
* [Spymemcached](https://github.com/couchbase/spymemcached) 2.x
* [Xmemcached](https://github.com/killme2008/xmemcached) 2.x
* [Elasticsearch](https://github.com/elastic/elasticsearch)
* [transport-client](https://github.com/elastic/elasticsearch/tree/master/client/transport) 5.2.x-5.6.x
* [transport-client](https://github.com/elastic/elasticsearch/tree/v5.2.0/client/transport) 5.2.x-5.6.x
* [transport-client](https://github.com/elastic/elasticsearch/tree/v6.7.1/client/transport) 6.7.1-6.8.4
* [rest-high-level-client](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.7/index.html) 6.7.1-6.8.4
* [rest-high-level-client](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.0/java-rest-high.html) 7.0.0-7.5.2
* [Solr](https://github.com/apache/lucene-solr/)
......
......@@ -128,6 +128,140 @@ segmentItems:
tags:
- {key: db.type, value: Elasticsearch}
- {key: db.instance, value: not null}
- operationName: Elasticsearch/IndexRequest
operationId: 0
parentSpanId: 0
spanId: 7
spanLayer: Database
startTime: nq 0
endTime: nq 0
componentId: 48
componentName: ''
isError: false
spanType: Exit
peer: not null
peerId: 0
tags:
- {key: db.type, value: Elasticsearch}
- {key: db.instance, value: not null}
- {key: node.address, value: not null}
- {key: es.indices, value: not null}
- {key: es.types, value: not null}
- {key: db.statement, value: not null}
- operationName: Elasticsearch/actionGet
operationId: 0
parentSpanId: 0
spanId: 8
spanLayer: null
startTime: nq 0
endTime: nq 0
componentId: 48
componentName: ''
isError: false
spanType: Local
peer: null
peerId: 0
tags:
- {key: db.type, value: Elasticsearch}
- {key: db.statement, value: not null}
- operationName: Elasticsearch/GetRequest
operationId: 0
parentSpanId: 0
spanId: 9
spanLayer: Database
startTime: nq 0
endTime: nq 0
componentId: 48
componentName: ''
isError: false
spanType: Exit
peer: not null
peerId: 0
tags:
- {key: db.type, value: Elasticsearch}
- {key: db.instance, value: not null}
- {key: node.address, value: not null}
- {key: es.indices, value: not null}
- {key: es.types, value: not null}
- {key: db.statement, value: not null}
- operationName: Elasticsearch/SearchRequest
operationId: 0
parentSpanId: 0
spanId: 10
spanLayer: Database
startTime: nq 0
endTime: nq 0
componentId: 48
componentName: ''
isError: false
spanType: Exit
peer: not null
peerId: 0
tags:
- {key: db.type, value: Elasticsearch}
- {key: db.instance, value: not null}
- {key: node.address, value: not null}
- {key: es.indices, value: not null}
- {key: es.types, value: not null}
- {key: db.statement, value: not null}
- operationName: Elasticsearch/UpdateRequest
operationId: 0
parentSpanId: 0
spanId: 11
spanLayer: Database
startTime: nq 0
endTime: nq 0
componentId: 48
componentName: ''
isError: false
spanType: Exit
peer: not null
peerId: 0
tags:
- {key: db.type, value: Elasticsearch}
- {key: db.instance, value: not null}
- {key: node.address, value: not null}
- {key: es.indices, value: not null}
- {key: es.types, value: not null}
- {key: db.statement, value: not null}
- operationName: Elasticsearch/DeleteRequest
operationId: 0
parentSpanId: 0
spanId: 12
spanLayer: Database
startTime: nq 0
endTime: nq 0
componentId: 48
componentName: ''
isError: false
spanType: Exit
peer: not null
peerId: 0
tags:
- {key: db.type, value: Elasticsearch}
- {key: db.instance, value: not null}
- {key: node.address, value: not null}
- {key: es.indices, value: not null}
- {key: es.types, value: not null}
- {key: db.statement, value: not null}
- operationName: Elasticsearch/DeleteIndexRequest
operationId: 0
parentSpanId: 0
spanId: 13
spanLayer: Database
startTime: nq 0
endTime: nq 0
componentId: 48
componentName: ''
isError: false
spanType: Exit
peer: not null
peerId: 0
tags:
- {key: db.type, value: Elasticsearch}
- {key: db.instance, value: not null}
- {key: node.address, value: not null}
- {key: es.indices, value: not null}
- operationName: /elasticsearch-case/case/elasticsearch
operationId: 0
parentSpanId: -1
......@@ -143,4 +277,4 @@ segmentItems:
peerId: 0
tags:
- {key: url, value: 'http://localhost:8080/elasticsearch-case/case/elasticsearch'}
- {key: http.method, value: GET}
- {key: http.method, value: GET}
\ No newline at end of file
......@@ -31,7 +31,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.version>1.8</compiler.version>
<test.framework.version>6.7.1</test.framework.version>
<test.framework.version>6.8.6</test.framework.version>
<docker.image.version>${test.framework.version}</docker.image.version>
<spring-boot.version>2.1.4.RELEASE</spring-boot.version>
......@@ -56,6 +56,12 @@
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- elasticsearch transport-client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${test.framework.version}</version>
</dependency>
<!-- elasticsearch rest-high-level-client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.testcase.elasticsearch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import static java.util.Collections.singletonMap;
@Component
public class RestHighLevelClientCase {
private static Logger logger = LogManager.getLogger(RestHighLevelClientCase.class);
@Autowired
private RestHighLevelClient client;
public boolean healthcheck() throws Exception {
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout(TimeValue.timeValueSeconds(10));
request.waitForStatus(ClusterHealthStatus.GREEN);
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
if (response.isTimedOut()) {
String message = "elastic search node start fail!";
logger.error(message);
throw new RuntimeException(message);
}
return true;
}
public boolean elasticsearch() throws Exception {
String indexName = UUID.randomUUID().toString();
try {
//create
createIndex(client, indexName);
// index
index(client, indexName);
client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
//get
get(client, indexName);
// search
search(client, indexName);
// update
update(client, indexName);
// delete
delete(client, indexName);
} finally {
if (null != client) {
client.close();
}
}
return true;
}
private void createIndex(RestHighLevelClient client, String indexName) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(indexName);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.startObject("properties");
{
builder.startObject("author");
{
builder.field("type", "keyword");
}
builder.endObject();
builder.startObject("title");
{
builder.field("type", "keyword");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
request.mapping(builder);
request.settings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0));
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
if (createIndexResponse.isAcknowledged() == false) {
String message = "elasticsearch create index fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void index(RestHighLevelClient client, String indexName) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("author", "Marker");
builder.field("title", "Java programing.");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest(indexName, "_doc", "1").source(builder);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
if (indexResponse.status().getStatus() >= 400) {
String message = "elasticsearch index data fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void get(RestHighLevelClient client, String indexName) throws IOException {
GetRequest getRequest = new GetRequest(indexName, "_doc", "1");
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
if (!getResponse.isExists()) {
String message = "elasticsearch get data fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void update(RestHighLevelClient client, String indexName) throws IOException {
UpdateRequest request = new UpdateRequest(indexName, "_doc", "1");
Map<String, Object> parameters = singletonMap("title", "c++ programing.");
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.title = params.title", parameters);
request.script(inline);
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
if (updateResponse.getVersion() != 2) {
String message = "elasticsearch update data fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void delete(RestHighLevelClient client, String indexName) throws IOException {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
if (!deleteIndexResponse.isAcknowledged()) {
String message = "elasticsearch delete index fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void search(RestHighLevelClient client, String indexName) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("author", "Marker"));
sourceBuilder.from(0);
sourceBuilder.size(10);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(indexName);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (!(searchResponse.getHits().totalHits > 0)) {
String message = "elasticsearch search data fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.testcase.elasticsearch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.UUID;
@Component
public class TransportClientCase {
private static Logger logger = LogManager.getLogger(TransportClientCase.class);
@Autowired
private TransportClient client;
public boolean elasticsearch() throws Exception {
String indexName = UUID.randomUUID().toString();
try {
// create
index(client, indexName);
// get
get(client, indexName);
// search
search(client, indexName);
// update
update(client, indexName);
// delete
delete(client, indexName);
// remove index
client.admin().indices().prepareDelete(indexName).execute();
} finally {
if (null != client) {
client.close();
}
}
return true;
}
private void index(Client client, String indexName) throws IOException {
try {
client.prepareIndex(indexName, "test", "1")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("name", "mysql innodb")
.field("price", "0")
.field("language", "chinese")
.endObject())
.get();
} catch (IOException e) {
logger.error("index document error.", e);
throw e;
}
}
private void get(Client client, String indexName) {
client.prepareGet().setIndex(indexName).setId("1").execute();
}
private void update(Client client, String indexName) throws IOException {
try {
client.prepareUpdate(indexName, "test", "1")
.setDoc(XContentFactory.jsonBuilder().startObject().field("price", "9.9").endObject())
.execute();
} catch (IOException e) {
logger.error("update document error.", e);
throw e;
}
}
private void delete(Client client, String indexName) {
client.prepareDelete(indexName, "test", "1").execute();
}
private void search(Client client, String indexName) {
client.prepareSearch(indexName).setTypes("test").setSize(10).execute();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.testcase.elasticsearch.config;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetAddress;
@Configuration
public class TransportClientConfig {
@Value("${elasticsearch.server}")
private String elasticsearchHost;
public final static Integer PORT = 9300; //端口
@Bean
public TransportClient getESClientConnection()
throws Exception {
TransportClient client = null;
Settings settings = Settings.builder()
.put("cluster.name", "docker-node")
.put("client.transport.sniff", false)
.build();
client = new PreBuiltTransportClient(settings);
for (TransportAddress i : parseEsHost()) {
client.addTransportAddress(i);
}
return client;
}
private TransportAddress[] parseEsHost()
throws Exception {
TransportAddress[] transportAddresses = null;
if (!elasticsearchHost.isEmpty()) {
String[] hostIp = elasticsearchHost.split(",");
transportAddresses = new TransportAddress[hostIp.length];
for (int i = 0; i < hostIp.length; ++i) {
String[] hostIpItem = hostIp[i].split(":");
String ip = hostIpItem[0].trim();
String port = hostIpItem[1].trim();
transportAddresses[i] = new TransportAddress(InetAddress.getByName(ip), PORT);
}
}
return transportAddresses;
}
}
......@@ -18,44 +18,15 @@
package org.apache.skywalking.apm.testcase.elasticsearch.controller;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.apache.skywalking.apm.testcase.elasticsearch.RestHighLevelClientCase;
import org.apache.skywalking.apm.testcase.elasticsearch.TransportClientCase;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import static java.util.Collections.singletonMap;
@RestController
@RequestMapping("/elasticsearch-case/case")
public class CaseController {
......@@ -63,153 +34,24 @@ public class CaseController {
private static Logger logger = LogManager.getLogger(CaseController.class);
@Autowired
private RestHighLevelClient client;
private RestHighLevelClientCase restHighLevelClientCase;
@Autowired
private TransportClientCase transportClientCase;
@GetMapping("/healthcheck")
public String healthcheck() throws Exception {
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout(TimeValue.timeValueSeconds(10));
request.waitForStatus(ClusterHealthStatus.GREEN);
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
if (response.isTimedOut()) {
String message = "elastic search node start fail!";
logger.error(message);
throw new RuntimeException(message);
}
restHighLevelClientCase.healthcheck();
return "Success";
}
@GetMapping("/elasticsearch")
public String elasticsearch() throws Exception {
String indexName = UUID.randomUUID().toString();
try {
//create
createIndex(client, indexName);
// index
index(client, indexName);
client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
restHighLevelClientCase.elasticsearch();
transportClientCase.elasticsearch();
//get
get(client, indexName);
// search
search(client, indexName);
// update
update(client, indexName);
// delete
delete(client, indexName);
} finally {
if (null != client) {
client.close();
}
}
return "Success";
}
private void createIndex(RestHighLevelClient client, String indexName) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(indexName);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.startObject("properties");
{
builder.startObject("author");
{
builder.field("type", "keyword");
}
builder.endObject();
builder.startObject("title");
{
builder.field("type", "keyword");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
request.mapping(builder);
request.settings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0));
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
if (createIndexResponse.isAcknowledged() == false) {
String message = "elasticsearch create index fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void index(RestHighLevelClient client, String indexName) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("author", "Marker");
builder.field("title", "Java programing.");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest(indexName, "_doc", "1").source(builder);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
if (indexResponse.status().getStatus() >= 400) {
String message = "elasticsearch index data fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void get(RestHighLevelClient client, String indexName) throws IOException {
GetRequest getRequest = new GetRequest(indexName, "_doc", "1");
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
if (!getResponse.isExists()) {
String message = "elasticsearch get data fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void update(RestHighLevelClient client, String indexName) throws IOException {
UpdateRequest request = new UpdateRequest(indexName, "_doc", "1");
Map<String, Object> parameters = singletonMap("title", "c++ programing.");
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.title = params.title", parameters);
request.script(inline);
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
if (updateResponse.getVersion() != 2) {
String message = "elasticsearch update data fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void delete(RestHighLevelClient client, String indexName) throws IOException {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
if (!deleteIndexResponse.isAcknowledged()) {
String message = "elasticsearch delete index fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
private void search(RestHighLevelClient client, String indexName) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("author", "Marker"));
sourceBuilder.from(0);
sourceBuilder.size(10);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(indexName);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
if (!(searchResponse.getHits().totalHits > 0)) {
String message = "elasticsearch search data fail.";
logger.error(message);
throw new RuntimeException(message);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册