提交 4dfe4e55 编写于 作者: P pengys5

Refactor exception

上级 e38b6ad1
......@@ -24,10 +24,10 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
/**
* Construct an <code>AbstractClusterWorker</code> with the worker role and context.
*
* @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning,
* each worker have multi instances.
* @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning,
* each worker have multi instances.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
......@@ -39,7 +39,7 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)}
*/
final public void allocateJob(Object message) throws Exception {
final public void allocateJob(Object message) throws WorkerException {
onWork(message);
}
......@@ -49,7 +49,7 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
* @param message Cast the message object to a expect subclass.
* @throws Exception Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws Exception;
protected abstract void onWork(Object message) throws WorkerException;
static class WorkerWithAkka extends UntypedActor {
private Logger logger = LogManager.INSTANCE.getFormatterLogger(WorkerWithAkka.class);
......@@ -63,12 +63,12 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
}
@Override
public void preStart() throws Exception {
public void preStart() {
cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
}
@Override
public void postStop() throws Exception {
public void postStop() {
cluster.unsubscribe(getSelf());
}
......@@ -78,16 +78,16 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
* the sender to register self.
*/
@Override
public void onReceive(Object message) throws Throwable {
public void onReceive(Object message) throws WorkerException {
if (message instanceof ClusterEvent.CurrentClusterState) {
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState)message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof ClusterEvent.MemberUp) {
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp)message;
logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
register(memberUp.member());
} else {
......
......@@ -25,13 +25,11 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
*
* @param localContext Not used, will be null.
* @return The created worker reference. See {@link ClusterWorkerRef}
* @throws IllegalArgumentException Not used.
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another
* worker instance, when the worker provider not find then Throw this Exception.
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override
final public WorkerRef onCreate(
LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
@Override final public WorkerRef onCreate(
LocalWorkerContext localContext) throws ProviderNotFoundException {
int num = ClusterWorkerRefCounter.INSTANCE.incrementAndGet(role());
T clusterWorker = workerInstance(getClusterContext());
......
......@@ -17,10 +17,10 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
/**
* Construct an <code>AbstractLocalAsyncWorker</code> with the worker role and context.
*
* @param role The responsibility of worker in cluster, more than one workers can have same responsibility which use
* to provide load balancing ability.
* @param role The responsibility of worker in cluster, more than one workers can have same responsibility which use
* to provide load balancing ability.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
......@@ -40,9 +40,9 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
* Receive message
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)}
* @throws WorkerException The Exception happen in {@link #onWork(Object)}
*/
final public void allocateJob(Object message) throws Exception {
final public void allocateJob(Object message) throws WorkerException {
onWork(message);
}
......@@ -50,9 +50,9 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
* The data process logic in this method.
*
* @param message Cast the message object to a expect subclass.
* @throws Exception Don't handle the exception, throw it.
* @throws WorkerException Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws Exception;
protected abstract void onWork(Object message) throws WorkerException;
static class WorkerWithDisruptor implements EventHandler<MessageHolder> {
......@@ -68,8 +68,8 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
* Receive the message from disruptor, when message in disruptor is empty, then send the cached data
* to the next workers.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
*/
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
......@@ -90,9 +90,8 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
* Push the message into disruptor ring buffer.
*
* @param message of the data to process.
* @throws Exception not used.
*/
public void tell(Object message) throws Exception {
public void tell(Object message) {
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setMessage(message);
......
......@@ -13,10 +13,9 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
public abstract int queueSize();
@Override
final public WorkerRef onCreate(
LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
T localAsyncWorker = (T) workerInstance(getClusterContext());
@Override final public WorkerRef onCreate(
LocalWorkerContext localContext) throws ProviderNotFoundException {
T localAsyncWorker = (T)workerInstance(getClusterContext());
localAsyncWorker.preStart();
// Specify the size of the ring buffer, must be power of 2.
......
package org.skywalking.apm.collector.actor;
/**
* The <code>AbstractLocalSyncWorker</code> use to define workers that receive data from jvm inside call and the
* workers response result in real time.
*
* <p> The implementation class is same as normal class, it make the framework be similar to the asynchronous
* workers inside jvm and outside jvm.
* The <code>AbstractLocalSyncWorker</code> defines workers who receive data from jvm inside call and response in real
* time.
*
* @author pengys5
* @since v3.0-2017
......@@ -18,25 +15,27 @@ public abstract class AbstractLocalSyncWorker extends AbstractLocalWorker {
/**
* Called by the worker reference to execute the worker service.
*
* @param request {@link Object} is a in parameter
* @param response {@link Object} is a out parameter
* @throws Exception
* @param request {@link Object} is an input parameter
* @param response {@link Object} is an output parameter
*/
final public void allocateJob(Object request, Object response) throws Exception {
onWork(request, response);
final public void allocateJob(Object request, Object response) throws WorkerInvokeException {
try {
onWork(request, response);
} catch (WorkerException e) {
throw new WorkerInvokeException(e.getMessage(), e.getCause());
}
}
/**
* Override this method to implementing business logic.
* Override this method to implement business logic.
*
* @param request {@link Object} is a in parameter
* @param response {@link Object} is a out parameter
* @throws Exception
*/
protected abstract void onWork(Object request, Object response) throws Exception;
protected abstract void onWork(Object request, Object response) throws WorkerException;
/**
* Called by the worker on start.
* Prepare methods before this work starts to work.
* <p>Usually, create or find the workers reference should be call.
*
* @throws ProviderNotFoundException
......
......@@ -7,7 +7,7 @@ public abstract class AbstractLocalSyncWorkerProvider<T extends AbstractLocalSyn
@Override
final public WorkerRef onCreate(
LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
LocalWorkerContext localContext) throws ProviderNotFoundException {
T localSyncWorker = (T) workerInstance(getClusterContext());
localSyncWorker.preStart();
......
......@@ -12,7 +12,7 @@ public abstract class AbstractWorkerProvider<T extends AbstractWorker> implement
public abstract T workerInstance(ClusterWorkerContext clusterContext);
public abstract WorkerRef onCreate(
LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException;
LocalWorkerContext localContext) throws ProviderNotFoundException;
final public void setClusterContext(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
......@@ -23,12 +23,12 @@ public abstract class AbstractWorkerProvider<T extends AbstractWorker> implement
}
final public WorkerRef create(
AbstractWorker workerOwner) throws IllegalArgumentException, ProviderNotFoundException {
AbstractWorker workerOwner) throws ProviderNotFoundException {
if (workerOwner == null) {
return onCreate(null);
} else if (workerOwner.getSelfContext() instanceof LocalWorkerContext) {
return onCreate((LocalWorkerContext) workerOwner.getSelfContext());
return onCreate((LocalWorkerContext)workerOwner.getSelfContext());
} else {
throw new IllegalArgumentException("the argument of workerOwner is Illegal");
}
......
......@@ -13,7 +13,7 @@ public class LocalAsyncWorkerRef extends WorkerRef {
}
@Override
public void tell(Object message) throws Exception {
public void tell(Object message) throws WorkerInvokeException {
workerWithDisruptor.tell(message);
}
}
......@@ -13,11 +13,11 @@ public class LocalSyncWorkerRef extends WorkerRef {
}
@Override
public void tell(Object message) throws Exception {
public void tell(Object message) throws WorkerInvokeException {
localSyncWorker.allocateJob(message, null);
}
public void ask(Object request, Object response) throws Exception {
public void ask(Object request, Object response) throws WorkerInvokeException {
localSyncWorker.allocateJob(request, response);
}
}
......@@ -5,5 +5,5 @@ package org.skywalking.apm.collector.actor;
*/
public interface Provider {
WorkerRef create(AbstractWorker workerOwner) throws IllegalArgumentException, ProviderNotFoundException;
WorkerRef create(AbstractWorker workerOwner) throws ProviderNotFoundException;
}
package org.skywalking.apm.collector.actor;
/**
* Defines a general exception a worker can throw when it
* encounters difficulty.
*
* @author pengys5
* @since v3.1-2017
*/
public class WorkerException extends Exception {
public WorkerException(String message) {
super(message);
}
public WorkerException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.actor;
/**
* This exception is raised when call (or ask) worker.
*
* @author pengys5
* @since v3.1-2017
*/
public class WorkerInvokeException extends WorkerException {
public WorkerInvokeException(String message) {
super(message);
}
public WorkerInvokeException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.actor;
public class WorkerNotFoundException extends Exception {
public class WorkerNotFoundException extends WorkerException {
public WorkerNotFoundException(String message) {
super(message);
}
......
......@@ -14,5 +14,5 @@ public abstract class WorkerRef {
return role;
}
public abstract void tell(Object message) throws Exception;
public abstract void tell(Object message) throws WorkerInvokeException;
}
package org.skywalking.apm.collector.actor;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import java.util.List;
/**
* @author pengys5
*/
......@@ -21,15 +20,15 @@ public class WorkerRefs<T extends WorkerRef> {
this.workerSelector = workerSelector;
}
public void tell(Object message) throws Exception {
public void tell(Object message) throws WorkerInvokeException {
logger.debug("WorkerSelector instance of %s", workerSelector.getClass());
workerSelector.select(workerRefs, message).tell(message);
}
public void ask(Object request, Object response) throws Exception {
public void ask(Object request, Object response) throws WorkerInvokeException {
WorkerRef workerRef = workerSelector.select(workerRefs, request);
if (workerRef instanceof LocalSyncWorkerRef) {
((LocalSyncWorkerRef) workerRef).ask(request, response);
((LocalSyncWorkerRef)workerRef).ask(request, response);
} else {
throw new IllegalAccessError("only local sync worker can ask");
}
......
......@@ -79,7 +79,7 @@ public class AbstractClusterWorkerTestCase {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws IllegalArgumentException {
}
}
......
package org.skywalking.apm.collector.worker;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.worker.config.CacheSizeConfig;
......@@ -15,24 +19,19 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
private int messageNum;
public abstract void analyse(Object message) throws Exception;
public abstract void analyse(Object message);
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override
final public void onWork(Object message) throws Exception {
@Override final public void onWork(Object message) {
if (message instanceof EndOfBatchCommand) {
aggregation();
} else {
messageNum++;
try {
analyse(message);
} catch (Exception e) {
saveException(e);
}
analyse(message);
if (messageNum >= CacheSizeConfig.Cache.Analysis.SIZE) {
aggregation();
......@@ -41,5 +40,5 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
}
}
protected abstract void aggregation() throws Exception;
protected abstract void aggregation();
}
......@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.worker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.worker.storage.JoinAndSplitAnalysisData;
......@@ -21,17 +22,16 @@ public abstract class JoinAndSplitAnalysisMember extends AnalysisMember {
return joinAndSplitAnalysisData;
}
final protected void set(String id, String attributeName, String value) throws Exception {
final protected void set(String id, String attributeName, String value) {
getJoinAndSplitAnalysisData().getOrCreate(id).set(attributeName, value);
}
@Override
final protected void aggregation() throws Exception {
@Override final protected void aggregation() {
getJoinAndSplitAnalysisData().asMap().forEach((key, value) -> {
try {
aggWorkRefs().tell(value);
} catch (Exception e) {
logger().error(e);
} catch (WorkerInvokeException e) {
logger().error(e.getMessage(), e);
}
});
getJoinAndSplitAnalysisData().asMap().clear();
......
package org.skywalking.apm.collector.worker;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.index.IndexRequestBuilder;
......@@ -11,9 +13,6 @@ import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.storage.JoinAndSplitData;
import org.skywalking.apm.collector.worker.storage.JoinAndSplitPersistenceData;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
......@@ -21,7 +20,8 @@ public abstract class JoinAndSplitPersistenceMember extends PersistenceMember<Jo
private Logger logger = LogManager.getFormatterLogger(JoinAndSplitPersistenceMember.class);
protected JoinAndSplitPersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
protected JoinAndSplitPersistenceMember(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -30,10 +30,9 @@ public abstract class JoinAndSplitPersistenceMember extends PersistenceMember<Jo
return new JoinAndSplitPersistenceData();
}
@Override
final public void analyse(Object message) throws Exception {
@Override final public void analyse(Object message) {
if (message instanceof JoinAndSplitData) {
JoinAndSplitData joinAndSplitData = (JoinAndSplitData) message;
JoinAndSplitData joinAndSplitData = (JoinAndSplitData)message;
JoinAndSplitPersistenceData data = getPersistenceData();
data.hold();
data.getOrCreate(joinAndSplitData.getId()).merge(joinAndSplitData);
......@@ -43,8 +42,7 @@ public abstract class JoinAndSplitPersistenceMember extends PersistenceMember<Jo
}
}
@Override
final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
@Override final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
Map<String, JoinAndSplitData> lastData = getPersistenceData().getLast().asMap();
extractData(lastData);
......
......@@ -16,7 +16,7 @@ public abstract class MetricAnalysisMember extends AnalysisMember {
super(role, clusterContext, selfContext);
}
final protected void set(String id, String metricName, Long value) throws Exception {
final protected void set(String id, String metricName, Long value) {
getMetricAnalysisData().getOrCreate(id).set(metricName, value);
}
......@@ -24,8 +24,7 @@ public abstract class MetricAnalysisMember extends AnalysisMember {
return metricAnalysisData;
}
@Override
final protected void aggregation() throws Exception {
@Override final protected void aggregation() {
getMetricAnalysisData().asMap().forEach((key, value) -> {
try {
aggWorkRefs().tell(value);
......
package org.skywalking.apm.collector.worker;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.index.IndexRequestBuilder;
......@@ -11,9 +13,6 @@ import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.storage.MetricData;
import org.skywalking.apm.collector.worker.storage.MetricPersistenceData;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
......@@ -30,10 +29,9 @@ public abstract class MetricPersistenceMember extends PersistenceMember<MetricPe
return new MetricPersistenceData();
}
@Override
final public void analyse(Object message) throws Exception {
@Override final public void analyse(Object message) {
if (message instanceof MetricData) {
MetricData metricData = (MetricData) message;
MetricData metricData = (MetricData)message;
MetricPersistenceData data = getPersistenceData();
data.hold();
data.getOrCreate(metricData.getId()).merge(metricData);
......@@ -43,8 +41,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember<MetricPe
}
}
@Override
final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
@Override final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
Map<String, MetricData> lastData = getPersistenceData().getLast().asMap();
extractData(lastData);
......
package org.skywalking.apm.collector.worker;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.get.GetResponse;
......@@ -8,12 +11,17 @@ import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.worker.storage.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.worker.storage.Data;
import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.storage.FlushAndSwitch;
import org.skywalking.apm.collector.worker.storage.PersistenceData;
import org.skywalking.apm.collector.worker.storage.Window;
/**
* @author pengys5
......@@ -39,23 +47,26 @@ public abstract class PersistenceMember<T extends Window & PersistenceData, D ex
public abstract String esType();
public abstract void analyse(Object message) throws Exception;
public abstract void analyse(Object message);
@Override
final public void preStart() throws ProviderNotFoundException {
@Override final public void preStart() throws ProviderNotFoundException {
}
@Override
protected void onWork(Object request, Object response) throws Exception {
protected void onWork(Object request, Object response) throws WorkerException {
if (request instanceof FlushAndSwitch) {
persistenceData.switchPointer();
while (persistenceData.getLast().isHolding()) {
Thread.sleep(10);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new WorkerException(e.getMessage(), e);
}
}
if (response instanceof LinkedList) {
prepareIndex((LinkedList) response);
prepareIndex((LinkedList)response);
} else {
logger.error("unhandled response, response instance must LinkedList, but is %s", response.getClass().toString());
}
......
......@@ -6,6 +6,7 @@ import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.worker.storage.RecordAnalysisData;
......@@ -22,7 +23,7 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
super(role, clusterContext, selfContext);
}
final public void set(String id, JsonObject record) throws Exception {
final public void set(String id, JsonObject record) {
getRecordAnalysisData().getOrCreate(id).set(record);
}
......@@ -30,13 +31,12 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
return recordAnalysisData;
}
@Override
final protected void aggregation() throws Exception {
@Override final protected void aggregation() {
getRecordAnalysisData().asMap().forEach((key, value) -> {
try {
aggWorkRefs().tell(value);
} catch (Exception e) {
logger.error(e, e);
} catch (WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
});
getRecordAnalysisData().asMap().clear();
......
package org.skywalking.apm.collector.worker;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -9,9 +11,6 @@ import org.skywalking.apm.collector.worker.storage.EsClient;
import org.skywalking.apm.collector.worker.storage.RecordData;
import org.skywalking.apm.collector.worker.storage.RecordPersistenceData;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
......@@ -21,15 +20,14 @@ public abstract class RecordPersistenceMember extends PersistenceMember<RecordPe
super(role, clusterContext, selfContext);
}
@Override
final public RecordPersistenceData initializeData() {
@Override final public RecordPersistenceData initializeData() {
return new RecordPersistenceData();
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof RecordData) {
RecordData recordData = (RecordData) message;
RecordData recordData = (RecordData)message;
logger().debug("set: id: %s, data: %s", recordData.getId(), recordData.get());
RecordPersistenceData data = getPersistenceData();
data.hold();
......@@ -40,8 +38,7 @@ public abstract class RecordPersistenceMember extends PersistenceMember<RecordPe
}
}
@Override
final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
@Override final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
Map<String, RecordData> lastData = getPersistenceData().getLast().asMap();
extractData(lastData);
......
......@@ -9,11 +9,14 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.globaltrace.persistence.GlobalTraceSearchWithGlobalId;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
/**
......@@ -32,11 +35,15 @@ public class GlobalTraceGetWithGlobalId extends AbstractGet {
getClusterContext().findProvider(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception {
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("globalId")) {
throw new IllegalArgumentException("the request parameter must contains globalId");
}
logger.debug("globalId: %s", Arrays.toString(parameter.get("globalId")));
if (logger.isDebugEnabled()) {
logger.debug("globalId: %s", Arrays.toString(parameter.get("globalId")));
}
String globalId = ParameterTools.INSTANCE.toString(parameter, "globalId");
......
......@@ -30,7 +30,7 @@ public class GlobalTraceAnalysis extends JoinAndSplitAnalysisMember {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
......
......@@ -26,7 +26,7 @@ public class GlobalTraceAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof JoinAndSplitData) {
getSelfContext().lookup(GlobalTraceSave.Role.INSTANCE).tell(message);
} else {
......
......@@ -2,22 +2,31 @@ package org.skywalking.apm.collector.worker.globaltrace.persistence;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.util.StringUtil;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.globaltrace.GlobalTraceIndex;
import org.skywalking.apm.collector.worker.segment.SegmentIndex;
import org.skywalking.apm.collector.worker.segment.entity.*;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.segment.entity.SpanView;
import org.skywalking.apm.collector.worker.segment.entity.TraceSegmentRef;
import org.skywalking.apm.collector.worker.storage.GetResponseFromEs;
import org.skywalking.apm.collector.worker.storage.JoinAndSplitData;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.skywalking.apm.util.StringUtil;
/**
* @author pengys5
......@@ -28,14 +37,15 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
private Gson gson = new Gson();
public GlobalTraceSearchWithGlobalId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public GlobalTraceSearchWithGlobalId(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
protected void onWork(Object request, Object response) throws Exception {
protected void onWork(Object request, Object response) throws WorkerException {
if (request instanceof String) {
String globalId = (String) request;
String globalId = (String)request;
String globalTraceData = GetResponseFromEs.INSTANCE.get(GlobalTraceIndex.INDEX, GlobalTraceIndex.TYPE_RECORD, globalId).getSourceAsString();
JsonObject globalTraceObj = gson.fromJson(globalTraceData, JsonObject.class);
logger.debug("globalTraceObj: %s", globalTraceObj);
......@@ -48,7 +58,12 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
logger.debug("subSegId: %s", subSegId);
String segmentSource = GetResponseFromEs.INSTANCE.get(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, subSegId).getSourceAsString();
logger.debug("segmentSource: %s", segmentSource);
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
Segment segment = null;
try {
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
} catch (IOException e) {
throw new WorkerException(e.getMessage(), e);
}
String segmentId = segment.getTraceSegmentId();
List<TraceSegmentRef> refsList = segment.getRefs();
......@@ -58,7 +73,7 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
}
}
JsonObject responseObj = (JsonObject) response;
JsonObject responseObj = (JsonObject)response;
responseObj.addProperty("result", buildTree(spanViewList));
} else {
logger.error("unhandled message, message instance must String, but is %s", request.getClass().toString());
......@@ -114,7 +129,7 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
}
private void spansDataBuild(Span span, String appCode, String segmentId, List<SpanView> spanViewList,
List<TraceSegmentRef> refsList) {
List<TraceSegmentRef> refsList) {
int spanId = span.getSpanId();
String spanSegId = segmentId + "--" + String.valueOf(spanId);
......
......@@ -13,6 +13,8 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
/**
* The <code>AbstractGet</code> implementations represent workers, which called by the server to allow a servlet to
......@@ -30,13 +32,14 @@ public abstract class AbstractGet extends AbstractServlet {
}
/**
* Add final modifier to avoid the subclass override this method.
* Forbid the subclasses override this method.
*
* @param parameter {@link Object} data structure of the map
* @param response {@link Object} is a out parameter
* @throws Exception
*/
@Override final protected void onWork(Object parameter, Object response) throws Exception {
@Override final protected void onWork(Object parameter,
Object response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
super.onWork(parameter, response);
}
......
......@@ -13,6 +13,8 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
/**
* The <code>AbstractGet</code> implementations represent workers, which called by the server to allow a servlet to
......@@ -34,9 +36,12 @@ public abstract class AbstractPost extends AbstractServlet {
*
* @param parameter {@link Object} data structure of the map
* @param response {@link Object} is a out parameter
* @throws Exception
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
@Override final protected void onWork(Object parameter, Object response) throws Exception {
@Override final protected void onWork(Object parameter,
Object response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
super.onWork(parameter, response);
}
......
......@@ -5,10 +5,14 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
/**
* The <code>AbstractServlet</code> implementations represent workers, which called by the server to allow a servlet to
......@@ -21,6 +25,8 @@ import org.skywalking.apm.collector.actor.Role;
*/
public abstract class AbstractServlet extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(AbstractServlet.class);
public AbstractServlet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -30,15 +36,18 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
*
* @param parameter {@link Object} data structure of the map
* @param response {@link Object} is a out parameter
* @throws Exception
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
@Override protected void onWork(Object parameter, Object response) throws Exception {
@Override protected void onWork(Object parameter,
Object response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonObject resJson = new JsonObject();
try {
onReceive((Map<String, String[]>)parameter, resJson);
onSuccessResponse((HttpServletResponse)response, resJson);
} catch (Exception e) {
onErrorResponse(e, (HttpServletResponse)response);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
......@@ -47,16 +56,19 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
*
* @param parameter {@link Map}, get the request parameter by key.
* @param response {@link JsonObject}, set the response data as json object.
* @throws Exception if any error is detected when worker execute business logic.
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
protected abstract void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception;
protected abstract void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException;
/**
* Set the worker response and the success status into the servlet response object
*
* @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
* @param resJson {@link JsonObject} object that contains the response from worker
* @throws IOException
* @throws IOException if any error is detected when the servlet handles the response.
*/
protected void onSuccessResponse(HttpServletResponse response, JsonObject resJson) throws IOException {
resJson.addProperty("isSuccess", true);
......@@ -68,14 +80,17 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
*
* @param exception a {@link Exception} when the worker handles the request
* @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
* @throws IOException if an input or output error is detected when the servlet handles the request
*/
protected void onErrorResponse(Exception exception, HttpServletResponse response) throws IOException {
protected void onErrorResponse(Exception exception, HttpServletResponse response) {
JsonObject resJson = new JsonObject();
resJson.addProperty("isSuccess", false);
resJson.addProperty("reason", exception.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
try {
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
/**
......@@ -84,7 +99,7 @@ public abstract class AbstractServlet extends AbstractLocalSyncWorker {
* @param response {@link HttpServletResponse} object that contains the response the servlet reply to the client
* @param resJson {@link JsonObject} object that contains the response from worker
* @param status http status code
* @throws IOException if an input or output error is detected when the servlet handles the request
* @throws IOException if an input or output error is detected when the servlet handles the response
*/
private void reply(HttpServletResponse response, JsonObject resJson, int status) throws IOException {
response.setContentType("text/json");
......
......@@ -14,6 +14,8 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
/**
* The <code>AbstractStreamPost</code> implementations represent workers, which called by the server to allow a servlet
......@@ -37,9 +39,12 @@ public abstract class AbstractStreamPost extends AbstractServlet {
*
* @param reader {@link BufferedReader} json construct
* @param response {@link Object} is a out parameter
* @throws Exception
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
@Override final protected void onWork(Object reader, Object response) throws Exception {
@Override final protected void onWork(Object reader,
Object response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
JsonObject resJson = new JsonObject();
try {
onReceive((BufferedReader)reader, resJson);
......@@ -54,10 +59,13 @@ public abstract class AbstractStreamPost extends AbstractServlet {
*
* @param parameter {@link Map}, get the request parameter by key.
* @param response {@link JsonObject}, set the response data as json object.
* @throws Exception
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
@Override final protected void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception {
throw new IllegalAccessException("Use the other method with buffer reader parameter");
@Override final protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
throw new WorkerInvokeException("Use the other method with buffer reader parameter");
}
/**
......@@ -65,9 +73,12 @@ public abstract class AbstractStreamPost extends AbstractServlet {
*
* @param reader {@link BufferedReader} json construct
* @param response {@link Object} is a out parameter
* @throws Exception
* @throws ArgumentsParseException if the key could not contains in parameter
* @throws WorkerInvokeException if any error is detected when call(or ask) worker
* @throws WorkerNotFoundException if the worker reference could not found in context.
*/
protected abstract void onReceive(BufferedReader reader, JsonObject response) throws Exception;
protected abstract void onReceive(BufferedReader reader,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException;
static class StreamPostWithHttpServlet extends HttpServlet {
......
package org.skywalking.apm.collector.worker.httpserver;
import org.skywalking.apm.collector.actor.WorkerException;
/**
* This exception is raised when argument not found or data type conversion from request.
*
* @author pengys5
* @since v3.1-2017
*/
public class ArgumentsParseException extends WorkerException {
public ArgumentsParseException(String message) {
super(message);
}
public ArgumentsParseException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.worker.node.analysis;
import com.google.gson.JsonObject;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -15,8 +16,6 @@ import org.skywalking.apm.collector.worker.tools.ClientSpanIsLeafTools;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.collector.worker.tools.SpanPeersTools;
import java.util.List;
/**
* @author pengys5
*/
......@@ -25,11 +24,11 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeCompAnalysis.class);
AbstractNodeCompAnalysis(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final void analyseSpans(Segment segment) throws Exception {
final void analyseSpans(Segment segment) {
List<Span> spanList = segment.getSpans();
logger.debug("node analysis span isNotEmpty %s", CollectionTools.isNotEmpty(spanList));
......
package org.skywalking.apm.collector.worker.node.analysis;
import com.google.gson.JsonObject;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -13,8 +14,6 @@ import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.TraceSegmentRef;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import java.util.List;
/**
* @author pengys5
*/
......@@ -23,11 +22,11 @@ abstract class AbstractNodeMappingAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeMappingAnalysis.class);
AbstractNodeMappingAnalysis(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final void analyseRefs(Segment segment, long timeSlice) throws Exception {
final void analyseRefs(Segment segment, long timeSlice) {
List<TraceSegmentRef> segmentRefList = segment.getRefs();
logger.debug("node mapping analysis refs isNotEmpty %s", CollectionTools.isNotEmpty(segmentRefList));
......
......@@ -2,7 +2,11 @@ package org.skywalking.apm.collector.worker.node.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -18,14 +22,14 @@ public class NodeCompAnalysis extends AbstractNodeCompAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeCompAnalysis.class);
NodeCompAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
analyseSpans(segment);
} else {
......
......@@ -2,7 +2,11 @@ package org.skywalking.apm.collector.worker.node.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -18,14 +22,14 @@ public class NodeMappingDayAnalysis extends AbstractNodeMappingAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeMappingDayAnalysis.class);
public NodeMappingDayAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
analyseRefs(segment, segmentWithTimeSlice.getDay());
} else {
......
......@@ -2,7 +2,11 @@ package org.skywalking.apm.collector.worker.node.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -18,14 +22,14 @@ public class NodeMappingHourAnalysis extends AbstractNodeMappingAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeMappingHourAnalysis.class);
NodeMappingHourAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
analyseRefs(segment, segmentWithTimeSlice.getHour());
} else {
......
......@@ -2,7 +2,11 @@ package org.skywalking.apm.collector.worker.node.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -18,14 +22,14 @@ public class NodeMappingMinuteAnalysis extends AbstractNodeMappingAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeMappingMinuteAnalysis.class);
NodeMappingMinuteAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
analyseRefs(segment, segmentWithTimeSlice.getMinute());
} else {
......
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.node.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.HashCodeSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,7 +21,7 @@ public class NodeCompAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeCompAgg.class);
NodeCompAgg(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -26,7 +31,7 @@ public class NodeCompAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeCompSave.Role.INSTANCE).tell(message);
} else {
......
......@@ -8,7 +8,12 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.node.NodeCompIndex;
......@@ -26,7 +31,7 @@ public class NodeCompLoad extends AbstractLocalSyncWorker {
}
@Override
public void onWork(Object request, Object response) throws Exception {
public void onWork(Object request, Object response) throws WorkerException {
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeCompIndex.INDEX);
searchRequestBuilder.setTypes(NodeCompIndex.TYPE_RECORD);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
......@@ -38,13 +43,13 @@ public class NodeCompLoad extends AbstractLocalSyncWorker {
JsonArray nodeCompArray = new JsonArray();
for (SearchHit searchHit : searchHits) {
JsonObject nodeCompObj = new JsonObject();
nodeCompObj.addProperty(NodeCompIndex.NAME, (String) searchHit.getSource().get(NodeCompIndex.NAME));
nodeCompObj.addProperty(NodeCompIndex.PEERS, (String) searchHit.getSource().get(NodeCompIndex.PEERS));
nodeCompObj.addProperty(NodeCompIndex.NAME, (String)searchHit.getSource().get(NodeCompIndex.NAME));
nodeCompObj.addProperty(NodeCompIndex.PEERS, (String)searchHit.getSource().get(NodeCompIndex.PEERS));
nodeCompArray.add(nodeCompObj);
logger.debug("node: %s", nodeCompObj.toString());
}
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add("result", nodeCompArray);
}
......
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.node.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.HashCodeSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,7 +21,7 @@ public class NodeMappingDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeMappingDayAgg.class);
NodeMappingDayAgg(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -26,7 +31,7 @@ public class NodeMappingDayAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeMappingDaySave.Role.INSTANCE).tell(message);
} else {
......
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.node.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.HashCodeSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,7 +21,7 @@ public class NodeMappingHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeMappingHourAgg.class);
NodeMappingHourAgg(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -26,7 +31,7 @@ public class NodeMappingHourAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeMappingHourSave.Role.INSTANCE).tell(message);
} else {
......
......@@ -26,7 +26,7 @@ public class NodeMappingMinuteAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeMappingMinuteSave.Role.INSTANCE).tell(message);
} else {
......
......@@ -10,7 +10,12 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.Const;
......@@ -30,9 +35,9 @@ public class NodeMappingSearchWithTimeSlice extends AbstractLocalSyncWorker {
}
@Override
public void onWork(Object request, Object response) throws Exception {
public void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
RequestEntity search = (RequestEntity)request;
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeMappingIndex.INDEX);
searchRequestBuilder.setTypes(search.getSliceType());
......@@ -59,7 +64,7 @@ public class NodeMappingSearchWithTimeSlice extends AbstractLocalSyncWorker {
}
logger.debug("node mapping data: %s", nodeMappingArray.toString());
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add(Const.RESULT, nodeMappingArray);
} else {
logger.error("unhandled message, message instance must NodeMappingSearchWithTimeSlice.RequestEntity, but is %s", request.getClass().toString());
......
......@@ -9,10 +9,13 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefResSumGroupWithTimeSlice;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
......@@ -32,25 +35,29 @@ public class NodeRefResSumGetGroupWithTimeSlice extends AbstractGet {
getClusterContext().findProvider(NodeRefResSumGroupWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception {
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
throw new ArgumentsParseException("the request parameter must contains startTime,endTime,timeSliceType");
}
if (logger.isDebugEnabled()) {
logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(parameter.get("startTime")),
Arrays.toString(parameter.get("endTime")), Arrays.toString(parameter.get("timeSliceType")));
}
logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(parameter.get("startTime")),
Arrays.toString(parameter.get("endTime")), Arrays.toString(parameter.get("timeSliceType")));
long startTime;
try {
startTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "startTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
throw new ArgumentsParseException("the request parameter startTime must be a long");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
throw new ArgumentsParseException("the request parameter endTime must be a long");
}
NodeRefResSumGroupWithTimeSlice.RequestEntity requestEntity;
......
package org.skywalking.apm.collector.worker.noderef.analysis;
import com.google.gson.JsonObject;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -16,8 +17,6 @@ import org.skywalking.apm.collector.worker.tools.ClientSpanIsLeafTools;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import org.skywalking.apm.collector.worker.tools.SpanPeersTools;
import java.util.List;
/**
* @author pengys5
*/
......@@ -26,12 +25,12 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeRefAnalysis.class);
AbstractNodeRefAnalysis(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final void analyseNodeRef(Segment segment, long timeSlice, long minute, long hour, long day,
int second) throws Exception {
int second) {
List<Span> spanList = segment.getSpans();
if (CollectionTools.isNotEmpty(spanList)) {
for (Span span : spanList) {
......@@ -70,7 +69,7 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
private void buildNodeRefResRecordData(String nodeRefId, Span span, long minute, long hour, long day,
int second) throws Exception {
int second) {
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord = new AbstractNodeRefResSumAnalysis.NodeRefResRecord(minute, hour, day, second);
refResRecord.setStartTime(span.getStartTime());
refResRecord.setEndTime(span.getEndTime());
......@@ -80,5 +79,5 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
protected abstract void sendToResSumAnalysis(
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception;
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord);
}
......@@ -13,11 +13,11 @@ import org.skywalking.apm.collector.worker.storage.AbstractTimeSlice;
abstract class AbstractNodeRefResSumAnalysis extends MetricAnalysisMember {
AbstractNodeRefResSumAnalysis(Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final void analyseResSum(NodeRefResRecord nodeRefRes) throws Exception {
final void analyseResSum(NodeRefResRecord nodeRefRes) {
long startTime = nodeRefRes.startTime;
long endTime = nodeRefRes.endTime;
boolean isError = nodeRefRes.isError;
......
......@@ -2,7 +2,13 @@ package org.skywalking.apm.collector.worker.noderef.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -18,7 +24,7 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefDayAnalysis.class);
protected NodeRefDayAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -29,9 +35,9 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
long minute = segmentWithTimeSlice.getMinute();
......@@ -45,8 +51,12 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
}
@Override
protected void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception {
getSelfContext().lookup(NodeRefResSumDayAnalysis.Role.INSTANCE).tell(refResRecord);
protected void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) {
try {
getSelfContext().lookup(NodeRefResSumDayAnalysis.Role.INSTANCE).tell(refResRecord);
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
@Override
......
......@@ -2,7 +2,13 @@ package org.skywalking.apm.collector.worker.noderef.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -18,7 +24,7 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefHourAnalysis.class);
protected NodeRefHourAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -29,9 +35,9 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
long minute = segmentWithTimeSlice.getMinute();
......@@ -45,8 +51,12 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
}
@Override
protected void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception {
getSelfContext().lookup(NodeRefResSumHourAnalysis.Role.INSTANCE).tell(refResRecord);
protected void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) {
try {
getSelfContext().lookup(NodeRefResSumHourAnalysis.Role.INSTANCE).tell(refResRecord);
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
@Override
......
......@@ -2,7 +2,13 @@ package org.skywalking.apm.collector.worker.noderef.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -18,7 +24,7 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefMinuteAnalysis.class);
protected NodeRefMinuteAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -29,9 +35,9 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
......@@ -44,8 +50,12 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
}
@Override
protected void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception {
getSelfContext().lookup(NodeRefResSumMinuteAnalysis.Role.INSTANCE).tell(refResRecord);
protected void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) {
try {
getSelfContext().lookup(NodeRefResSumMinuteAnalysis.Role.INSTANCE).tell(refResRecord);
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
@Override
......
......@@ -2,7 +2,11 @@ package org.skywalking.apm.collector.worker.noderef.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,14 +20,14 @@ public class NodeRefResSumDayAnalysis extends AbstractNodeRefResSumAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumDayAnalysis.class);
NodeRefResSumDayAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof NodeRefResRecord) {
NodeRefResRecord refResRecord = (NodeRefResRecord) message;
NodeRefResRecord refResRecord = (NodeRefResRecord)message;
analyseResSum(refResRecord);
} else {
logger.error("unhandled message, message instance must NodeRefResRecord, but is %s", message.getClass().toString());
......
......@@ -2,7 +2,11 @@ package org.skywalking.apm.collector.worker.noderef.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,14 +20,14 @@ public class NodeRefResSumHourAnalysis extends AbstractNodeRefResSumAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumHourAnalysis.class);
NodeRefResSumHourAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof NodeRefResRecord) {
NodeRefResRecord refResRecord = (NodeRefResRecord) message;
NodeRefResRecord refResRecord = (NodeRefResRecord)message;
analyseResSum(refResRecord);
} else {
logger.error("unhandled message, message instance must NodeRefResRecord, but is %s", message.getClass().toString());
......
......@@ -2,7 +2,11 @@ package org.skywalking.apm.collector.worker.noderef.analysis;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,14 +20,14 @@ public class NodeRefResSumMinuteAnalysis extends AbstractNodeRefResSumAnalysis {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumMinuteAnalysis.class);
NodeRefResSumMinuteAnalysis(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof NodeRefResRecord) {
NodeRefResRecord refResRecord = (NodeRefResRecord) message;
NodeRefResRecord refResRecord = (NodeRefResRecord)message;
analyseResSum(refResRecord);
} else {
logger.error("unhandled message, message instance must NodeRefResRecord, but is %s", message.getClass().toString());
......
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.noderef.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.HashCodeSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -26,7 +31,7 @@ public class NodeRefDayAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeRefDaySave.Role.INSTANCE).tell(message);
} else {
......
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.noderef.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.HashCodeSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,7 +21,7 @@ public class NodeRefHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefHourAgg.class);
NodeRefHourAgg(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -26,7 +31,7 @@ public class NodeRefHourAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeRefHourSave.Role.INSTANCE).tell(message);
} else {
......
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.noderef.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.HashCodeSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,7 +21,7 @@ public class NodeRefMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefMinuteAgg.class);
NodeRefMinuteAgg(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -26,7 +31,7 @@ public class NodeRefMinuteAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeRefMinuteSave.Role.INSTANCE).tell(message);
} else {
......
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.noderef.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.HashCodeSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,7 +21,7 @@ public class NodeRefResSumDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumDayAgg.class);
NodeRefResSumDayAgg(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -26,7 +31,7 @@ public class NodeRefResSumDayAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof MetricData) {
getSelfContext().lookup(NodeRefResSumDaySave.Role.INSTANCE).tell(message);
} else {
......
......@@ -12,7 +12,12 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.TimeSlice;
......@@ -31,9 +36,9 @@ public class NodeRefResSumGroupWithTimeSlice extends AbstractLocalSyncWorker {
}
@Override
public void onWork(Object request, Object response) throws Exception {
public void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
RequestEntity search = (RequestEntity)request;
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefResSumIndex.INDEX);
searchRequestBuilder.setTypes(search.getSliceType());
......@@ -77,7 +82,7 @@ public class NodeRefResSumGroupWithTimeSlice extends AbstractLocalSyncWorker {
nodeRefResSumArray.add(nodeRefResSumObj);
}
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add("result", nodeRefResSumArray);
} else {
logger.error("unhandled message, message instance must NodeRefResSumGroupWithTimeSlice.RequestEntity, but is %s", request.getClass().toString());
......
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.noderef.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.HashCodeSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,7 +21,7 @@ public class NodeRefResSumHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumHourAgg.class);
NodeRefResSumHourAgg(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -26,7 +31,7 @@ public class NodeRefResSumHourAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof MetricData) {
getSelfContext().lookup(NodeRefResSumHourSave.Role.INSTANCE).tell(message);
} else {
......
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.noderef.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractClusterWorker;
import org.skywalking.apm.collector.actor.AbstractClusterWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.HashCodeSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.config.WorkerConfig;
......@@ -16,7 +21,7 @@ public class NodeRefResSumMinuteAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefResSumMinuteAgg.class);
NodeRefResSumMinuteAgg(org.skywalking.apm.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -26,7 +31,7 @@ public class NodeRefResSumMinuteAgg extends AbstractClusterWorker {
}
@Override
protected void onWork(Object message) throws Exception {
protected void onWork(Object message) throws WorkerException {
if (message instanceof MetricData) {
getSelfContext().lookup(NodeRefResSumMinuteSave.Role.INSTANCE).tell(message);
} else {
......
......@@ -12,7 +12,12 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.Const;
......@@ -32,9 +37,9 @@ public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker {
}
@Override
public void onWork(Object request, Object response) throws Exception {
public void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
RequestEntity search = (RequestEntity)request;
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefResSumIndex.INDEX);
searchRequestBuilder.setTypes(search.getSliceType());
......@@ -84,7 +89,7 @@ public class NodeRefResSumSearchWithTimeSlice extends AbstractLocalSyncWorker {
nodeRefResSumArray.add(nodeRefResSumObj);
}
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add("result", nodeRefResSumArray);
} else {
logger.error("unhandled message, message instance must NodeRefResSumSearchWithTimeSlice.RequestEntity, but is %s", request.getClass().toString());
......
......@@ -10,7 +10,12 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.Const;
......@@ -30,9 +35,9 @@ public class NodeRefSearchWithTimeSlice extends AbstractLocalSyncWorker {
}
@Override
public void onWork(Object request, Object response) throws Exception {
public void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
RequestEntity search = (RequestEntity)request;
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeRefIndex.INDEX);
searchRequestBuilder.setTypes(search.getSliceType());
......@@ -60,7 +65,7 @@ public class NodeRefSearchWithTimeSlice extends AbstractLocalSyncWorker {
}
logger.debug("node ref data: %s", nodeRefArray.toString());
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add("result", nodeRefArray);
} else {
logger.error("unhandled message, message instance must NodeRefSearchWithTimeSlice.RequestEntity, but is %s", request.getClass().toString());
......
......@@ -2,17 +2,21 @@ package org.skywalking.apm.collector.worker.segment;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.globaltrace.analysis.GlobalTraceAnalysis;
import org.skywalking.apm.collector.worker.httpserver.AbstractStreamPost;
import org.skywalking.apm.collector.worker.httpserver.AbstractStreamPostProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.node.analysis.NodeCompAnalysis;
import org.skywalking.apm.collector.worker.node.analysis.NodeMappingDayAnalysis;
import org.skywalking.apm.collector.worker.node.analysis.NodeMappingHourAnalysis;
......@@ -62,42 +66,43 @@ public class SegmentPost extends AbstractStreamPost {
/**
* Read segment's buffer from buffer reader by stream mode. when finish read one segment then send to analysis.
* This method in there, so post servlet just can receive segments data.
*
* @param bufferedReader
* @param response
* @throws Exception
*/
@Override protected void onReceive(BufferedReader bufferedReader, JsonObject response) throws Exception {
@Override protected void onReceive(BufferedReader bufferedReader,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
Segment segment;
do {
int character;
StringBuilder builder = new StringBuilder();
while ((character = bufferedReader.read()) != ' ') {
if (character == -1) {
return;
try {
do {
int character;
StringBuilder builder = new StringBuilder();
while ((character = bufferedReader.read()) != ' ') {
if (character == -1) {
return;
}
builder.append((char)character);
}
builder.append((char)character);
}
int length = Integer.valueOf(builder.toString());
builder = new StringBuilder();
int length = Integer.valueOf(builder.toString());
builder = new StringBuilder();
char[] buffer = new char[length];
int readLength = bufferedReader.read(buffer, 0, length);
if (readLength != length) {
logger.error("The actual data length was different from the length in data head! ");
return;
}
builder.append(buffer);
char[] buffer = new char[length];
int readLength = bufferedReader.read(buffer, 0, length);
if (readLength != length) {
logger.error("The actual data length was different from the length in data head! ");
return;
}
builder.append(buffer);
String segmentJsonStr = builder.toString();
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentJsonStr);
tellWorkers(new SegmentAndJson(segment, segmentJsonStr));
String segmentJsonStr = builder.toString();
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentJsonStr);
tellWorkers(new SegmentAndJson(segment, segmentJsonStr));
}
while (segment != null);
} catch (IOException e) {
throw new ArgumentsParseException(e.getMessage(), e);
}
while (segment != null);
}
private void tellWorkers(SegmentAndJson segmentAndJson) throws Exception {
private void tellWorkers(SegmentAndJson segmentAndJson) throws WorkerNotFoundException, WorkerInvokeException {
Segment segment = segmentAndJson.getSegment();
try {
validateData(segment);
......@@ -126,13 +131,15 @@ public class SegmentPost extends AbstractStreamPost {
tellNodeMapping(segmentWithTimeSlice);
}
private void tellNodeRef(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
private void tellNodeRef(
SegmentWithTimeSlice segmentWithTimeSlice) throws WorkerNotFoundException, WorkerInvokeException {
getSelfContext().lookup(NodeRefMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(NodeRefHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(NodeRefDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
}
private void tellNodeMapping(SegmentWithTimeSlice segmentWithTimeSlice) throws Exception {
private void tellNodeMapping(
SegmentWithTimeSlice segmentWithTimeSlice) throws WorkerNotFoundException, WorkerInvokeException {
getSelfContext().lookup(NodeMappingMinuteAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(NodeMappingHourAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
getSelfContext().lookup(NodeMappingDayAnalysis.Role.INSTANCE).tell(segmentWithTimeSlice);
......
......@@ -9,10 +9,13 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithGlobalTraceId;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
......@@ -32,25 +35,29 @@ public class SegmentTopGetWithGlobalTraceId extends AbstractGet {
getClusterContext().findProvider(SegmentTopSearchWithGlobalTraceId.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception {
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("globalTraceId") || !parameter.containsKey("from") || !parameter.containsKey("limit")) {
throw new IllegalArgumentException("the request parameter must contains globalTraceId, from, limit");
throw new ArgumentsParseException("the request parameter must contains globalTraceId, from, limit");
}
if (logger.isDebugEnabled()) {
logger.debug("globalTraceId: %s, from: %s, limit: %s", Arrays.toString(parameter.get("globalTraceId")),
Arrays.toString(parameter.get("from")), Arrays.toString(parameter.get("limit")));
}
logger.debug("globalTraceId: %s, from: %s, limit: %s", Arrays.toString(parameter.get("globalTraceId")),
Arrays.toString(parameter.get("from")), Arrays.toString(parameter.get("limit")));
int from;
try {
from = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "from"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter from must numeric with int type");
throw new ArgumentsParseException("the request parameter from must be an integer");
}
int limit;
try {
limit = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "limit"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter from must numeric with int type");
throw new ArgumentsParseException("the request parameter limit must be an integer");
}
String globalTraceId = ParameterTools.INSTANCE.toString(parameter, "globalTraceId");
......
......@@ -9,10 +9,13 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithTimeSlice;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
......@@ -32,39 +35,43 @@ public class SegmentTopGetWithTimeSlice extends AbstractGet {
getClusterContext().findProvider(SegmentTopSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception {
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("from") || !parameter.containsKey("limit")) {
throw new IllegalArgumentException("the request parameter must contains startTime, endTime, from, limit");
throw new ArgumentsParseException("the request parameter must contains startTime, endTime, from, limit");
}
if (logger.isDebugEnabled()) {
logger.debug("startTime: %s, endTime: %s, from: %s", Arrays.toString(parameter.get("startTime")),
Arrays.toString(parameter.get("endTime")), Arrays.toString(parameter.get("from")));
}
logger.debug("startTime: %s, endTime: %s, from: %s", Arrays.toString(parameter.get("startTime")),
Arrays.toString(parameter.get("endTime")), Arrays.toString(parameter.get("from")));
long startTime;
try {
startTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "startTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
throw new ArgumentsParseException("the request parameter startTime must be a long");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
throw new ArgumentsParseException("the request parameter endTime must be a long");
}
int from;
try {
from = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "from"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter from must numeric with int type");
throw new ArgumentsParseException("the request parameter from must be an integer");
}
int limit;
try {
limit = Integer.valueOf(ParameterTools.INSTANCE.toString(parameter, "limit"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter from must numeric with int type");
throw new ArgumentsParseException("the request parameter from must be an integer");
}
int minCost = -1;
......
......@@ -6,6 +6,8 @@ import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
......@@ -31,10 +33,16 @@ public class SegmentAnalysis extends RecordAnalysisMember {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentAndJson) {
SegmentAndJson segmentAndJson = (SegmentAndJson)message;
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(segmentAndJson);
try {
getSelfContext().lookup(SegmentSave.Role.INSTANCE).tell(segmentAndJson);
} catch (WorkerInvokeException | WorkerNotFoundException e) {
e.printStackTrace();
logger.error(e.getMessage(), e);
}
} else {
logger.error("unhandled message, message instance must Segment, but is %s", message.getClass().toString());
}
......
......@@ -3,7 +3,12 @@ package org.skywalking.apm.collector.worker.segment.analysis;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.RecordAnalysisMember;
......@@ -32,9 +37,9 @@ public class SegmentCostAnalysis extends RecordAnalysisMember {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (CollectionTools.isNotEmpty(segment.getSpans())) {
......
......@@ -2,9 +2,15 @@ package org.skywalking.apm.collector.worker.segment.analysis;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.RecordAnalysisMember;
......@@ -18,8 +24,6 @@ import org.skywalking.apm.collector.worker.segment.entity.tag.Tags;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionSave;
import org.skywalking.apm.collector.worker.tools.CollectionTools;
import java.util.List;
/**
* @author pengys5
*/
......@@ -37,9 +41,9 @@ public class SegmentExceptionAnalysis extends RecordAnalysisMember {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
Segment segment = segmentWithTimeSlice.getSegment();
if (CollectionTools.isNotEmpty(segment.getSpans())) {
......
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* The <code>SegmentDeserialize</code> provides single segment json string deserialize and segment array file
......@@ -30,25 +26,4 @@ public enum SegmentDeserialize {
Segment segment = gson.fromJson(singleSegmentJsonStr, Segment.class);
return segment;
}
/**
* Read a json array file contains multiple segments.
*
* @param segmentJsonFile a segments json array file path
* @return on {@link List<Segment>}
* @throws Exception if json data illegal or file broken.
*/
public List<Segment> deserializeMultiple(String segmentJsonFile) throws Exception {
List<Segment> segmentList = new ArrayList<>();
streamReader(segmentList, new FileReader(segmentJsonFile));
return segmentList;
}
private void streamReader(List<Segment> segmentList, FileReader fileReader) throws Exception {
JsonArray segmentArray = gson.fromJson(fileReader, JsonArray.class);
for (int i = 0; i < segmentArray.size(); i++) {
Segment segment = gson.fromJson(segmentArray.get(i), Segment.class);
segmentList.add(segment);
}
}
}
......@@ -2,7 +2,12 @@ package org.skywalking.apm.collector.worker.segment.persistence;
import com.google.gson.JsonObject;
import org.elasticsearch.action.get.GetResponse;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.segment.SegmentExceptionIndex;
......@@ -18,17 +23,17 @@ public class SegmentExceptionWithSegId extends AbstractLocalSyncWorker {
}
@Override
protected void onWork(Object request, Object response) throws Exception {
protected void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
RequestEntity search = (RequestEntity)request;
GetResponse getResponse = EsClient.INSTANCE.getClient().prepareGet(SegmentExceptionIndex.INDEX, SegmentExceptionIndex.TYPE_RECORD, search.segId).get();
JsonObject dataJson = new JsonObject();
dataJson.addProperty(SegmentExceptionIndex.SEG_ID, (String) getResponse.getSource().get(SegmentExceptionIndex.SEG_ID));
dataJson.addProperty(SegmentExceptionIndex.IS_ERROR, (Boolean) getResponse.getSource().get(SegmentExceptionIndex.IS_ERROR));
dataJson.addProperty(SegmentExceptionIndex.SEG_ID, (String)getResponse.getSource().get(SegmentExceptionIndex.SEG_ID));
dataJson.addProperty(SegmentExceptionIndex.IS_ERROR, (Boolean)getResponse.getSource().get(SegmentExceptionIndex.IS_ERROR));
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add("result", dataJson);
}
}
......
......@@ -45,7 +45,7 @@ public class SegmentSave extends PersistenceMember<SegmentPersistenceData, Segme
return new SegmentPersistenceData();
}
@Override final public void analyse(Object message) throws Exception {
@Override final public void analyse(Object message) {
if (message instanceof SegmentAndJson) {
SegmentAndJson segmentAndJson = (SegmentAndJson)message;
SegmentPersistenceData data = getPersistenceData();
......
......@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.worker.segment.persistence;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.List;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
......@@ -12,6 +13,7 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.globaltrace.GlobalTraceIndex;
......@@ -42,7 +44,7 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
}
@Override
protected void onWork(Object request, Object response) throws Exception {
protected void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity)request;
Client client = EsClient.INSTANCE.getClient();
......@@ -84,7 +86,13 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
topSegmentJson.addProperty(SegmentCostIndex.COST, (Number)getResponse.getSource().get(SegmentCostIndex.COST));
String segmentSource = client.prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
Segment segment = null;
try {
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
} catch (IOException e) {
throw new WorkerException(e.getMessage(), e);
}
List<String> distributedTraceIdList = segment.getRelatedGlobalTraces().get();
JsonArray distributedTraceIdArray = new JsonArray();
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.worker.segment.persistence;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.List;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
......@@ -17,6 +18,7 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.segment.SegmentCostIndex;
......@@ -43,7 +45,7 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
}
@Override
protected void onWork(Object request, Object response) throws Exception {
protected void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity)request;
......@@ -92,7 +94,12 @@ public class SegmentTopSearchWithTimeSlice extends AbstractLocalSyncWorker {
String segmentSource = EsClient.INSTANCE.getClient().prepareGet(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, segId).get().getSourceAsString();
logger().debug("segmentSource:" + segmentSource);
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
Segment segment;
try {
segment = SegmentDeserialize.INSTANCE.deserializeSingle(segmentSource);
} catch (IOException e) {
throw new WorkerException(e.getMessage(), e);
}
List<String> distributedTraceIdList = segment.getRelatedGlobalTraces().get();
JsonArray distributedTraceIdArray = new JsonArray();
......
......@@ -9,10 +9,13 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.span.persistence.SpanSearchWithId;
import org.skywalking.apm.collector.worker.tools.ParameterTools;
......@@ -32,11 +35,15 @@ public class SpanGetWithId extends AbstractGet {
getClusterContext().findProvider(SpanSearchWithId.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception {
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("segId") || !parameter.containsKey("spanId")) {
throw new IllegalArgumentException("the request parameter must contains segId, spanId");
throw new ArgumentsParseException("the request parameter must contains segId, spanId");
}
if (logger.isDebugEnabled()) {
logger.debug("segId: %s, spanId: %s", Arrays.toString(parameter.get("segId")), Arrays.toString(parameter.get("spanId")));
}
logger.debug("segId: %s, spanId: %s", Arrays.toString(parameter.get("segId")), Arrays.toString(parameter.get("spanId")));
String segId = ParameterTools.INSTANCE.toString(parameter, "segId");
String spanId = ParameterTools.INSTANCE.toString(parameter, "spanId");
......
......@@ -2,8 +2,15 @@ package org.skywalking.apm.collector.worker.span.persistence;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.List;
import org.elasticsearch.action.get.GetResponse;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorkerProvider;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.Const;
......@@ -13,8 +20,6 @@ import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.segment.entity.Span;
import org.skywalking.apm.collector.worker.storage.GetResponseFromEs;
import java.util.List;
/**
* @author pengys5
*/
......@@ -27,11 +32,16 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
}
@Override
protected void onWork(Object request, Object response) throws Exception {
protected void onWork(Object request, Object response) throws WorkerException {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
RequestEntity search = (RequestEntity)request;
GetResponse getResponse = GetResponseFromEs.INSTANCE.get(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, search.segId);
Segment segment = SegmentDeserialize.INSTANCE.deserializeSingle(getResponse.getSourceAsString());
Segment segment;
try {
segment = SegmentDeserialize.INSTANCE.deserializeSingle(getResponse.getSourceAsString());
} catch (IOException e) {
throw new WorkerException(e.getMessage(), e);
}
List<Span> spanList = segment.getSpans();
getResponse.getSource();
......@@ -44,7 +54,7 @@ public class SpanSearchWithId extends AbstractLocalSyncWorker {
}
}
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add(Const.RESULT, dataJson);
}
}
......
......@@ -9,11 +9,14 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.Const;
import org.skywalking.apm.collector.worker.httpserver.AbstractGet;
import org.skywalking.apm.collector.worker.httpserver.AbstractGetProvider;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.node.persistence.NodeCompLoad;
import org.skywalking.apm.collector.worker.node.persistence.NodeMappingSearchWithTimeSlice;
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefResSumSearchWithTimeSlice;
......@@ -39,25 +42,29 @@ public class TraceDagGetWithTimeSlice extends AbstractGet {
getClusterContext().findProvider(NodeRefResSumSearchWithTimeSlice.WorkerRole.INSTANCE).create(this);
}
@Override protected void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception {
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
if (!parameter.containsKey("startTime") || !parameter.containsKey("endTime") || !parameter.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
throw new ArgumentsParseException("the request parameter must contains startTime,endTime,timeSliceType");
}
if (logger.isDebugEnabled()) {
logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(parameter.get("startTime")),
Arrays.toString(parameter.get("endTime")), Arrays.toString(parameter.get("timeSliceType")));
}
logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(parameter.get("startTime")),
Arrays.toString(parameter.get("endTime")), Arrays.toString(parameter.get("timeSliceType")));
long startTime;
try {
startTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "startTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter startTime must numeric with long type");
throw new ArgumentsParseException("the request parameter startTime must be a long");
}
long endTime;
try {
endTime = Long.valueOf(ParameterTools.INSTANCE.toString(parameter, "endTime"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter endTime must numeric with long type");
throw new ArgumentsParseException("the request parameter endTime must be a long");
}
String timeSliceType = ParameterTools.INSTANCE.toString(parameter, "timeSliceType");
......
package org.skywalking.apm.collector.worker;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
......@@ -13,14 +10,17 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.queue.EndOfBatchCommand;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.anyObject;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(TestAnalysisMember.class)
@PowerMockIgnore( {"javax.management.*"})
@PowerMockIgnore({"javax.management.*"})
public class AnalysisMemberTestCase {
@Test
......@@ -54,40 +54,4 @@ public class AnalysisMemberTestCase {
TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
member.preStart();
}
@Test
public void testOnWorkException() throws Exception {
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
TestAnalysisMember member = PowerMockito.spy(new TestAnalysisMember(TestAnalysisMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
doThrow(new TestException()).when(member).analyse(anyObject());
ExceptionAnswer answer = new ExceptionAnswer();
PowerMockito.when(member, "saveException", any(TestException.class)).thenAnswer(answer);
member.onWork(new Object());
Assert.assertEquals(true, answer.isTestException);
}
class TestException extends Exception {
}
class ExceptionAnswer implements Answer {
boolean isTestException = false;
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Object obj = invocation.getArguments()[0];
if (obj instanceof TestException) {
isTestException = true;
} else {
isTestException = false;
}
return null;
}
}
}
......@@ -14,7 +14,7 @@ public class TestAnalysisMember extends AnalysisMember {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
}
......@@ -24,7 +24,7 @@ public class TestAnalysisMember extends AnalysisMember {
}
@Override
protected void aggregation() throws Exception {
protected void aggregation() {
}
......
......@@ -16,7 +16,7 @@ public class TestJoinAndSplitAnalysisMember extends JoinAndSplitAnalysisMember {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
}
......
......@@ -11,6 +11,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
......@@ -44,7 +45,7 @@ public class AbstractGetTestCase {
verify(get).onReceive(any(Map.class), any(JsonObject.class));
}
@Test
@Test(expected = WorkerInvokeException.class)
public void testOnWorkError() throws Exception {
Map<String, String[]> parameterMap = new HashMap<>();
......@@ -57,7 +58,7 @@ public class AbstractGetTestCase {
}
}).when(writer).print(any(JsonObject.class));
doThrow(new Exception("testOnWorkError")).when(get).onReceive(any(Map.class), any(JsonObject.class));
doThrow(new WorkerInvokeException("testOnWorkError")).when(get).onReceive(any(Map.class), any(JsonObject.class));
get.onWork(parameterMap, response);
}
}
......@@ -11,6 +11,7 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import static org.mockito.Matchers.anyInt;
......@@ -82,7 +83,7 @@ public class PostWithHttpServletTestCase {
return null;
}
}).when(response).setStatus(anyInt());
doThrow(new Exception()).when(workerRef).tell(anyString());
doThrow(new WorkerInvokeException("")).when(workerRef).tell(anyString());
servlet.doPost(request, response);
}
}
......@@ -2,11 +2,12 @@ package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
......@@ -23,7 +24,8 @@ public class TestAbstractGet extends AbstractGet {
super.preStart();
}
@Override protected void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception {
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
}
......
......@@ -6,6 +6,8 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
......@@ -22,7 +24,8 @@ public class TestAbstractPost extends AbstractPost {
super.preStart();
}
@Override protected void onReceive(Map<String, String[]> parameter, JsonObject response) throws Exception {
@Override protected void onReceive(Map<String, String[]> parameter,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
}
......
package org.skywalking.apm.collector.worker.httpserver;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerNotFoundException;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestAbstractStreamPost extends AbstractStreamPost {
public TestAbstractStreamPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onReceive(BufferedReader reader,
JsonObject response) throws ArgumentsParseException, WorkerInvokeException, WorkerNotFoundException {
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return TestAbstractStreamPost.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
public static class Factory extends AbstractStreamPostProvider<TestAbstractStreamPost> {
@Override
public String servletPath() {
return "/TestAbstractPost";
}
@Override
public Role role() {
return TestAbstractStreamPost.WorkerRole.INSTANCE;
}
@Override
public TestAbstractStreamPost workerInstance(ClusterWorkerContext clusterContext) {
return new TestAbstractStreamPost(role(), clusterContext, new LocalWorkerContext());
}
}
}
package org.skywalking.apm.collector.worker.httpserver;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.Role;
/**
* @author pengys5
*/
public class TestAbstractStreamPostProvider extends AbstractStreamPostProvider {
@Override
public String servletPath() {
return "testPost";
}
@Override
public Role role() {
return null;
}
@Override
public AbstractWorker workerInstance(ClusterWorkerContext clusterContext) {
return new TestAbstractStreamPost(TestAbstractStreamPost.WorkerRole.INSTANCE, null, null);
}
}
......@@ -21,6 +21,7 @@ import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefResSumGroupWithTimeSlice;
import static org.mockito.Mockito.doAnswer;
......@@ -94,14 +95,14 @@ public class NodeRefResSumGetGroupWithTimeSliceTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnReceiveError() throws Exception {
Map<String, String[]> request = new HashMap<>();
JsonObject response = new JsonObject();
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnReceiveStartTimeError() throws Exception {
Map<String, String[]> request = new HashMap<>();
String[] startTime = {"x"};
......@@ -115,7 +116,7 @@ public class NodeRefResSumGetGroupWithTimeSliceTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnReceiveEndTimeError() throws Exception {
Map<String, String[]> request = new HashMap<>();
String[] startTime = {"100"};
......
package org.skywalking.apm.collector.worker.noderef.analysis;
import java.lang.reflect.Field;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
......@@ -10,8 +11,6 @@ import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.Const;
import org.skywalking.apm.collector.worker.storage.MetricAnalysisData;
import java.lang.reflect.Field;
/**
* @author pengys5
*/
......@@ -56,7 +55,7 @@ public class AbstractNodeRefResSumAnalysisTestCase {
Field testAField = impl.getClass().getSuperclass().getSuperclass().getDeclaredField("metricAnalysisData");
testAField.setAccessible(true);
MetricAnalysisData metricAnalysisData = (MetricAnalysisData) testAField.get(impl);
MetricAnalysisData metricAnalysisData = (MetricAnalysisData)testAField.get(impl);
Assert.assertEquals(1L, metricAnalysisData.asMap().get("2017..-..A..-..B").asMap().get("oneSecondLess"));
Assert.assertEquals(1L, metricAnalysisData.asMap().get("2017..-..A..-..B").asMap().get("threeSecondLess"));
......@@ -72,7 +71,7 @@ public class AbstractNodeRefResSumAnalysisTestCase {
}
@Override
public void analyse(Object message) throws Exception {
public void analyse(Object message) {
}
......
......@@ -18,6 +18,7 @@ import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionWithSegId;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithGlobalTraceId;
......@@ -100,14 +101,14 @@ public class SegmentTopGetWithGlobalTraceIdTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchError() throws Exception {
Map<String, String[]> request = new HashMap<>();
JsonObject response = new JsonObject();
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorFrom() throws Exception {
Map<String, String[]> request = new HashMap<>();
String[] globalTraceId = {"TestId"};
......@@ -121,7 +122,7 @@ public class SegmentTopGetWithGlobalTraceIdTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorLimit() throws Exception {
Map<String, String[]> request = new HashMap<>();
String[] globalTraceId = {"TestId"};
......
......@@ -18,6 +18,7 @@ import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentExceptionWithSegId;
import org.skywalking.apm.collector.worker.segment.persistence.SegmentTopSearchWithTimeSlice;
......@@ -93,7 +94,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorStartTime() throws Exception {
Map<String, String[]> request = createRequest();
String[] startTime = {"x"};
......@@ -103,7 +104,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorEndTime() throws Exception {
Map<String, String[]> request = createRequest();
String[] endTime = {"x"};
......@@ -113,7 +114,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorFrom() throws Exception {
Map<String, String[]> request = createRequest();
String[] from = {"x"};
......@@ -123,7 +124,7 @@ public class SegmentTopGetWithTimeSliceTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorLimit() throws Exception {
Map<String, String[]> request = createRequest();
String[] limit = {"x"};
......
package org.skywalking.apm.collector.worker.segment.entity;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public enum SegmentDeserializeFromFile {
INSTANCE;
private final Gson gson = new Gson();
/**
* Read a json array file contains multiple segments.
*
* @param segmentJsonFile a segments json array file path
* @return on {@link List <Segment>}
* @throws Exception if json data illegal or file broken.
*/
public List<Segment> deserializeMultiple(String segmentJsonFile) throws Exception {
List<Segment> segmentList = new ArrayList<>();
streamReader(segmentList, new FileReader(segmentJsonFile));
return segmentList;
}
private void streamReader(List<Segment> segmentList, FileReader fileReader) throws Exception {
JsonArray segmentArray = gson.fromJson(fileReader, JsonArray.class);
for (int i = 0; i < segmentArray.size(); i++) {
Segment segment = gson.fromJson(segmentArray.get(i), Segment.class);
segmentList.add(segment);
}
}
}
package org.skywalking.apm.collector.worker.segment.mock;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.worker.AnalysisMember;
import org.skywalking.apm.collector.worker.segment.SegmentPost;
import org.skywalking.apm.collector.worker.segment.entity.Segment;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserialize;
import org.skywalking.apm.collector.worker.segment.entity.SegmentDeserializeFromFile;
import org.skywalking.apm.collector.worker.tools.DateTools;
import org.skywalking.apm.collector.worker.tools.JsonFileReader;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
......@@ -67,8 +66,9 @@ public class SegmentMock {
return createSegmentWithTimeSliceList(PortalServiceJsonFile);
}
private List<SegmentPost.SegmentWithTimeSlice> createSegmentWithTimeSliceList(String jsonFilePath) throws Exception {
List<Segment> segmentList = SegmentDeserialize.INSTANCE.deserializeMultiple(jsonFilePath);
private List<SegmentPost.SegmentWithTimeSlice> createSegmentWithTimeSliceList(
String jsonFilePath) throws Exception {
List<Segment> segmentList = SegmentDeserializeFromFile.INSTANCE.deserializeMultiple(jsonFilePath);
List<SegmentPost.SegmentWithTimeSlice> segmentWithTimeSliceList = new ArrayList<>();
for (Segment segment : segmentList) {
......
package org.skywalking.apm.collector.worker.span;
import com.google.gson.JsonObject;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......@@ -18,20 +21,20 @@ import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRefs;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.span.persistence.SpanSearchWithId;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest( {ClusterWorkerContext.class})
@PowerMockIgnore( {"javax.management.*"})
@PrepareForTest({ClusterWorkerContext.class})
@PowerMockIgnore({"javax.management.*"})
public class SpanGetWithIdTestCase {
private SpanGetWithId getObj;
......@@ -89,7 +92,7 @@ public class SpanGetWithIdTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchError() throws Exception {
Map<String, String[]> request = new HashMap<>();
JsonObject response = new JsonObject();
......@@ -100,7 +103,7 @@ public class SpanGetWithIdTestCase {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
SpanSearchWithId.RequestEntity requestEntity = (SpanSearchWithId.RequestEntity) invocation.getArguments()[0];
SpanSearchWithId.RequestEntity requestEntity = (SpanSearchWithId.RequestEntity)invocation.getArguments()[0];
Assert.assertEquals("10", requestEntity.getSegId());
Assert.assertEquals("20", requestEntity.getSpanId());
return null;
......
......@@ -17,6 +17,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.worker.Const;
import org.skywalking.apm.collector.worker.httpserver.ArgumentsParseException;
import org.skywalking.apm.collector.worker.node.persistence.NodeCompLoad;
import org.skywalking.apm.collector.worker.node.persistence.NodeMappingSearchWithTimeSlice;
import org.skywalking.apm.collector.worker.noderef.persistence.NodeRefResSumSearchWithTimeSlice;
......@@ -101,14 +102,14 @@ public class TraceDagGetWithTimeSliceTestCase {
Assert.assertEquals("NodeRefResSumSearchWithTimeSlice", argumentCaptor.getAllValues().get(3).roleName());
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchError() throws Exception {
Map<String, String[]> request = new HashMap<>();
JsonObject response = new JsonObject();
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorStartTime() throws Exception {
Map<String, String[]> request = createRequest();
String[] startTime = {"xx"};
......@@ -118,7 +119,7 @@ public class TraceDagGetWithTimeSliceTestCase {
getObj.onReceive(request, response);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = ArgumentsParseException.class)
public void testOnSearchErrorEndTime() throws Exception {
Map<String, String[]> request = createRequest();
String[] endTime = {"xx"};
......
org.skywalking.apm.collector.worker.httpserver.TestAbstractStreamPost$Factory
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册