提交 753cb284 编写于 作者: P pengys5

temporary storage

上级 646482de
......@@ -36,7 +36,7 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
try {
aggWorkRefs().tell(value);
} catch (Exception e) {
logger.error(e);
logger.error(e, e);
}
});
getRecordAnalysisData().asMap().clear();
......
package org.skywalking.apm.collector.worker.globaltrace.analysis;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.JoinAndSplitAnalysisMember;
......@@ -10,12 +15,9 @@ import org.skywalking.apm.collector.worker.config.WorkerConfig;
import org.skywalking.apm.collector.worker.globaltrace.GlobalTraceIndex;
import org.skywalking.apm.collector.worker.globaltrace.persistence.GlobalTraceAgg;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.GlobalTraceId;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import java.util.List;
/**
* @author pengys5
*/
......@@ -30,15 +32,13 @@ public class GlobalTraceAnalysis extends JoinAndSplitAnalysisMember {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
String subSegmentId = segment.getTraceSegmentId();
List<GlobalTraceId> globalTraceIdList = null;
// List<GlobalTraceId> globalTraceIdList = segment.getRelatedGlobalTraces();
List<String> globalTraceIdList = segment.getRelatedGlobalTraces().get();
if (CollectionTools.isNotEmpty(globalTraceIdList)) {
for (GlobalTraceId disTraceId : globalTraceIdList) {
// String traceId = disTraceId.get();
// set(traceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
for (String globalTraceId : globalTraceIdList) {
set(globalTraceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
}
}
} else {
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.actor.*;
import java.io.IOException;
import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
/**
* @author pengys5
*/
public abstract class AbstractGet extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(AbstractGet.class);
protected AbstractGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object request, Object response) throws Exception {
Map<String, String[]> parameterMap = (Map<String, String[]>) request;
@Override final public void onWork(Object request, Object response) throws Exception {
Map<String, String[]> parameterMap = (Map<String, String[]>)request;
try {
onSearch(parameterMap, (JsonObject) response);
onSearch(parameterMap, (JsonObject)response);
} catch (Exception e) {
((JsonObject) response).addProperty("isSuccess", false);
((JsonObject) response).addProperty("reason", e.getMessage());
logger.error(e, e);
((JsonObject)response).addProperty("isSuccess", false);
((JsonObject)response).addProperty("reason", e.getMessage());
}
}
......@@ -39,9 +46,8 @@ public abstract class AbstractGet extends AbstractLocalSyncWorker {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
@Override final protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
Map<String, String[]> parameterMap = request.getParameterMap();
JsonObject resJson = new JsonObject();
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.IOException;
......@@ -16,6 +15,7 @@ import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentAndJson;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
/**
* @author pengys5
......@@ -37,8 +37,6 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
private Logger logger = LogManager.getFormatterLogger(PostWithHttpServlet.class);
private final Gson gson = new Gson();
private final LocalAsyncWorkerRef ownerWorkerRef;
PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
......@@ -53,7 +51,7 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
streamReader(bufferedReader);
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
logger.error(e);
logger.error(e, e);
resJson.addProperty("error", e.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
......@@ -83,7 +81,7 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
builder.append(buffer);
String segmentJsonStr = builder.toString();
segment = gson.fromJson(segmentJsonStr, Segment.class);
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentJsonStr);
ownerWorkerRef.tell(new SegmentAndJson(segment, segmentJsonStr));
}
......
......@@ -29,7 +29,7 @@ public class SegmentAnalysis extends RecordAnalysisMember {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Segment) {
if (message instanceof SegmentAndJson) {
SegmentAndJson segmentAndJson = (SegmentAndJson) message;
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(segmentAndJson);
} else {
......
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.google.gson.JsonArray;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
......@@ -27,18 +27,10 @@ public enum SegmentDeserialize {
}
private void streamReader(List<Segment> segmentList, FileReader fileReader) throws Exception {
try (JsonReader reader = new JsonReader(fileReader)) {
readSegmentArray(segmentList, reader);
}
}
private void readSegmentArray(List<Segment> segmentList, JsonReader reader) throws Exception {
reader.beginArray();
while (reader.hasNext()) {
Segment segment = new Segment();
// segment.deserialize(reader);
JsonArray segmentArray = gson.fromJson(fileReader, JsonArray.class);
for (int i = 0; i < segmentArray.size(); i++) {
Segment segment = gson.fromJson(segmentArray.get(i), Segment.class);
segmentList.add(segment);
}
reader.endArray();
}
}
......@@ -3,24 +3,27 @@ package org.skywalking.apm.collector.worker.segment.persistence;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.List;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.globaltrace.GlobalTraceIndex;
import org.skywalking.apm.collector.worker.segment.SegmentCostIndex;
import org.skywalking.apm.collector.worker.segment.SegmentExceptionIndex;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.collector.worker.segment.entity.GlobalTraceId;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.storage.JoinAndSplitData;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import java.util.List;
/**
* @author pengys5
*/
......@@ -29,7 +32,7 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
private Gson gson = new Gson();
private SegmentTopSearchWithGlobalTraceId(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -41,7 +44,7 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
@Override
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
RequestEntity search = (RequestEntity)request;
Client client = EsClient.INSTANCE.getClient();
String globalTraceData = client.prepareGet(GlobalTraceIndex.INDEX, GlobalTraceIndex.TYPE_RECORD, search.globalTraceId).get().getSourceAsString();
......@@ -70,25 +73,24 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
JsonObject topSegmentJson = new JsonObject();
topSegmentJson.addProperty("num", num);
String segId = (String) getResponse.getSource().get(SegmentCostIndex.SEG_ID);
String segId = (String)getResponse.getSource().get(SegmentCostIndex.SEG_ID);
topSegmentJson.addProperty(SegmentCostIndex.SEG_ID, segId);
topSegmentJson.addProperty(SegmentCostIndex.START_TIME, (Number) getResponse.getSource().get(SegmentCostIndex.START_TIME));
topSegmentJson.addProperty(SegmentCostIndex.START_TIME, (Number)getResponse.getSource().get(SegmentCostIndex.START_TIME));
if (getResponse.getSource().containsKey(SegmentCostIndex.END_TIME)) {
topSegmentJson.addProperty(SegmentCostIndex.END_TIME, (Number) getResponse.getSource().get(SegmentCostIndex.END_TIME));
topSegmentJson.addProperty(SegmentCostIndex.END_TIME, (Number)getResponse.getSource().get(SegmentCostIndex.END_TIME));
}
topSegmentJson.addProperty(SegmentCostIndex.OPERATION_NAME, (String) getResponse.getSource().get(SegmentCostIndex.OPERATION_NAME));
topSegmentJson.addProperty(SegmentCostIndex.COST, (Number) getResponse.getSource().get(SegmentCostIndex.COST));
topSegmentJson.addProperty(SegmentCostIndex.OPERATION_NAME, (String)getResponse.getSource().get(SegmentCostIndex.OPERATION_NAME));
topSegmentJson.addProperty(SegmentCostIndex.COST, (Number)getResponse.getSource().get(SegmentCostIndex.COST));
String segmentSource = client.prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
List<GlobalTraceId> distributedTraceIdList = null;
// List<GlobalTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
List<String> distributedTraceIdList = segment.getRelatedGlobalTraces().get();
JsonArray distributedTraceIdArray = new JsonArray();
if (CollectionTools.isNotEmpty(distributedTraceIdList)) {
for (GlobalTraceId distributedTraceId : distributedTraceIdList) {
// distributedTraceIdArray.add(distributedTraceId.get());
for (String distributedTraceId : distributedTraceIdList) {
distributedTraceIdArray.add(distributedTraceId);
}
}
topSegmentJson.add("traceIds", distributedTraceIdArray);
......@@ -109,7 +111,7 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
}
}
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add("result", topSegPaging);
}
}
......
......@@ -90,13 +90,12 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
String segmentSource = EsClient.INSTANCE.getClient().prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
logger().debug("segmentSource:" + segmentSource);
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
// List<GlobalTraceId> distributedTraceIdList = segment.getRelatedGlobalTraces();
List<GlobalTraceId> distributedTraceIdList = null;
List<String> distributedTraceIdList = segment.getRelatedGlobalTraces().get();
JsonArray distributedTraceIdArray = new JsonArray();
if (CollectionTools.isNotEmpty(distributedTraceIdList)) {
for (GlobalTraceId distributedTraceId : distributedTraceIdList) {
// distributedTraceIdArray.add(distributedTraceId.get());
for (String distributedTraceId : distributedTraceIdList) {
distributedTraceIdArray.add(distributedTraceId);
}
}
topSegmentJson.add("traceIds", distributedTraceIdArray);
......
......@@ -28,7 +28,7 @@ public enum PersistenceTimer {
extractDataAndSave();
Thread.sleep(timeInterval);
} catch (Throwable e) {
logger.error(e);
logger.error(e, e);
}
}
};
......
......@@ -19,6 +19,9 @@
<logger name="org.skywalking.apm.collector" level="debug">
<AppenderRef ref="RollingFile"/>
</logger>
<logger name="org.skywalking.apm.collector.worker.storage" level="error">
<AppenderRef ref="RollingFile"/>
</logger>
<Root level="INFO">
<AppenderRef ref="RollingFile"/>
</Root>
......
......@@ -41,12 +41,12 @@ public enum HttpClientTools {
}
}
} catch (Exception e) {
logger.error(e);
logger.error(e, e);
} finally {
try {
httpClient.close();
} catch (IOException e) {
logger.error(e);
logger.error(e, e);
}
}
return null;
......@@ -65,12 +65,12 @@ public enum HttpClientTools {
}
}
} catch (Exception e) {
logger.error(e);
logger.error(e, e);
} finally {
try {
httpClient.close();
} catch (Exception e) {
logger.error(e);
logger.error(e, e);
}
}
return null;
......
......@@ -361,12 +361,22 @@
},
"tb": {},
"ti": {},
"lo": []
"lo": [
{
"tm": 1490923010329,
"fi": {
"stack": "com.weibo.api.motan.exception.MotanBizException: error_message: provider call process error, status: 503, error_code: 30001,r\u003d0\n\tat com.weibo.api.motan.rpc.DefaultProvider.invoke(DefaultProvider.java:62)\n\tat com.weibo.api.motan.rpc.AbstractProvider.call(AbstractProvider.java:47)\n\tat com.weibo.api.motan.filter.opentracing.OpenTracingFilter.process(OpenTracingFilter.java:94)\n\tat com.weibo.api.motan.filter.opentracing.OpenTracingFilter.processProviderTrace(OpenTracingFilter.java:148)\n\tat com.weibo.api.motan.filter.opentracing.OpenTracingFilter.filter(OpenTracingFilter.java:58)\n\tat com.weibo.api.motan.protocol.support.ProtocolFilterDecorator$2.call(ProtocolFilterDecorator.java:150)\n\tat com.weibo.api.motan.transport.ProviderMessageRouter.call$original$doBagGo8(ProviderMessageRouter.java:96)\n\tat com.weibo.api.motan.transport.ProviderMessageRouter.call$original$doBagGo8$accessor$PEWyOMYz(ProviderMessageRouter.java)\n\tat com.weibo.api.motan.transport.ProviderMessageRouter$auxiliary$s7ixnKeK.call(Unknown Source)\n\tat org.skywalking.apm.api.plugin.interceptor.enhance.ClassInstanceMethodsInterceptor.intercept(ClassInstanceMethodsInterceptor.java:66)\n\tat com.weibo.api.motan.transport.ProviderMessageRouter.call(ProviderMessageRouter.java)\n\tat com.weibo.api.motan.transport.ProviderProtectedMessageRouter.call(ProviderProtectedMessageRouter.java:79)\n\tat com.weibo.api.motan.transport.ProviderMessageRouter.handle(ProviderMessageRouter.java:91)\n\tat com.weibo.api.motan.transport.support.DefaultRpcHeartbeatFactory$HeartMessageHandleWrapper.handle(DefaultRpcHeartbeatFactory.java:82)\n\tat com.weibo.api.motan.transport.netty.NettyChannelHandler.processRequest(NettyChannelHandler.java:139)\n\tat com.weibo.api.motan.transport.netty.NettyChannelHandler.access$000(NettyChannelHandler.java:47)\n\tat com.weibo.api.motan.transport.netty.NettyChannelHandler$1.run(NettyChannelHandler.java:116)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.NullPointerException\n\tat org.skywalking.apm.test.cache.jedis.JedisServiceManager.findWithException(JedisServiceManager.java:49)\n\tat org.skywalking.apm.test.cache.CacheServiceImpl.findCacheWithException(CacheServiceImpl.java:46)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat com.weibo.api.motan.rpc.DefaultProvider.invoke(DefaultProvider.java:57)\n\t... 19 more\ncom.weibo.api.motan.exception.MotanBizException: error_message: provider call process error, status: 503, error_code: 30001,r\u003d0\n\tat com.weibo.api.motan.rpc.DefaultProvider.invoke(DefaultProvider.java:62)\n\tat com.weibo.api.motan.rpc.AbstractProvider.call(AbstractProvider.java:47)\n\tat com.weibo.api.motan.filter.opentracing.OpenTracingFilter.process(OpenTracingFilter.java:94)\n\tat com.weibo.api.motan.filter.opentracing.OpenTracingFilter.processProviderTrace(OpenTracingFilter.java:148)\n\tat com.weibo.api.motan.filter.opentracing.OpenTracingFilter.filter(OpenTracingFilter.java:58)\n\tat com.weibo.api.motan.protocol.support.ProtocolFilterDecorator$2.call(ProtocolFilterDecorator.java:150)\n\tat com.weibo.api.motan.transport.ProviderMessageRouter.call$original$doBagGo8(ProviderMessageRouter.java:96)\n\tat com.weibo.api.motan.transport.ProviderMessageRouter.call$original$doBagGo8$accessor$PEWyOMYz(ProviderMessageRouter.java)\n\tat com.weibo.api.motan.transport.ProviderMessageRouter$auxiliary$s7ixnKeK.call(Unknown Source)\n\tat org.skywalking.apm.api.plugin.interceptor.enhance.ClassInstanceMethodsInterceptor.intercept(ClassInstanceMethodsInterceptor.java:66)\n\tat com.weibo.api.motan.transport.ProviderMessageRouter.call(ProviderMessageRouter.java)\n\tat com.weibo.api.motan.",
"error.kind": "com.weibo.api.motan.exception.MotanBizException",
"event": "error",
"message": "error_message: provider call process error, status: 503, error_code: 30001,r\u003d0"
}
}
]
}
],
"ac": "cache-service",
"gt": [
"Trace.1490922929254.1797892356.6003.69.2,Trace.1490922929254.1797892356.6003.69.3"
"Trace.1490922929254.1797892356.6003.69.2"
]
}
]
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册