提交 ec08ca15 编写于 作者: wu-sheng's avatar wu-sheng

* add Agent2RoutingClient, to provie send ReqeustSpan and AckSpan to routing servers.

上级 c99679a8
......@@ -21,4 +21,12 @@ public class Client {
public TraceSearchClient newTraceSearchClient(StorageClientListener listener){
return new TraceSearchClient(channel, listener);
}
public void shutdown(){
channel.shutdownNow();
}
public boolean isShutdown(){
return channel.isShutdown() || channel.isTerminated();
}
}
......@@ -9,6 +9,8 @@ import io.grpc.ManagedChannel;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.List;
public class SpanStorageClient {
private final SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub;
......@@ -20,7 +22,7 @@ public class SpanStorageClient {
this.listener = listener;
}
public void sendRequestSpan(RequestSpan... requestSpan) {
public void sendRequestSpan(List<RequestSpan> requestSpan) {
StreamObserver<RequestSpan> requestSpanStreamObserver =
spanStorageStub.storageRequestSpan(new StreamObserver<SendResult>() {
@Override
......@@ -46,7 +48,7 @@ public class SpanStorageClient {
requestSpanStreamObserver.onCompleted();
}
public void sendACKSpan(AckSpan... ackSpan) {
public void sendACKSpan(List<AckSpan> ackSpan) {
StreamObserver<AckSpan> ackSpanStreamObserver =
spanStorageStub.storageACKSpan(new StreamObserver<SendResult>() {
@Override
......
package com.a.eye.skywalking.agent;
import com.a.eye.skywalking.agent.junction.SkyWalkingEnhanceMatcher;
import com.a.eye.skywalking.client.Agent2RoutingClient;
import com.a.eye.skywalking.conf.Config;
import com.a.eye.skywalking.conf.ConfigInitializer;
import com.a.eye.skywalking.logging.EasyLogResolver;
......@@ -35,6 +36,8 @@ public class SkyWalkingAgent {
logger = LogManager.getLogger(SkyWalkingAgent.class);
initConfig();
Agent2RoutingClient.INSTANCE.onReady();
final PluginDefineCategory pluginDefineCategory = PluginDefineCategory.category(new PluginBootstrap().loadPlugins());
new AgentBuilder.Default().type(enhanceClassMatcher(pluginDefineCategory).and(not(isInterface()))).transform(new AgentBuilder.Transformer() {
......
package com.a.eye.skywalking.client;
import com.a.eye.skywalking.conf.Config;
import com.a.eye.skywalking.disruptor.ack.SendAckSpanEventHandler;
import com.a.eye.skywalking.disruptor.request.SendRequestSpanEventHandler;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.Client;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.SendResult;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.network.listener.client.StorageClientListener;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.LockSupport;
/**
* Created by wusheng on 2016/11/27.
*/
public class Agent2RoutingClient extends Thread {
private static ILog logger = LogManager.getLogger(Agent2RoutingClient.class);
private List<ServerAddr> addrList;
private Client client;
private SpanStorageClient spanStorageClient;
private NetworkListener listener;
private SendRequestSpanEventHandler requestSpanDataSupplier = null;
private SendAckSpanEventHandler ackSpanDataSupplier = null;
private volatile boolean connected = false;
public static Agent2RoutingClient INSTANCE = new Agent2RoutingClient();
public Agent2RoutingClient() {
String[] serverList = Config.SkyWalking.SERVERS.split(",");
addrList = new ArrayList<>(serverList.length);
for (String server : serverList) {
String[] addrSegments = server.split(":");
if (addrSegments.length != 2) {
throw new IllegalArgumentException("server addr should like ip:port, illegal addr:" + server);
}
addrList.add(new ServerAddr(addrSegments[0], addrSegments[2]));
}
listener = new NetworkListener();
}
public void onReady() {
this.connect();
this.start();
}
public void setRequestSpanDataSupplier(SendRequestSpanEventHandler requestSpanDataSupplier) {
this.requestSpanDataSupplier = requestSpanDataSupplier;
}
public void setAckSpanDataSupplier(SendAckSpanEventHandler ackSpanDataSupplier) {
this.ackSpanDataSupplier = ackSpanDataSupplier;
}
private void connect() {
try {
if(client != null && !client.isShutdown()){
client.shutdown();
}
int addrIdx = new Random().nextInt(addrList.size());
ServerAddr addr = addrList.get(addrIdx);
client = new Client(addr.ip, addr.port);
spanStorageClient = client.newSpanStorageClient(listener);
connected = true;
} catch (Exception e) {
HealthCollector.getCurrentHeathReading("Agent2RoutingClient").updateData(HeathReading.ERROR, "connect to routing node failure.");
}
}
@Override
public void run() {
while (true) {
try {
while (connected && !client.isShutdown()) {
List<RequestSpan> requestData = this.requestSpanDataSupplier.getBufferData();
List<AckSpan> ackData = this.ackSpanDataSupplier.getBufferData();
if (requestData.size() > 0 || ackData.size() > 0) {
listener.begin();
spanStorageClient.sendRequestSpan(requestData);
spanStorageClient.sendACKSpan(ackData);
while (!listener.isBatchFinished()) {
LockSupport.parkNanos(1);
}
} else {
try {
Thread.sleep(10 * 1000L);
} catch (InterruptedException e) {
}
}
}
try {
Thread.sleep(30 * 1000L);
} catch (InterruptedException e) {
}
this.connect();
} catch (Throwable e) {
logger.error("unexpected failure.", e);
}
}
}
class NetworkListener implements StorageClientListener {
private volatile boolean batchFinished = false;
void begin() {
batchFinished = false;
}
boolean isBatchFinished() {
return batchFinished;
}
@Override
public void onError(Throwable throwable) {
batchFinished = true;
HealthCollector.getCurrentHeathReading("Agent2RoutingClient").updateData(HeathReading.ERROR, "send data to routing node failure.");
}
@Override
public void onBatchFinished(SendResult sendResult) {
batchFinished = true;
HealthCollector.getCurrentHeathReading("Agent2RoutingClient").updateData(HeathReading.INFO, "batch send data to routing node.");
}
}
class ServerAddr {
String ip;
Integer port;
public ServerAddr(String ip, String port) {
this.ip = ip;
try {
this.port = Integer.parseInt(port);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("server addr should like ip:port, illegal port:" + port);
}
}
}
}
......@@ -2,7 +2,7 @@ package com.a.eye.skywalking.conf;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.protocol.util.StringUtil;
import com.a.eye.skywalking.util.StringUtil;
import java.io.File;
import java.io.FileInputStream;
......
package com.a.eye.skywalking.disruptor.ack;
import com.a.eye.skywalking.client.Agent2RoutingClient;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
......@@ -14,25 +16,39 @@ import java.util.List;
* Created by wusheng on 2016/11/24.
*/
public class SendAckSpanEventHandler implements EventHandler<AckSpanHolder> {
private static ILog logger = LogManager.getLogger(SendAckSpanEventHandler.class);
private int bufferSize = 100;
private List<AckSpan> buffer = new ArrayList<>(bufferSize);
private static ILog logger = LogManager.getLogger(SendAckSpanEventHandler.class);
private int bufferSize = 100;
private AckSpan[] buffer = new AckSpan[bufferSize];
private int bufferIdx = 0;
public SendAckSpanEventHandler() {
Agent2RoutingClient.INSTANCE.setAckSpanDataSupplier(this);
}
@Override
public void onEvent(AckSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event.getData());
if(buffer[bufferIdx] != null){
return;
}
buffer[bufferIdx] = event.getData();
bufferIdx++;
if (bufferIdx == buffer.length) {
bufferIdx = 0;
}
if (endOfBatch || buffer.size() == bufferSize) {
try {
//TODO, use GRPC to send
if (endOfBatch) {
HealthCollector.getCurrentHeathReading("SendAckSpanEventHandler").updateData(HeathReading.INFO, "AckSpan messages were successful consumed .");
}
}
HealthCollector.getCurrentHeathReading("SendAckSpanEventHandler").updateData(HeathReading.INFO, "%s messages were successful consumed .", buffer.size());
} finally {
buffer.clear();
}
public List<AckSpan> getBufferData(){
List<AckSpan> data = new ArrayList<AckSpan>(bufferSize);
for (int i = 0; i < buffer.length; i++) {
data.add(buffer[i]);
buffer[i] = null;
}
return data;
}
}
package com.a.eye.skywalking.disruptor.request;
import com.a.eye.skywalking.client.Agent2RoutingClient;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
......@@ -14,25 +15,39 @@ import java.util.List;
* Created by wusheng on 2016/11/24.
*/
public class SendRequestSpanEventHandler implements EventHandler<RequestSpanHolder> {
private static ILog logger = LogManager.getLogger(SendRequestSpanEventHandler.class);
private static final int bufferSize = 100;
private List<RequestSpan> buffer = new ArrayList<>(bufferSize);
private static ILog logger = LogManager.getLogger(SendRequestSpanEventHandler.class);
private static final int bufferSize = 100;
private RequestSpan[] buffer = new RequestSpan[bufferSize];
private int bufferIdx = 0;
public SendRequestSpanEventHandler() {
Agent2RoutingClient.INSTANCE.setRequestSpanDataSupplier(this);
}
@Override
public void onEvent(RequestSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event.getData());
if(buffer[bufferIdx] != null){
return;
}
buffer[bufferIdx] = event.getData();
bufferIdx++;
if(bufferIdx == buffer.length){
bufferIdx = 0;
}
if (endOfBatch || buffer.size() == bufferSize) {
try {
//TODO, use GRPC to send
if (endOfBatch) {
HealthCollector.getCurrentHeathReading("SendRequestSpanEventHandler").updateData(HeathReading.INFO, "Request Span messages were successful consumed .");
}
}
HealthCollector.getCurrentHeathReading("SendRequestSpanEventHandler").updateData(HeathReading.INFO, "%s messages were successful consumed .", buffer.size());
} finally {
buffer.clear();
}
public List<RequestSpan> getBufferData(){
List<RequestSpan> data = new ArrayList<RequestSpan>(bufferSize);
for (int i = 0; i < buffer.length; i++) {
data.add(buffer[i]);
buffer[i] = null;
}
return data;
}
}
......@@ -11,7 +11,7 @@ import com.a.eye.skywalking.model.Identification;
import com.a.eye.skywalking.model.Span;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.protocol.util.BuriedPointMachineUtil;
import com.a.eye.skywalking.util.BuriedPointMachineUtil;
import java.util.HashSet;
import java.util.Set;
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.model.ContextData;
import com.a.eye.skywalking.model.EmptyContextData;
import com.a.eye.skywalking.model.Identification;
import com.a.eye.skywalking.model.Span;
import com.a.eye.skywalking.protocol.util.ContextGenerator;
import com.a.eye.skywalking.util.ContextGenerator;
public class LocalMethodInvokeMonitor extends BaseInvokeMonitor {
......
......@@ -5,8 +5,7 @@ import com.a.eye.skywalking.context.CurrentThreadSpanStack;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.model.*;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.protocol.util.ContextGenerator;
import com.a.eye.skywalking.util.ContextGenerator;
public class RPCClientInvokeMonitor extends BaseInvokeMonitor {
......
......@@ -6,7 +6,7 @@ import com.a.eye.skywalking.model.ContextData;
import com.a.eye.skywalking.model.Identification;
import com.a.eye.skywalking.model.Span;
import com.a.eye.skywalking.model.SpanType;
import com.a.eye.skywalking.protocol.util.ContextGenerator;
import com.a.eye.skywalking.util.ContextGenerator;
public class RPCServerInvokeMonitor extends BaseInvokeMonitor {
......
package com.a.eye.skywalking.logging;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.protocol.util.LoggingUtil;
import com.a.eye.skywalking.util.LoggingUtil;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
......
......@@ -2,7 +2,7 @@ package com.a.eye.skywalking.logging;
import com.a.eye.skywalking.conf.Config;
import com.a.eye.skywalking.protocol.util.LoggingUtil;
import com.a.eye.skywalking.util.LoggingUtil;
import java.io.File;
import java.io.FileNotFoundException;
......
package com.a.eye.skywalking.model;
import com.a.eye.skywalking.api.IBuriedPointType;
import com.a.eye.skywalking.protocol.util.StringUtil;
import java.util.HashMap;
import java.util.Map;
import com.a.eye.skywalking.util.StringUtil;
public class Identification {
private String viewPoint;
......
......@@ -2,7 +2,7 @@ package com.a.eye.skywalking.plugin;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.protocol.util.StringUtil;
import com.a.eye.skywalking.util.StringUtil;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.pool.TypePool.Resolution;
......
......@@ -7,7 +7,7 @@ import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import com.a.eye.skywalking.protocol.util.StringUtil;
import com.a.eye.skywalking.util.StringUtil;
public class PluginCfg {
public final static PluginCfg CFG = new PluginCfg();
......
......@@ -4,13 +4,12 @@ import static net.bytebuddy.jar.asm.Opcodes.ACC_PRIVATE;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static net.bytebuddy.matcher.ElementMatchers.not;
import com.a.eye.skywalking.logging.EasyLogger;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.plugin.AbstractClassEnhancePluginDefine;
import com.a.eye.skywalking.plugin.PluginException;
import com.a.eye.skywalking.plugin.interceptor.EnhancedClassInstanceContext;
import com.a.eye.skywalking.protocol.util.StringUtil;
import com.a.eye.skywalking.util.StringUtil;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.implementation.MethodDelegation;
......
package com.a.eye.skywalking.protocol.util;
package com.a.eye.skywalking.util;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
......
package com.a.eye.skywalking.protocol.util;
package com.a.eye.skywalking.util;
import com.a.eye.skywalking.conf.Config;
import com.a.eye.skywalking.model.ContextData;
......
package com.a.eye.skywalking.protocol.util;
package com.a.eye.skywalking.util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
......
package com.a.eye.skywalking.protocol.util;
package com.a.eye.skywalking.util;
public final class StringUtil {
public static boolean isEmpty(String str) {
......
......@@ -29,8 +29,8 @@ public class StorageThread extends Thread {
@Override
public void run() {
RequestSpan[] requestSpanList = new RequestSpan[10];
AckSpan[] ackSpanList = new AckSpan[10];
List<RequestSpan> requestSpanList = new ArrayList<RequestSpan>();
List<AckSpan> ackSpanList = new ArrayList<AckSpan>();
int cycle = 0;
for (int i = 0; i < count; i++) {
......@@ -55,8 +55,8 @@ public class StorageThread extends Thread {
listener.begin();
}
requestSpanList[cycle] = requestSpan;
ackSpanList[cycle] = ackSpan;
requestSpanList.add(requestSpan);
ackSpanList.add(ackSpan);
cycle++;
if (i % 10_000 == 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册