Commit 3169f827 authored by peng-yongsheng

mix worker model with graph model.

Parent 288ced67
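This commit moves the collector's stream workers from the role/WorkerRef lookup model onto the graph model: AbstractWorker now implements NodeProcessor, and WorkerRef becomes a WayToNode, so workers sit directly in a stream Graph. A minimal wiring sketch, assuming a persistence node is already available as a NodeProcessor (the graph id and class name below are hypothetical, not taken from the commit):

```java
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;

class CpuMetricGraphSketch {
    private static final int CPU_METRIC_GRAPH_ID = 1; // hypothetical id

    Graph<CpuMetric> create(NodeProcessor<CpuMetric, CpuMetric> persistenceNode) {
        // Workers are NodeProcessors after this change, so they are added to the
        // graph directly instead of being wrapped in an anonymous NodeProcessor
        // that forwards to workerRef.tell(...).
        Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class);
        graph.addNode(persistenceNode);
        return graph;
    }
}
```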
......@@ -18,23 +18,18 @@
package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef;
/**
* @author peng-yongsheng
......@@ -54,23 +49,7 @@ public class JvmMetricStreamGraph {
}
public Graph<CpuMetric> createCPUMetricGraph() throws ProviderNotFoundException {
CpuMetricPersistenceWorker.Factory factory = new CpuMetricPersistenceWorker.Factory(null, null);
final WorkerRef workerRef = factory.create(null);
Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class);
graph.addNode(new NodeProcessor<CpuMetric, CpuMetric>() {
@Override public int id() {
return 0;
}
@Override public void process(CpuMetric INPUT, Next<CpuMetric> next) {
try {
workerRef.tell(INPUT);
} catch (WorkerInvokeException e) {
e.printStackTrace();
}
}
});
return graph;
}
......
......@@ -22,16 +22,21 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class CpuMetricPersistenceWorker extends PersistenceWorker {
public class CpuMetricPersistenceWorker extends PersistenceWorker<CpuMetric, CpuMetric> {
private final DAOService daoService;
@Override public int id() {
return 0;
}
public CpuMetricPersistenceWorker(DAOService daoService) {
super(daoService);
this.daoService = daoService;
......
......@@ -21,7 +21,7 @@ package org.skywalking.apm.collector.core.data;
/**
* @author peng-yongsheng
*/
public abstract class Data extends AbstractHashMessage {
public abstract class Data extends EndOfBatchQueueMessage {
private String[] dataStrings;
private Long[] dataLongs;
private Double[] dataDoubles;
......
......@@ -16,20 +16,25 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base;
package org.skywalking.apm.collector.core.data;
/**
* @author peng-yongsheng
*/
public interface Context extends LookUp {
public abstract class EndOfBatchQueueMessage extends AbstractHashMessage {
void putProvider(AbstractRemoteWorkerProvider provider);
private boolean endOfBatch;
WorkerRefs lookup(Role role) throws WorkerNotFoundException;
public EndOfBatchQueueMessage(String key) {
super(key);
endOfBatch = false;
}
RemoteWorkerRef lookupInSide(String roleName) throws WorkerNotFoundException;
public final boolean isEndOfBatch() {
return endOfBatch;
}
void put(WorkerRef workerRef);
void remove(WorkerRef workerRef);
public final void setEndOfBatch(boolean endOfBatch) {
this.endOfBatch = endOfBatch;
}
}
......@@ -18,9 +18,11 @@
package org.skywalking.apm.collector.core.framework;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author peng-yongsheng
*/
public interface Executor<Input> {
void execute(Input input);
public interface Executor<INPUT> {
void execute(INPUT input) throws CollectorException;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.base;
/**
* @author peng-yongsheng
*/
public class EndOfBatchCommand {
}
......@@ -18,17 +18,19 @@
package org.skywalking.apm.collector.queue.base;
import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public class MessageHolder {
private Object message;
public class MessageHolder<MESSAGE extends Data> {
private MESSAGE message;
public Object getMessage() {
public MESSAGE getMessage() {
return message;
}
public void setMessage(Object message) {
public void setMessage(MESSAGE message) {
this.message = message;
}
......
......@@ -18,9 +18,11 @@
package org.skywalking.apm.collector.queue.base;
import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public interface QueueEventHandler {
void tell(Object message);
public interface QueueEventHandler<MESSAGE extends Data> {
void tell(MESSAGE message);
}
......@@ -18,10 +18,11 @@
package org.skywalking.apm.collector.queue.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.framework.Executor;
/**
* @author peng-yongsheng
*/
public interface QueueExecutor extends Executor {
public interface QueueExecutor<MESSAGE extends Data> extends Executor<MESSAGE> {
}
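Because every Data message now carries the end-of-batch flag (via EndOfBatchQueueMessage), a queue consumer can react to batch boundaries without the removed EndOfBatchCommand. A sketch of such a consumer, using only the interfaces shown above (the class itself is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.QueueExecutor;

// Hypothetical consumer, for illustration only.
class EndOfBatchAwareExecutor<M extends Data> implements QueueExecutor<M> {
    private final List<M> buffer = new ArrayList<>();

    @Override public void execute(M message) throws CollectorException {
        buffer.add(message);
        // The flag set by the queue event handler replaces the old EndOfBatchCommand message.
        if (message.isEndOfBatch()) {
            flush();
        }
    }

    private void flush() {
        // persist or forward the buffered messages, then start a new batch
        buffer.clear();
    }
}
```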
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.queue.service;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.module.Service;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
......@@ -25,6 +26,6 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor;
/**
* @author peng-yongsheng
*/
public interface QueueCreatorService extends Service {
QueueEventHandler create(int queueSize, QueueExecutor executor);
public interface QueueCreatorService<MESSAGE extends Data> extends Service {
QueueEventHandler<MESSAGE> create(int queueSize, QueueExecutor<MESSAGE> executor);
}
......@@ -20,7 +20,8 @@ package org.skywalking.apm.collector.queue.disruptor.base;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import org.skywalking.apm.collector.queue.base.EndOfBatchCommand;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.MessageHolder;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
......@@ -30,14 +31,14 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class DisruptorEventHandler implements EventHandler<MessageHolder>, QueueEventHandler {
public class DisruptorEventHandler<MESSAGE extends Data> implements EventHandler<MessageHolder<MESSAGE>>, QueueEventHandler<MESSAGE> {
private final Logger logger = LoggerFactory.getLogger(DisruptorEventHandler.class);
private RingBuffer<MessageHolder> ringBuffer;
private QueueExecutor executor;
private RingBuffer<MessageHolder<MESSAGE>> ringBuffer;
private QueueExecutor<MESSAGE> executor;
DisruptorEventHandler(RingBuffer<MessageHolder> ringBuffer, QueueExecutor executor) {
DisruptorEventHandler(RingBuffer<MessageHolder<MESSAGE>> ringBuffer, QueueExecutor<MESSAGE> executor) {
this.ringBuffer = ringBuffer;
this.executor = executor;
}
......@@ -50,14 +51,12 @@ public class DisruptorEventHandler implements EventHandler<MessageHolder>, Queue
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
*/
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
Object message = event.getMessage();
public void onEvent(MessageHolder<MESSAGE> event, long sequence, boolean endOfBatch) throws CollectorException {
MESSAGE message = event.getMessage();
event.reset();
message.setEndOfBatch(endOfBatch);
executor.execute(message);
if (endOfBatch) {
executor.execute(new EndOfBatchCommand());
}
}
/**
......@@ -65,7 +64,7 @@ public class DisruptorEventHandler implements EventHandler<MessageHolder>, Queue
*
* @param message of the data to process.
*/
public void tell(Object message) {
public void tell(MESSAGE message) {
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setMessage(message);
......
......@@ -18,10 +18,11 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface ICpuMetricStreamDAO<Insert, Update, Data> extends IPersistenceDAO<Insert, Update, Data> {
public interface ICpuMetricStreamDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
......@@ -20,7 +20,6 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.ICpuMetricStreamDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
......@@ -33,7 +32,8 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class CpuMetricH2StreamDAO extends H2DAO implements ICpuMetricStreamDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity, CpuMetric> {
public class CpuMetricH2StreamDAO extends H2DAO implements ICpuMetricStreamDAO<H2SqlEntity, H2SqlEntity, CpuMetric> {
private final Logger logger = LoggerFactory.getLogger(CpuMetricH2StreamDAO.class);
@Override public CpuMetric get(String id) {
......
......@@ -39,7 +39,9 @@ import org.slf4j.LoggerFactory;
* @author peng-yongsheng, clevertension
*/
public class InstanceH2UIDAO extends H2DAO implements IInstanceUIDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceH2UIDAO.class);
private static final String GET_LAST_HEARTBEAT_TIME_SQL = "select {0} from {1} where {2} > ? limit 1";
private static final String GET_INST_LAST_HEARTBEAT_TIME_SQL = "select {0} from {1} where {2} > ? and {3} = ? limit 1";
private static final String GET_INSTANCE_SQL = "select * from {0} where {1} = ?";
......
......@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.FlushAndSwitch;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
import org.slf4j.Logger;
......@@ -51,11 +50,11 @@ public class PersistenceTimer {
List<PersistenceWorker> workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers();
List batchAllCollection = new ArrayList<>();
workers.forEach((PersistenceWorker worker) -> {
logger.debug("extract {} worker data and save", worker.getRole().roleName());
logger.debug("extract {} worker data and save", worker.getClass().getName());
try {
worker.allocateJob(new FlushAndSwitch());
worker.flushAndSwitch();
List<?> batchCollection = worker.buildBatchCollection();
logger.debug("extract {} worker data size: {}", worker.getRole().roleName(), batchCollection.size());
logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
batchAllCollection.addAll(batchCollection);
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
......
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
/**
......@@ -27,15 +28,19 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor;
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractLocalAsyncWorker extends AbstractWorker implements QueueExecutor {
public abstract class AbstractLocalAsyncWorker<INPUT extends Data, OUTPUT extends Data> extends AbstractWorker<INPUT, OUTPUT> implements QueueExecutor<INPUT> {
/**
* Receive message
*
* @param message The persistence data or metric data.
* @throws WorkerException The Exception happen in {@link #onWork(Object)}
* @throws WorkerException The Exception happen in {@link #onWork(INPUT)}
*/
final public void allocateJob(Object message) throws WorkerException {
final public void allocateJob(INPUT message) throws WorkerException {
onWork(message);
}
@Override public final void execute(INPUT message) throws WorkerException {
onWork(message);
}
}
......@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
......@@ -26,23 +27,23 @@ import org.skywalking.apm.collector.storage.service.DAOService;
/**
* @author peng-yongsheng
*/
public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAsyncWorker & QueueExecutor> extends AbstractWorkerProvider<T> {
public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends Data, OUTPUT extends Data, WorkerType extends AbstractLocalAsyncWorker<INPUT, OUTPUT> & QueueExecutor<INPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WorkerType> {
public abstract int queueSize();
private final DAOService daoService;
private final QueueCreatorService queueCreatorService;
private final QueueCreatorService<INPUT> queueCreatorService;
public AbstractLocalAsyncWorkerProvider(DAOService daoService, QueueCreatorService queueCreatorService) {
public AbstractLocalAsyncWorkerProvider(DAOService daoService, QueueCreatorService<INPUT> queueCreatorService) {
this.daoService = daoService;
this.queueCreatorService = queueCreatorService;
}
@Override
final public WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException {
T localAsyncWorker = workerInstance(daoService);
WorkerType localAsyncWorker = workerInstance(daoService);
workerCreateListener.addWorker(localAsyncWorker);
QueueEventHandler queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker);
return new LocalAsyncWorkerRef(queueEventHandler);
QueueEventHandler<INPUT> queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker);
return new LocalAsyncWorkerRef<>(localAsyncWorker, queueEventHandler);
}
}
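With the generic provider above, the message type is carried end to end from queue creation to the consuming worker. A rough usage sketch (the wrapper class and variable names are hypothetical; the create/tell calls mirror the provider code in the diff):

```java
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;

class QueueWiringSketch {
    void wire(QueueCreatorService<CpuMetric> queueCreatorService,
        CpuMetricPersistenceWorker worker, CpuMetric metric) {
        // Same call the provider makes in create(...): the worker consumes the queue.
        QueueEventHandler<CpuMetric> handler = queueCreatorService.create(1024, worker);
        // Producers push typed messages; DisruptorEventHandler marks the last event
        // of a batch via setEndOfBatch(true) before invoking execute(...).
        handler.tell(metric);
    }
}
```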
......@@ -18,6 +18,8 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
/**
* The <code>AbstractRemoteWorker</code> implementations represent workers,
* which receive remote messages.
......@@ -27,40 +29,18 @@ package org.skywalking.apm.collector.stream.worker.base;
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractRemoteWorker extends AbstractWorker<RemoteWorkerRef> {
private RemoteWorkerRef workerRef;
/**
* Construct an <code>AbstractRemoteWorker</code> with the worker role and context.
*
* @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning,
* each worker have multi instances.
* @param clusterContext See {@link ClusterWorkerContext}
*/
protected AbstractRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
public abstract class AbstractRemoteWorker<INPUT extends Data, OUTPUT extends Data> extends AbstractWorker<INPUT, OUTPUT> {
/**
* This method use for message producer to call for send message.
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)}
* @throws Exception The Exception happen in {@link #onWork(INPUT)}
*/
final public void allocateJob(Object message) throws WorkerInvokeException {
final public void allocateJob(INPUT message) {
try {
onWork(message);
} catch (WorkerException e) {
throw new WorkerInvokeException(e.getMessage(), e.getCause());
}
}
@Override protected final RemoteWorkerRef getSelf() {
return workerRef;
}
@Override protected final void putSelfRef(RemoteWorkerRef workerRef) {
this.workerRef = workerRef;
}
}
......@@ -18,7 +18,9 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
* The <code>AbstractRemoteWorkerProvider</code> implementations represent providers,
......@@ -28,11 +30,13 @@ import org.skywalking.apm.collector.remote.service.RemoteClientService;
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractRemoteWorkerProvider<T extends AbstractRemoteWorker> extends AbstractWorkerProvider<T> {
public abstract class AbstractRemoteWorkerProvider<INPUT extends Data, OUTPUT extends Data, WorkerType extends AbstractRemoteWorker<INPUT, OUTPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WorkerType> {
private final DAOService daoService;
private final RemoteClientService remoteClientService;
public AbstractRemoteWorkerProvider(RemoteClientService remoteClientService) {
public AbstractRemoteWorkerProvider(DAOService daoService, RemoteClientService remoteClientService) {
this.daoService = daoService;
this.remoteClientService = remoteClientService;
}
......@@ -44,16 +48,14 @@ public abstract class AbstractRemoteWorkerProvider<T extends AbstractRemoteWorke
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override final public WorkerRef create(WorkerCreateListener workerCreateListener) {
T remoteWorker = workerInstance(getClusterContext());
WorkerType remoteWorker = workerInstance(daoService);
workerCreateListener.addWorker(remoteWorker);
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), remoteWorker);
getClusterContext().put(workerRef);
RemoteWorkerRef<INPUT, OUTPUT> workerRef = new RemoteWorkerRef<>(remoteWorker);
return workerRef;
}
public final RemoteWorkerRef create(String host, int port) {
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), remoteClientService.create(host, port));
getClusterContext().put(workerRef);
RemoteWorkerRef<INPUT, OUTPUT> workerRef = new RemoteWorkerRef<>(null, remoteClientService.create(host, port));
return workerRef;
}
}
......@@ -18,30 +18,39 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.framework.Executor;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorker implements Executor {
public abstract class AbstractWorker<INPUT extends Data, OUTPUT extends Data> implements NodeProcessor<INPUT, OUTPUT> {
private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
private Next<OUTPUT> next;
/**
* The data process logic in this method.
*
* @param message Cast the message object to a expect subclass.
* @throws WorkerException Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws WorkerException;
protected abstract void onWork(INPUT message) throws WorkerException;
@Override public final void execute(Object message) {
@Override public final void process(INPUT INPUT, Next<OUTPUT> next) {
this.next = next;
try {
onWork(message);
onWork(INPUT);
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
}
}
protected final void onNext(OUTPUT message) {
next.execute(message);
}
}
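Since AbstractWorker is now a NodeProcessor, a subclass forwards its output to the next graph node through onNext(...) instead of looking up a WorkerRef by role. An illustrative subclass (not part of this commit; the id value is invented):

```java
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractWorker;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;

// Pass-through worker used only to illustrate the new onWork/onNext contract.
class CpuMetricPassThroughWorker extends AbstractWorker<CpuMetric, CpuMetric> {

    @Override public int id() {
        return 100; // hypothetical node id
    }

    @Override protected void onWork(CpuMetric metric) throws WorkerException {
        // process the metric, then hand it to the next node in the graph
        onNext(metric);
    }
}
```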
......@@ -18,11 +18,12 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorkerProvider<T extends AbstractWorker> implements Provider {
public abstract T workerInstance(DAOService daoService);
public abstract class AbstractWorkerProvider<INPUT extends Data, OUTPUT extends Data, WorkerType extends AbstractWorker<INPUT, OUTPUT>> implements Provider {
public abstract WorkerType workerInstance(DAOService daoService);
}
......@@ -18,21 +18,28 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
/**
* @author peng-yongsheng
*/
public class LocalAsyncWorkerRef extends WorkerRef {
public class LocalAsyncWorkerRef<INPUT extends Data, OUTPUT extends Data> extends WorkerRef<INPUT, OUTPUT> {
private QueueEventHandler queueEventHandler;
private final QueueEventHandler<INPUT> queueEventHandler;
public LocalAsyncWorkerRef(QueueEventHandler queueEventHandler) {
LocalAsyncWorkerRef(NodeProcessor<INPUT, OUTPUT> destinationHandler,
QueueEventHandler<INPUT> queueEventHandler) {
super(destinationHandler);
this.queueEventHandler = queueEventHandler;
}
@Override
public void tell(Object message) throws WorkerInvokeException {
queueEventHandler.tell(message);
@Override protected void in(INPUT INPUT) {
queueEventHandler.tell(INPUT);
}
@Override protected void out(INPUT INPUT) {
super.out(INPUT);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base;
/**
* @author peng-yongsheng
*/
public interface LookUp {
WorkerRefs lookup(Role role) throws WorkerNotFoundException;
}
......@@ -19,6 +19,7 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -26,33 +27,32 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class RemoteWorkerRef extends WorkerRef {
public class RemoteWorkerRef<INPUT extends Data, OUTPUT extends Data> extends WorkerRef<INPUT, OUTPUT> {
private final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
private final Boolean acrossJVM;
private final AbstractRemoteWorker remoteWorker;
private final AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker;
private final RemoteClient remoteClient;
public RemoteWorkerRef(Role role, AbstractRemoteWorker remoteWorker) {
super(role);
public RemoteWorkerRef(AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker) {
super(remoteWorker);
this.remoteWorker = remoteWorker;
this.acrossJVM = false;
this.remoteClient = null;
}
public RemoteWorkerRef(Role role, RemoteClient remoteClient) {
super(role);
this.remoteWorker = null;
public RemoteWorkerRef(AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker, RemoteClient remoteClient) {
super(remoteWorker);
this.remoteWorker = remoteWorker;
this.acrossJVM = true;
this.remoteClient = remoteClient;
}
@Override
public void tell(Object message) throws WorkerInvokeException {
@Override protected void in(INPUT message) {
if (acrossJVM) {
try {
remoteClient.send(getRole().roleName(), (Data)message, getRole().dataDefine().remoteDataMappingId());
GraphManager.INSTANCE.findGraph(1);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
......@@ -61,7 +61,11 @@ public class RemoteWorkerRef extends WorkerRef {
}
}
public Boolean isAcrossJVM() {
@Override protected void out(INPUT INPUT) {
super.out(INPUT);
}
private Boolean isAcrossJVM() {
return acrossJVM;
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author peng-yongsheng
*/
public enum RemoteWorkerRefCounter {
INSTANCE;
private Map<String, AtomicInteger> counter = new ConcurrentHashMap<>();
public int incrementAndGet(Role role) {
if (!counter.containsKey(role.roleName())) {
AtomicInteger atomic = new AtomicInteger(0);
counter.putIfAbsent(role.roleName(), atomic);
}
return counter.get(role.roleName()).incrementAndGet();
}
}
......@@ -18,6 +18,8 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.CollectorException;
/**
* Defines a general exception a worker can throw when it
* encounters difficulty.
......@@ -25,7 +27,7 @@ package org.skywalking.apm.collector.stream.worker.base;
* @author peng-yongsheng
* @since v3.1-2017
*/
public class WorkerException extends Exception {
public class WorkerException extends CollectorException {
public WorkerException(String message) {
super(message);
......
......@@ -18,9 +18,15 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.core.graph.WayToNode;
/**
* @author peng-yongsheng
*/
public abstract class WorkerRef {
public abstract void tell(Object message) throws WorkerInvokeException;
public abstract class WorkerRef<INPUT extends Data, OUTPUT extends Data> extends WayToNode<INPUT, OUTPUT> {
WorkerRef(NodeProcessor<INPUT, OUTPUT> destinationHandler) {
super(destinationHandler);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class WorkerRefs<T extends WorkerRef> {
private final Logger logger = LoggerFactory.getLogger(WorkerRefs.class);
private List<T> workerRefs;
private Role role;
protected WorkerRefs(List<T> workerRefs) {
this.workerRefs = workerRefs;
}
protected WorkerRefs(List<T> workerRefs, Role role) {
this.workerRefs = workerRefs;
this.role = role;
}
public void tell(Object message) throws WorkerInvokeException {
// logger.debug("WorkerSelector instance of {}", workerSelector.getClass());
workerRefs.forEach(workerRef -> {
if (workerRef instanceof RemoteWorkerRef) {
logger.debug("message hashcode: {}, select workers: {}", message.hashCode(), workerRef.toString());
}
});
// workerSelector.select(workerRefs, message).tell(message);
}
}
......@@ -19,15 +19,8 @@
package org.skywalking.apm.collector.stream.worker.impl;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.EndOfBatchCommand;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.Role;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.base.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.base.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -35,38 +28,30 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
public abstract class AggregationWorker<INPUT extends Data, OUTPUT extends Data> extends AbstractLocalAsyncWorker<INPUT, OUTPUT> {
private final Logger logger = LoggerFactory.getLogger(AggregationWorker.class);
private DataCache dataCache;
private int messageNum;
public AggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
public AggregationWorker() {
dataCache = new DataCache();
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected final void onWork(INPUT message) throws WorkerException {
messageNum++;
aggregate(message);
@Override protected final void onWork(Object message) throws WorkerException {
if (message instanceof EndOfBatchCommand) {
if (messageNum >= 100) {
sendToNext();
messageNum = 0;
}
if (message.isEndOfBatch()) {
sendToNext();
} else {
messageNum++;
aggregate(message);
if (messageNum >= 100) {
sendToNext();
messageNum = 0;
}
}
}
protected abstract WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException;
private void sendToNext() throws WorkerException {
dataCache.switchPointer();
while (dataCache.getLast().isWriting()) {
......@@ -76,24 +61,19 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
throw new WorkerException(e.getMessage(), e);
}
}
dataCache.getLast().asMap().forEach((id, data) -> {
try {
logger.debug(data.toString());
nextWorkRef(id).tell(data);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
dataCache.getLast().asMap().forEach((String id, Data data) -> {
logger.debug(data.toString());
onNext((OUTPUT)data);
});
dataCache.finishReadingLast();
}
protected final void aggregate(Object message) {
Data data = (Data)message;
private void aggregate(INPUT message) {
dataCache.writing();
if (dataCache.containsKey(data.getId())) {
getRole().dataDefine().mergeData(dataCache.get(data.getId()), data);
if (dataCache.containsKey(message.getId())) {
message.mergeData(dataCache.get(message.getId()));
} else {
dataCache.put(data.getId(), data);
dataCache.put(message.getId(), message);
}
dataCache.finishWriting();
}
......
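The reworked AggregationWorker appears to merge messages by id in its DataCache and to flush downstream via onNext(...) once 100 messages have accumulated or the end-of-batch flag arrives, so a concrete aggregator only needs a node id. A hypothetical subclass:

```java
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;

// Hypothetical aggregator: all merge/flush behaviour comes from the base class.
class CpuMetricAggregationWorker extends AggregationWorker<CpuMetric, CpuMetric> {

    @Override public int id() {
        return 101; // hypothetical node id
    }
}
```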
......@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.queue.base.EndOfBatchCommand;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
......@@ -36,7 +35,7 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
public abstract class PersistenceWorker<INPUT extends Data, OUTPUT extends Data> extends AbstractLocalAsyncWorker<INPUT, OUTPUT> {
private final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
......@@ -48,32 +47,31 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
this.daoService = daoService;
}
@Override protected final void onWork(Object message) throws WorkerException {
if (message instanceof FlushAndSwitch) {
public final void flushAndSwitch() {
try {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
}
} finally {
dataCache.trySwitchPointerFinally();
}
}
@Override protected final void onWork(INPUT message) throws WorkerException {
if (dataCache.currentCollectionSize() >= 5000) {
try {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
List<?> collection = buildBatchCollection();
IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class);
dao.batchPersistence(collection);
}
} finally {
dataCache.trySwitchPointerFinally();
}
} else if (message instanceof EndOfBatchCommand) {
} else {
if (dataCache.currentCollectionSize() >= 5000) {
try {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
List<?> collection = buildBatchCollection();
IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class);
dao.batchPersistence(collection);
}
} finally {
dataCache.trySwitchPointerFinally();
}
}
aggregate(message);
}
aggregate(message);
}
public final List<?> buildBatchCollection() throws WorkerException {
......
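On the flushing side, the PersistenceTimer shown earlier now calls the worker directly instead of sending a FlushAndSwitch message. A condensed sketch of that timer-side contract (the wrapper class is hypothetical; the calls match the diff):

```java
import java.util.ArrayList;
import java.util.List;

import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;

class PersistenceFlushSketch {
    List<Object> extract() throws WorkerException {
        List<Object> batchAll = new ArrayList<>();
        for (PersistenceWorker worker : PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers()) {
            worker.flushAndSwitch();                        // swap the in-memory data buffers
            batchAll.addAll(worker.buildBatchCollection()); // entities ready for the batch DAO
        }
        return batchAll;
    }
}
```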