提交 c387cfb9 编写于 作者: P pengys5

Trace stack UI: top segment list ok.

上级 527f8a55
package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray;
import java.io.IOException;
import com.google.gson.JsonElement;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.skywalking.apm.collector.agentstream.grpc.AgentStreamGRPCDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
/**
......@@ -22,15 +21,13 @@ public class AgentStreamGRPCServerHandler extends JettyHandler {
return "/agentstream/grpc";
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamGRPCDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
reply(resp, serverArray, HttpServletResponse.SC_OK);
return serverArray;
}
}
package org.skywalking.apm.collector.agentserver.jetty.handler;
import com.google.gson.JsonArray;
import java.io.IOException;
import com.google.gson.JsonElement;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.skywalking.apm.collector.agentstream.jetty.AgentStreamJettyDataListener;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
/**
......@@ -22,15 +21,13 @@ public class AgentStreamJettyServerHandler extends JettyHandler {
return "/agentstream/jetty";
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(AgentStreamJettyDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
reply(resp, serverArray, HttpServletResponse.SC_OK);
return serverArray;
}
}
......@@ -3,8 +3,12 @@ package org.skywalking.apm.collector.agentstream.worker.segment.cost;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......@@ -18,12 +22,19 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentCostSpanListener implements EntrySpanListener, GlobalTraceIdsListener {
public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener, FirstSpanListener, GlobalTraceIdsListener {
private final Logger logger = LoggerFactory.getLogger(SegmentCostSpanListener.class);
private List<String> globalTraceIds = new ArrayList<>();
private List<SegmentCostDataDefine.SegmentCost> segmentCosts = new ArrayList<>();
private boolean isError = false;
private long timeBucket;
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
......@@ -34,6 +45,18 @@ public class SegmentCostSpanListener implements EntrySpanListener, GlobalTraceId
segmentCost.setSegmentId(segmentId);
segmentCost.setOperationName(spanObject.getOperationName());
segmentCosts.add(segmentCost);
isError = isError || spanObject.getIsError();
}
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
isError = isError || spanObject.getIsError();
}
@Override
public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
isError = isError || spanObject.getIsError();
}
@Override public void parseGlobalTraceId(UniqueId uniqueId) {
......@@ -50,6 +73,8 @@ public class SegmentCostSpanListener implements EntrySpanListener, GlobalTraceId
for (String globalTraceId : globalTraceIds) {
segmentCost.setGlobalTraceId(globalTraceId);
segmentCost.setId(segmentCost.getSegmentId() + globalTraceId);
segmentCost.setError(isError);
segmentCost.setTimeBucket(timeBucket);
try {
logger.debug("send to segment cost persistence worker, id: {}", segmentCost.getId());
context.getClusterWorkerContext().lookup(SegmentCostPersistenceWorker.WorkerRole.INSTANCE).tell(segmentCost.transform());
......
......@@ -29,6 +29,8 @@ public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
source.put(SegmentCostTable.COLUMN_COST, data.getDataLong(0));
source.put(SegmentCostTable.COLUMN_START_TIME, data.getDataLong(1));
source.put(SegmentCostTable.COLUMN_END_TIME, data.getDataLong(2));
source.put(SegmentCostTable.COLUMN_IS_ERROR, data.getDataBoolean(0));
source.put(SegmentCostTable.COLUMN_TIME_BUCKET, data.getDataLong(3));
logger.debug("segment cost source: {}", source.toString());
IndexRequestBuilder builder = getClient().prepareIndex(SegmentCostTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
......
......@@ -14,14 +14,12 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class SegmentCostDataDefine extends DataDefine {
public static final int DEFINE_ID = 402;
@Override public int defineId() {
return DEFINE_ID;
return 402;
}
@Override protected int initialCapacity() {
return 7;
return 9;
}
@Override protected void attributeDefine() {
......@@ -32,6 +30,8 @@ public class SegmentCostDataDefine extends DataDefine {
addAttribute(4, new Attribute(SegmentCostTable.COLUMN_COST, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(SegmentCostTable.COLUMN_START_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(SegmentCostTable.COLUMN_END_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(7, new Attribute(SegmentCostTable.COLUMN_IS_ERROR, AttributeType.BOOLEAN, new CoverOperation()));
addAttribute(8, new Attribute(SegmentCostTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
......@@ -42,7 +42,9 @@ public class SegmentCostDataDefine extends DataDefine {
Long cost = remoteData.getDataLongs(0);
Long startTime = remoteData.getDataLongs(1);
Long endTime = remoteData.getDataLongs(2);
return new SegmentCost(id, segmentId, globalTraceId, operationName, cost, startTime, endTime);
Boolean isError = remoteData.getDataBooleans(0);
Long timeBucket = remoteData.getDataLongs(2);
return new SegmentCost(id, segmentId, globalTraceId, operationName, cost, startTime, endTime, isError, timeBucket);
}
@Override public RemoteData serialize(Object object) {
......@@ -55,6 +57,7 @@ public class SegmentCostDataDefine extends DataDefine {
builder.addDataLongs(segmentCost.getCost());
builder.addDataLongs(segmentCost.getStartTime());
builder.addDataLongs(segmentCost.getEndTime());
builder.addDataBooleans(segmentCost.isError());
return builder.build();
}
......@@ -66,9 +69,11 @@ public class SegmentCostDataDefine extends DataDefine {
private Long cost;
private Long startTime;
private Long endTime;
private boolean isError;
private long timeBucket;
public SegmentCost(String id, String segmentId, String globalTraceId, String operationName, Long cost,
Long startTime, Long endTime) {
SegmentCost(String id, String segmentId, String globalTraceId, String operationName, Long cost,
Long startTime, Long endTime, boolean isError, long timeBucket) {
this.id = id;
this.segmentId = segmentId;
this.globalTraceId = globalTraceId;
......@@ -76,6 +81,8 @@ public class SegmentCostDataDefine extends DataDefine {
this.cost = cost;
this.startTime = startTime;
this.endTime = endTime;
this.isError = isError;
this.timeBucket = timeBucket;
}
public SegmentCost() {
......@@ -91,6 +98,8 @@ public class SegmentCostDataDefine extends DataDefine {
data.setDataLong(0, this.cost);
data.setDataLong(1, this.startTime);
data.setDataLong(2, this.endTime);
data.setDataBoolean(0, this.isError);
data.setDataLong(3, this.timeBucket);
return data;
}
......@@ -149,5 +158,21 @@ public class SegmentCostDataDefine extends DataDefine {
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
public boolean isError() {
return isError;
}
public void setError(boolean error) {
isError = error;
}
public long getTimeBucket() {
return timeBucket;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
}
}
......@@ -31,5 +31,7 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_COST, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_START_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_END_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_IS_ERROR, ElasticSearchColumnDefine.Type.Boolean.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
......@@ -20,5 +20,7 @@ public class SegmentCostH2TableDefine extends H2TableDefine {
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_COST, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_START_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_END_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_IS_ERROR, H2ColumnDefine.Type.Boolean.name()));
addColumn(new H2ColumnDefine(SegmentCostTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
......@@ -13,4 +13,5 @@ public class SegmentCostTable extends CommonTable {
public static final String COLUMN_GLOBAL_TRACE_ID = "global_trace_id";
public static final String COLUMN_OPERATION_NAME = "operation_name";
public static final String COLUMN_COST = "cost";
public static final String COLUMN_IS_ERROR = "is_error";
}
......@@ -9,29 +9,29 @@ import java.util.TimeZone;
*/
public enum TimeBucketUtils {
INSTANCE;
private final SimpleDateFormat DAY_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd");
private final SimpleDateFormat HOUR_DATE_FORMAT = new SimpleDateFormat("yyyyMMddHH");
private final SimpleDateFormat MINUTE_DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmm");
private final SimpleDateFormat dayDateFormat = new SimpleDateFormat("yyyyMMdd");
private final SimpleDateFormat hourDateFormat = new SimpleDateFormat("yyyyMMddHH");
private final SimpleDateFormat minuteDateFormat = new SimpleDateFormat("yyyyMMddHHmm");
public long getMinuteTimeBucket(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String timeStr = MINUTE_DATE_FORMAT.format(calendar.getTime());
String timeStr = minuteDateFormat.format(calendar.getTime());
return Long.valueOf(timeStr);
}
public long getHourTimeBucket(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String timeStr = HOUR_DATE_FORMAT.format(calendar.getTime()) + "00";
String timeStr = hourDateFormat.format(calendar.getTime()) + "00";
return Long.valueOf(timeStr);
}
public long getDayTimeBucket(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String timeStr = DAY_DATE_FORMAT.format(calendar.getTime()) + "0000";
String timeStr = dayDateFormat.format(calendar.getTime()) + "0000";
return Long.valueOf(timeStr);
}
......
......@@ -7,6 +7,7 @@
</appender>
<logger name="org.apache.zookeeper.ClientCnxn" level="INFO"/>
<logger name="org.eclipse.jetty" level="INFO"/>
<root level="debug">
<appender-ref ref="STDOUT"/>
......
package org.skywalking.apm.collector.server.jetty;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public class ArgumentsParseException extends CollectorException {
public ArgumentsParseException(String message) {
super(message);
}
public ArgumentsParseException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -3,7 +3,14 @@ package org.skywalking.apm.collector.server.jetty;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Enumeration;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.skywalking.apm.collector.core.framework.Handler;
......@@ -14,14 +21,127 @@ public abstract class JettyHandler extends HttpServlet implements Handler {
public abstract String pathSpec();
protected final void reply(HttpServletResponse response, JsonElement resJson, int status) throws IOException {
@Override
protected final void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try {
reply(resp, doGet(req));
} catch (ArgumentsParseException e) {
replyError(resp, e.getMessage(), HttpServletResponse.SC_BAD_REQUEST);
}
}
protected abstract JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException;
@Override
protected final void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
super.doPost(req, resp);
}
@Override
protected final void doHead(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
super.doHead(req, resp);
}
@Override protected final long getLastModified(HttpServletRequest req) {
return super.getLastModified(req);
}
@Override
protected final void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
super.doPut(req, resp);
}
@Override
protected final void doDelete(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.doDelete(req, resp);
}
@Override
protected final void doOptions(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.doOptions(req, resp);
}
@Override
protected final void doTrace(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.doTrace(req, resp);
}
@Override
protected final void service(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.service(req, resp);
}
@Override public final void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
super.service(req, res);
}
@Override public final void destroy() {
super.destroy();
}
@Override public final String getInitParameter(String name) {
return super.getInitParameter(name);
}
@Override public final Enumeration<String> getInitParameterNames() {
return super.getInitParameterNames();
}
@Override public final ServletConfig getServletConfig() {
return super.getServletConfig();
}
@Override public final ServletContext getServletContext() {
return super.getServletContext();
}
@Override public final String getServletInfo() {
return super.getServletInfo();
}
@Override public final void init(ServletConfig config) throws ServletException {
super.init(config);
}
@Override public final void init() throws ServletException {
super.init();
}
@Override public final void log(String msg) {
super.log(msg);
}
@Override public final void log(String message, Throwable t) {
super.log(message, t);
}
@Override public final String getServletName() {
return super.getServletName();
}
private void reply(HttpServletResponse response, JsonElement resJson) throws IOException {
response.setContentType("text/json");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
response.setStatus(HttpServletResponse.SC_OK);
PrintWriter out = response.getWriter();
out.print(resJson);
out.flush();
out.close();
}
private void replyError(HttpServletResponse response, String errorMessage, int status) throws IOException {
response.setContentType("text/plain");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
PrintWriter out = response.getWriter();
out.print(errorMessage);
out.flush();
out.close();
}
}
......@@ -4,7 +4,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -18,9 +17,8 @@ public abstract class WorkerContext implements Context {
private Map<String, RemoteWorkerRef> remoteWorkerRefs;
private Map<String, List<WorkerRef>> roleWorkers;
private Map<String, Role> roles;
private Map<Integer, DataDefine> dataDefineMap;
public WorkerContext() {
WorkerContext() {
this.roleWorkers = new HashMap<>();
this.roles = new HashMap<>();
this.remoteWorkerRefs = new HashMap<>();
......@@ -32,8 +30,7 @@ public abstract class WorkerContext implements Context {
@Override final public WorkerRefs lookup(Role role) throws WorkerNotFoundException {
if (getRoleWorkers().containsKey(role.roleName())) {
WorkerRefs refs = new WorkerRefs(getRoleWorkers().get(role.roleName()), role.workerSelector());
return refs;
return new WorkerRefs(getRoleWorkers().get(role.roleName()), role.workerSelector());
} else {
throw new WorkerNotFoundException("role=" + role.roleName() + ", no available worker.");
}
......@@ -55,10 +52,6 @@ public abstract class WorkerContext implements Context {
return roles.get(roleName);
}
public final DataDefine getDataDefine(int defineId) {
return dataDefineMap.get(defineId);
}
@Override final public void put(WorkerRef workerRef) {
logger.debug("put worker reference into context, role name: {}", workerRef.getRole().roleName());
if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) {
......
......@@ -70,7 +70,7 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
Data data = (Data)message;
if (dataCache.containsKey(data.id())) {
getClusterContext().getDataDefine(data.getDefineId()).mergeData(data, dataCache.get(data.id()));
getRole().dataDefine().mergeData(data, dataCache.get(data.id()));
} else {
if (dataCache.currentCollectionSize() < 1000) {
dataCache.put(data.id(), data);
......
......@@ -4,5 +4,5 @@ package org.skywalking.apm.collector.stream.worker.impl.data;
* @author pengys5
*/
public enum AttributeType {
STRING, LONG, FLOAT, INTEGER, BYTE
STRING, LONG, FLOAT, INTEGER, BYTE, BOOLEAN
}
......@@ -13,26 +13,30 @@ public class Data extends AbstractHashMessage {
private final int longCapacity;
private final int floatCapacity;
private final int integerCapacity;
private final int booleanCapacity;
private final int byteCapacity;
private String[] dataStrings;
private Long[] dataLongs;
private Float[] dataFloats;
private Integer[] dataIntegers;
private Boolean[] dataBooleans;
private byte[][] dataBytes;
public Data(String id, int defineId, int stringCapacity, int longCapacity, int floatCapacity, int integerCapacity,
int byteCapacity) {
int booleanCapacity, int byteCapacity) {
super(id);
this.defineId = defineId;
this.dataStrings = new String[stringCapacity];
this.dataLongs = new Long[longCapacity];
this.dataFloats = new Float[floatCapacity];
this.dataIntegers = new Integer[integerCapacity];
this.dataBooleans = new Boolean[booleanCapacity];
this.dataBytes = new byte[byteCapacity][];
this.stringCapacity = stringCapacity;
this.longCapacity = longCapacity;
this.floatCapacity = floatCapacity;
this.integerCapacity = integerCapacity;
this.booleanCapacity = booleanCapacity;
this.byteCapacity = byteCapacity;
}
......@@ -52,6 +56,10 @@ public class Data extends AbstractHashMessage {
dataIntegers[position] = value;
}
public void setDataBoolean(int position, Boolean value) {
dataBooleans[position] = value;
}
public void setDataBytes(int position, byte[] dataBytes) {
this.dataBytes[position] = dataBytes;
}
......@@ -72,6 +80,10 @@ public class Data extends AbstractHashMessage {
return dataIntegers[position];
}
public Boolean getDataBoolean(int position) {
return dataBooleans[position];
}
public byte[] getDataBytes(int position) {
return dataBytes[position];
}
......@@ -91,6 +103,7 @@ public class Data extends AbstractHashMessage {
builder.setStringCapacity(stringCapacity);
builder.setLongCapacity(longCapacity);
builder.setByteCapacity(byteCapacity);
builder.setBooleanCapacity(booleanCapacity);
for (int i = 0; i < dataStrings.length; i++) {
builder.setDataStrings(i, dataStrings[i]);
......@@ -104,6 +117,9 @@ public class Data extends AbstractHashMessage {
for (int i = 0; i < dataLongs.length; i++) {
builder.setDataLongs(i, dataLongs[i]);
}
for (int i = 0; i < dataBooleans.length; i++) {
builder.setDataBooleans(i, dataBooleans[i]);
}
for (int i = 0; i < dataBytes.length; i++) {
builder.setDataBytes(i, ByteString.copyFrom(dataBytes[i]));
}
......
......@@ -11,6 +11,7 @@ public abstract class DataDefine {
private int longCapacity;
private int floatCapacity;
private int integerCapacity;
private int booleanCapacity;
private int byteCapacity;
public DataDefine() {
......@@ -29,6 +30,8 @@ public abstract class DataDefine {
floatCapacity++;
} else if (AttributeType.INTEGER.equals(attribute.getType())) {
integerCapacity++;
} else if (AttributeType.BOOLEAN.equals(attribute.getType())) {
booleanCapacity++;
} else if (AttributeType.BYTE.equals(attribute.getType())) {
byteCapacity++;
}
......@@ -46,7 +49,7 @@ public abstract class DataDefine {
protected abstract void attributeDefine();
public final Data build(String id) {
return new Data(id, defineId(), stringCapacity, longCapacity, floatCapacity, integerCapacity, byteCapacity);
return new Data(id, defineId(), stringCapacity, longCapacity, floatCapacity, integerCapacity, booleanCapacity, byteCapacity);
}
public void mergeData(Data newData, Data oldData) {
......@@ -54,6 +57,7 @@ public abstract class DataDefine {
int longPosition = 0;
int floatPosition = 0;
int integerPosition = 0;
int booleanPosition = 0;
int bytePosition = 0;
for (int i = 0; i < initialCapacity(); i++) {
Attribute attribute = attributes[i];
......@@ -66,9 +70,12 @@ public abstract class DataDefine {
} else if (AttributeType.FLOAT.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataFloat(floatPosition), oldData.getDataFloat(floatPosition));
floatPosition++;
} else if (AttributeType.FLOAT.equals(attribute.getType())) {
} else if (AttributeType.INTEGER.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataInteger(integerPosition), oldData.getDataInteger(integerPosition));
integerPosition++;
} else if (AttributeType.BOOLEAN.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataBoolean(booleanPosition), oldData.getDataBoolean(booleanPosition));
integerPosition++;
} else if (AttributeType.BYTE.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataBytes(bytePosition), oldData.getDataBytes(integerPosition));
bytePosition++;
......
......@@ -12,5 +12,7 @@ public interface Operation {
Integer operate(Integer newValue, Integer oldValue);
Boolean operate(Boolean newValue, Boolean oldValue);
byte[] operate(byte[] newValue, byte[] oldValue);
}
......@@ -23,6 +23,10 @@ public class AddOperation implements Operation {
return newValue + oldValue;
}
@Override public Boolean operate(Boolean newValue, Boolean oldValue) {
throw new UnsupportedOperationException("not support boolean addition operation");
}
@Override public byte[] operate(byte[] newValue, byte[] oldValue) {
throw new UnsupportedOperationException("not support byte addition operation");
}
......
......@@ -22,6 +22,10 @@ public class CoverOperation implements Operation {
return newValue;
}
@Override public Boolean operate(Boolean newValue, Boolean oldValue) {
return newValue;
}
@Override public byte[] operate(byte[] newValue, byte[] oldValue) {
return newValue;
}
......
......@@ -22,6 +22,10 @@ public class NonOperation implements Operation {
return oldValue;
}
@Override public Boolean operate(Boolean newValue, Boolean oldValue) {
return oldValue;
}
@Override public byte[] operate(byte[] newValue, byte[] oldValue) {
return oldValue;
}
......
......@@ -19,11 +19,13 @@ message RemoteData {
int32 floatCapacity = 3;
int32 integerCapacity = 4;
int32 byteCapacity = 5;
repeated string dataStrings = 6;
repeated int64 dataLongs = 7;
repeated float dataFloats = 8;
repeated int32 dataIntegers = 9;
repeated bytes dataBytes = 10;
int32 booleanCapacity = 6;
repeated string dataStrings = 7;
repeated int64 dataLongs = 8;
repeated float dataFloats = 9;
repeated int32 dataIntegers = 10;
repeated bytes dataBytes = 11;
repeated bool dataBooleans = 12;
}
message Empty {
......
package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public interface ISegmentCostDAO {
JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
String globalTraceId, int limit, int from);
}
package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.List;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostTable;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
/**
* @author pengys5
*/
public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
@Override public JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
String globalTraceId, int limit, int from) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(SegmentCostTable.TABLE);
searchRequestBuilder.setTypes(SegmentCostTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
searchRequestBuilder.setQuery(boolQueryBuilder);
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
mustQueryList.add(QueryBuilders.rangeQuery(SegmentCostTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
if (minCost != -1 || maxCost != -1) {
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(SegmentCostTable.COLUMN_COST);
if (minCost != -1) {
rangeQueryBuilder.gte(minCost);
}
if (maxCost != -1) {
rangeQueryBuilder.lte(maxCost);
}
boolQueryBuilder.must().add(rangeQueryBuilder);
}
if (!StringUtils.isEmpty(operationName)) {
mustQueryList.add(QueryBuilders.matchQuery(SegmentCostTable.COLUMN_OPERATION_NAME, operationName));
}
if (!StringUtils.isEmpty(globalTraceId)) {
mustQueryList.add(QueryBuilders.matchQuery(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, globalTraceId));
}
searchRequestBuilder.addSort(SegmentCostTable.COLUMN_COST, SortOrder.DESC);
searchRequestBuilder.setSize(limit);
searchRequestBuilder.setFrom(from);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
JsonObject topSegPaging = new JsonObject();
topSegPaging.addProperty("recordsTotal", searchResponse.getHits().totalHits);
JsonArray topSegArray = new JsonArray();
topSegPaging.add("data", topSegArray);
int num = from;
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
JsonObject topSegmentJson = new JsonObject();
topSegmentJson.addProperty("num", num);
String segId = (String)searchHit.getSource().get(SegmentCostTable.COLUMN_SEGMENT_ID);
topSegmentJson.addProperty(SegmentCostTable.COLUMN_SEGMENT_ID, segId);
topSegmentJson.addProperty(SegmentCostTable.COLUMN_START_TIME, (Number)searchHit.getSource().get(SegmentCostTable.COLUMN_START_TIME));
if (searchHit.getSource().containsKey(SegmentCostTable.COLUMN_END_TIME)) {
topSegmentJson.addProperty(SegmentCostTable.COLUMN_END_TIME, (Number)searchHit.getSource().get(SegmentCostTable.COLUMN_END_TIME));
}
topSegmentJson.addProperty(SegmentCostTable.COLUMN_OPERATION_NAME, (String)searchHit.getSource().get(SegmentCostTable.COLUMN_OPERATION_NAME));
topSegmentJson.addProperty(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID, (String)searchHit.getSource().get(SegmentCostTable.COLUMN_GLOBAL_TRACE_ID));
topSegmentJson.addProperty(SegmentCostTable.COLUMN_COST, (Number)searchHit.getSource().get(SegmentCostTable.COLUMN_COST));
topSegmentJson.addProperty(SegmentCostTable.COLUMN_IS_ERROR, (Boolean)searchHit.getSource().get(SegmentCostTable.COLUMN_IS_ERROR));
num++;
topSegArray.add(topSegmentJson);
}
return topSegPaging;
}
}
package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO {
@Override public JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
String globalTraceId, int limit, int from) {
return null;
}
}
......@@ -10,6 +10,7 @@ import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.jetty.JettyServer;
import org.skywalking.apm.collector.ui.UIModuleDefine;
import org.skywalking.apm.collector.ui.UIModuleGroupDefine;
import org.skywalking.apm.collector.ui.jetty.handler.SegmentTopGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.TraceDagGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.UIJettyServerHandler;
......@@ -48,6 +49,7 @@ public class UIJettyModuleDefine extends UIModuleDefine {
List<Handler> handlers = new LinkedList<>();
handlers.add(new UIJettyServerHandler());
handlers.add(new TraceDagGetHandler());
handlers.add(new SegmentTopGetHandler());
return handlers;
}
}
package org.skywalking.apm.collector.ui.jetty.handler;
import com.google.gson.JsonElement;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.ui.service.SegmentTopService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentTopGetHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(SegmentTopGetHandler.class);
@Override public String pathSpec() {
return "/segment/top";
}
private SegmentTopService service = new SegmentTopService();
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
if (!req.getParameterMap().containsKey("startTime") || !req.getParameterMap().containsKey("endTime") || !req.getParameterMap().containsKey("from") || !req.getParameterMap().containsKey("limit")) {
throw new ArgumentsParseException("the request parameter must contains startTime, endTime, from, limit");
}
if (logger.isDebugEnabled()) {
logger.debug("startTime: {}, endTime: {}, from: {}", req.getParameter("startTime"), req.getParameter("endTime"), req.getParameter("from"));
}
long startTime;
try {
startTime = Long.valueOf(req.getParameter("startTime"));
} catch (NumberFormatException e) {
throw new ArgumentsParseException("the request parameter startTime must be a long");
}
long endTime;
try {
endTime = Long.valueOf(req.getParameter("endTime"));
} catch (NumberFormatException e) {
throw new ArgumentsParseException("the request parameter endTime must be a long");
}
int from;
try {
from = Integer.valueOf(req.getParameter("from"));
} catch (NumberFormatException e) {
throw new ArgumentsParseException("the request parameter from must be an integer");
}
int limit;
try {
limit = Integer.valueOf(req.getParameter("limit"));
} catch (NumberFormatException e) {
throw new ArgumentsParseException("the request parameter from must be an integer");
}
int minCost = -1;
if (req.getParameterMap().containsKey("minCost")) {
minCost = Integer.valueOf(req.getParameter("minCost"));
}
int maxCost = -1;
if (req.getParameterMap().containsKey("maxCost")) {
maxCost = Integer.valueOf(req.getParameter("maxCost"));
}
String globalTraceId = null;
if (req.getParameterMap().containsKey("globalTraceId")) {
globalTraceId = req.getParameter("globalTraceId");
}
String operationName = null;
if (req.getParameterMap().containsKey("operationName")) {
operationName = req.getParameter("operationName");
}
return service.loadTop(startTime, endTime, minCost, maxCost, operationName, globalTraceId, limit, from);
}
}
package org.skywalking.apm.collector.ui.jetty.handler;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.ServletException;
import com.google.gson.JsonElement;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.ui.service.TraceDagService;
import org.slf4j.Logger;
......@@ -23,8 +21,7 @@ public class TraceDagGetHandler extends JettyHandler {
private TraceDagService service = new TraceDagService();
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
String startTimeStr = req.getParameter("startTime");
String endTimeStr = req.getParameter("endTime");
String timeBucketType = req.getParameter("timeBucketType");
......@@ -32,8 +29,6 @@ public class TraceDagGetHandler extends JettyHandler {
long startTime = Long.valueOf(startTimeStr);
long endTime = Long.valueOf(endTimeStr);
JsonObject traceDagJson = service.load(startTime, endTime, timeBucketType);
reply(resp, traceDagJson, HttpServletResponse.SC_OK);
return service.load(startTime, endTime, timeBucketType);
}
}
package org.skywalking.apm.collector.ui.jetty.handler;
import com.google.gson.JsonArray;
import java.io.IOException;
import com.google.gson.JsonElement;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.ui.jetty.UIJettyDataListener;
......@@ -22,15 +21,13 @@ public class UIJettyServerHandler extends JettyHandler {
return "/ui/jetty";
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
ClusterModuleRegistrationReader reader = ((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getReader();
List<String> servers = reader.read(UIJettyDataListener.PATH);
JsonArray serverArray = new JsonArray();
servers.forEach(server -> {
serverArray.add(server);
});
reply(resp, serverArray, HttpServletResponse.SC_OK);
return serverArray;
}
}
package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.ui.dao.ISegmentCostDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentTopService {
private final Logger logger = LoggerFactory.getLogger(SegmentTopService.class);
public JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
String globalTraceId, int limit, int from) {
logger.debug("startTime: {}, endTime: {}, minCost: {}, maxCost: {}, operationName: {}, globalTraceId: {}, limit: {}, from: {}", startTime, endTime, minCost, maxCost, operationName, globalTraceId, limit, from);
ISegmentCostDAO segmentCostDAO = (ISegmentCostDAO)DAOContainer.INSTANCE.get(ISegmentCostDAO.class.getName());
return segmentCostDAO.loadTop(startTime, endTime, minCost, maxCost, operationName, globalTraceId, limit, from);
}
}
org.skywalking.apm.collector.ui.dao.NodeComponentEsDAO
org.skywalking.apm.collector.ui.dao.NodeMappingEsDAO
org.skywalking.apm.collector.ui.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.ui.dao.NodeRefSumEsDAO
\ No newline at end of file
org.skywalking.apm.collector.ui.dao.NodeRefSumEsDAO
org.skywalking.apm.collector.ui.dao.SegmentCostEsDAO
\ No newline at end of file
org.skywalking.apm.collector.ui.dao.NodeComponentH2DAO
org.skywalking.apm.collector.ui.dao.NodeMappingH2DAO
org.skywalking.apm.collector.ui.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.ui.dao.NodeRefSumH2DAO
\ No newline at end of file
org.skywalking.apm.collector.ui.dao.NodeRefSumH2DAO
org.skywalking.apm.collector.ui.dao.SegmentCostH2DAO
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册