提交 e11e4e5a 编写于 作者: P peng-yongsheng

Provide stream module.

上级 fb43c49e
......@@ -113,7 +113,7 @@
<!-- remote provider -->
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-remote-grpc-define</artifactId>
<artifactId>collector-remote-grpc-provider</artifactId>
<version>${project.version}</version>
</dependency>
<!-- remote provider -->
......
......@@ -29,8 +29,8 @@ public class Data extends AbstractHashMessage {
private Boolean[] dataBooleans;
private byte[][] dataBytes;
public Data(String id, int stringCapacity, int longCapacity, int doubleCapacity, int integerCapacity,
int booleanCapacity, int byteCapacity) {
public Data(String id, int stringCapacity, int longCapacity, int doubleCapacity,
int integerCapacity, int booleanCapacity, int byteCapacity) {
super(id);
this.dataStrings = new String[stringCapacity];
this.dataStrings[0] = id;
......
......@@ -58,6 +58,8 @@ public abstract class DataDefine {
attributes[position] = attribute;
}
public abstract int remoteDataMappingId();
protected abstract int initialCapacity();
protected abstract void attributeDefine();
......@@ -66,7 +68,7 @@ public abstract class DataDefine {
return new Data(id, stringCapacity, longCapacity, doubleCapacity, integerCapacity, booleanCapacity, byteCapacity);
}
public void mergeData(Data newData, Data oldData) {
public final void mergeData(Data newData, Data oldData) {
int stringPosition = 0;
int longPosition = 0;
int doublePosition = 0;
......
......@@ -22,5 +22,5 @@ package org.skywalking.apm.collector.remote;
* @author peng-yongsheng
*/
public enum RemoteDataMapping {
InstPerformance, NodeComponent, NodeMapping, NodeReference, Application, Instance, ServiceName, ServiceEntry, ServiceReference
GlobalTrace, Segment, SegmentCost, InstPerformance, NodeComponent, NodeMapping, NodeReference, Application, Instance, ServiceName, ServiceEntry, ServiceReference, CpuMetric, MemoryMetric, MemoryPoolMetric, GCMetric
}
......@@ -19,11 +19,10 @@
package org.skywalking.apm.collector.remote.service;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public interface RemoteClient {
void send(String roleName, Data data, RemoteDataMapping mapping);
void send(String roleName, Data data, int remoteDataMappingId);
}
......@@ -19,11 +19,11 @@
package org.skywalking.apm.collector.remote.grpc.service;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
import org.skywalking.apm.collector.remote.RemoteDataMappingContainer;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClient;
/**
......@@ -39,8 +39,8 @@ public class GRPCRemoteClient implements RemoteClient {
this.streamObserver = streamObserver;
}
@Override public void send(String roleName, Data data, RemoteDataMapping mapping) {
RemoteData remoteData = (RemoteData)container.get(mapping.ordinal()).serialize(data);
@Override public void send(String roleName, Data data, int remoteDataMappingId) {
RemoteData remoteData = (RemoteData)container.get(remoteDataMappingId).serialize(data);
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setWorkerRole(roleName);
builder.setRemoteData(remoteData);
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class GlobalTraceDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.GlobalTrace.ordinal();
}
@Override protected int initialCapacity() {
return 4;
}
......
......@@ -24,12 +24,17 @@ import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class InstPerformanceDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.InstPerformance.ordinal();
}
@Override protected int initialCapacity() {
return 6;
}
......
......@@ -24,12 +24,17 @@ import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class CpuMetricDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.CpuMetric.ordinal();
}
@Override protected int initialCapacity() {
return 4;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class GCMetricDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.GCMetric.ordinal();
}
@Override protected int initialCapacity() {
return 6;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class MemoryMetricDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.MemoryMetric.ordinal();
}
@Override protected int initialCapacity() {
return 8;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class MemoryPoolMetricDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.MemoryPoolMetric.ordinal();
}
@Override protected int initialCapacity() {
return 8;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class NodeComponentDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.NodeComponent.ordinal();
}
@Override protected int initialCapacity() {
return 6;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class NodeMappingDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.NodeMapping.ordinal();
}
@Override protected int initialCapacity() {
return 5;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class NodeReferenceDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.NodeReference.ordinal();
}
@Override protected int initialCapacity() {
return 11;
}
......
......@@ -18,18 +18,23 @@
package org.skywalking.apm.collector.storage.table.register;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class ApplicationDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.Application.ordinal();
}
@Override protected int initialCapacity() {
return 3;
}
......
......@@ -18,18 +18,23 @@
package org.skywalking.apm.collector.storage.table.register;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class InstanceDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.Instance.ordinal();
}
@Override protected int initialCapacity() {
return 7;
}
......
......@@ -18,18 +18,23 @@
package org.skywalking.apm.collector.storage.table.register;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class ServiceNameDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.ServiceName.ordinal();
}
@Override protected int initialCapacity() {
return 4;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class SegmentCostDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.SegmentCost.ordinal();
}
@Override protected int initialCapacity() {
return 9;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class SegmentDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.Segment.ordinal();
}
@Override protected int initialCapacity() {
return 2;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class ServiceEntryDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.ServiceEntry.ordinal();
}
@Override protected int initialCapacity() {
return 6;
}
......
......@@ -23,12 +23,17 @@ import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceDataDefine extends DataDefine {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.ServiceReference.ordinal();
}
@Override protected int initialCapacity() {
return 15;
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.module.Module;
/**
* @author peng-yongsheng
*/
public class StreamModule extends Module {
public static final String NAME = "stream";
@Override public String name() {
return NAME;
}
@Override public Class[] services() {
return new Class[0];
}
}
#
# Copyright 2017, OpenSkywalking Organization All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.stream.StreamModule
\ No newline at end of file
......@@ -30,4 +30,11 @@
<artifactId>collector-stream-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import java.util.Properties;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.StorageModule;
/**
* @author peng-yongsheng
*/
public class StreamModuleProvider extends ModuleProvider {
@Override public String name() {
return "worker";
}
@Override public Class<? extends Module> module() {
return StreamModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
try {
QueueCreatorService queueCreatorService = getManager().find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteClientService remoteClientService = getManager().find(RemoteModule.NAME).getService(RemoteClientService.class);
} catch (ModuleNotFoundException e) {
throw new ServiceNotProvidedException(e.getMessage());
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[] {RemoteModule.NAME, QueueModule.NAME, StorageModule.NAME};
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.dao.IBatchDAO;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.FlushAndSwitch;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class PersistenceTimer implements Starter {
private final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
public void start() {
logger.info("persistence timer start");
//TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(), 1, timeInterval, TimeUnit.SECONDS);
}
private void extractDataAndSave() {
try {
List<PersistenceWorker> workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers();
List batchAllCollection = new ArrayList<>();
workers.forEach((PersistenceWorker worker) -> {
logger.debug("extract {} worker data and save", worker.getRole().roleName());
try {
worker.allocateJob(new FlushAndSwitch());
List<?> batchCollection = worker.buildBatchCollection();
logger.debug("extract {} worker data size: {}", worker.getRole().roleName(), batchCollection.size());
batchAllCollection.addAll(batchCollection);
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
}
});
IBatchDAO dao = (IBatchDAO)DAOContainer.INSTANCE.get(IBatchDAO.class.getName());
dao.batchPersistence(batchAllCollection);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
} finally {
logger.debug("persistence data save finish");
}
}
}
......@@ -16,9 +16,9 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.queue.QueueExecutor;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
/**
* The <code>AbstractLocalAsyncWorker</code> implementations represent workers,
......
......@@ -16,11 +16,11 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.queue.QueueCreator;
import org.skywalking.apm.collector.queue.QueueEventHandler;
import org.skywalking.apm.collector.queue.QueueExecutor;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
......@@ -29,16 +29,20 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
public abstract int queueSize();
@Override final public WorkerRef create() throws ProviderNotFoundException {
private final QueueCreatorService queueCreatorService;
public AbstractLocalAsyncWorkerProvider(QueueCreatorService queueCreatorService) {
this.queueCreatorService = queueCreatorService;
}
@Override
final public WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException {
T localAsyncWorker = workerInstance(getClusterContext());
localAsyncWorker.preStart();
if (localAsyncWorker instanceof PersistenceWorker) {
PersistenceWorkerContainer.INSTANCE.addWorker((PersistenceWorker)localAsyncWorker);
}
workerCreateListener.addWorker(localAsyncWorker);
QueueCreator queueCreator = ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(QueueModuleGroupDefine.GROUP_NAME)).getQueueCreator();
QueueEventHandler queueEventHandler = queueCreator.create(queueSize(), localAsyncWorker);
QueueEventHandler queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker);
LocalAsyncWorkerRef workerRef = new LocalAsyncWorkerRef(role(), queueEventHandler);
getClusterContext().put(workerRef);
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
/**
* The <code>AbstractRemoteWorker</code> implementations represent workers,
......
......@@ -16,7 +16,9 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
/**
* The <code>AbstractRemoteWorkerProvider</code> implementations represent providers,
......@@ -28,6 +30,12 @@ package org.skywalking.apm.collector.stream;
*/
public abstract class AbstractRemoteWorkerProvider<T extends AbstractRemoteWorker> extends AbstractWorkerProvider<T> {
private final RemoteClientService remoteClientService;
public AbstractRemoteWorkerProvider(RemoteClientService remoteClientService) {
this.remoteClientService = remoteClientService;
}
/**
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
......@@ -35,15 +43,16 @@ public abstract class AbstractRemoteWorkerProvider<T extends AbstractRemoteWorke
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override final public WorkerRef create() {
T clusterWorker = workerInstance(getClusterContext());
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), clusterWorker);
@Override final public WorkerRef create(WorkerCreateListener workerCreateListener) {
T remoteWorker = workerInstance(getClusterContext());
workerCreateListener.addWorker(remoteWorker);
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), remoteWorker);
getClusterContext().put(workerRef);
return workerRef;
}
public final RemoteWorkerRef create(GRPCClient client) {
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), client);
public final RemoteWorkerRef create(String host, int port) {
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), remoteClientService.create(host, port));
getClusterContext().put(workerRef);
return workerRef;
}
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
/**
* @author peng-yongsheng
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import java.util.ArrayList;
import java.util.List;
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
/**
* @author peng-yongsheng
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import java.util.ArrayList;
import java.util.List;
......
......@@ -16,9 +16,9 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.queue.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
/**
* @author peng-yongsheng
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.define.DefinitionFile;
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
/**
* @author peng-yongsheng
......
......@@ -16,12 +16,12 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
/**
* @author peng-yongsheng
*/
public interface Provider {
WorkerRef create() throws ProviderNotFoundException;
WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException;
}
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
public class ProviderNotFoundException extends Exception {
public ProviderNotFoundException(String message) {
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import java.util.ArrayList;
import java.util.List;
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.define.DefinitionFile;
......
......@@ -16,14 +16,10 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -35,38 +31,28 @@ public class RemoteWorkerRef extends WorkerRef {
private final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
private final Boolean acrossJVM;
private final RemoteCommonServiceGrpc.RemoteCommonServiceStub stub;
private StreamObserver<RemoteMessage> streamObserver;
private final AbstractRemoteWorker remoteWorker;
private final String address;
private final RemoteClient remoteClient;
public RemoteWorkerRef(Role role, AbstractRemoteWorker remoteWorker) {
super(role);
this.remoteWorker = remoteWorker;
this.acrossJVM = false;
this.stub = null;
this.address = Const.EMPTY_STRING;
this.remoteClient = null;
}
public RemoteWorkerRef(Role role, GRPCClient client) {
public RemoteWorkerRef(Role role, RemoteClient remoteClient) {
super(role);
this.remoteWorker = null;
this.acrossJVM = true;
this.stub = RemoteCommonServiceGrpc.newStub(client.getChannel());
this.address = client.toString();
createStreamObserver();
this.remoteClient = remoteClient;
}
@Override
public void tell(Object message) throws WorkerInvokeException {
if (acrossJVM) {
try {
RemoteData remoteData = getRole().dataDefine().serialize(message);
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setWorkerRole(getRole().roleName());
builder.setRemoteData(remoteData);
streamObserver.onNext(builder.build());
remoteClient.send(getRole().roleName(), (Data)message, getRole().dataDefine().remoteDataMappingId());
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
......@@ -80,9 +66,6 @@ public class RemoteWorkerRef extends WorkerRef {
}
@Override public String toString() {
StringBuilder toString = new StringBuilder();
toString.append("acrossJVM: ").append(acrossJVM);
toString.append(", address: ").append(address);
return toString.toString();
return "acrossJVM: " + isAcrossJVM();
}
}
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* @author peng-yongsheng
*/
public enum ClusterWorkerRefCounter {
public enum RemoteWorkerRefCounter {
INSTANCE;
private Map<String, AtomicInteger> counter = new ConcurrentHashMap<>();
......
......@@ -16,10 +16,10 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.stream.selector.WorkerSelector;
import org.skywalking.apm.collector.stream.worker.base.selector.WorkerSelector;
/**
* @author peng-yongsheng
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
public class UsedRoleNameException extends Exception {
public UsedRoleNameException(String message) {
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import java.util.ArrayList;
import java.util.HashMap;
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base;
/**
* @author peng-yongsheng
*/
public class WorkerCreateListener {
public void addWorker(AbstractWorker worker) {
}
}
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
/**
* Defines a general exception a worker can throw when it
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
/**
* This exception is raised when worker fails to process job during "call" or "ask"
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
public class WorkerNotFoundException extends WorkerException {
public WorkerNotFoundException(String message) {
......
......@@ -16,7 +16,7 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
/**
* @author peng-yongsheng
......
......@@ -16,10 +16,10 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.base;
import java.util.List;
import org.skywalking.apm.collector.stream.selector.WorkerSelector;
import org.skywalking.apm.collector.stream.worker.base.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -16,10 +16,10 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.selector;
package org.skywalking.apm.collector.stream.worker.base.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.WorkerRef;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -16,12 +16,12 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.selector;
package org.skywalking.apm.collector.stream.worker.base.selector;
import java.util.List;
import org.skywalking.apm.collector.core.data.AbstractHashMessage;
import org.skywalking.apm.collector.stream.WorkerRef;
import org.skywalking.apm.collector.stream.AbstractWorker;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef;
import org.skywalking.apm.collector.stream.worker.base.AbstractWorker;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}. It choose {@link WorkerRef}
......
......@@ -16,11 +16,11 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.selector;
package org.skywalking.apm.collector.stream.worker.base.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.WorkerRef;
import org.skywalking.apm.collector.stream.AbstractWorker;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef;
import org.skywalking.apm.collector.stream.worker.base.AbstractWorker;
/**
* The <code>RollingSelector</code> is a simple implementation of {@link WorkerSelector}.
......
......@@ -16,11 +16,11 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.selector;
package org.skywalking.apm.collector.stream.worker.base.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.WorkerRef;
import org.skywalking.apm.collector.stream.AbstractWorker;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef;
import org.skywalking.apm.collector.stream.worker.base.AbstractWorker;
/**
* The <code>WorkerSelector</code> should be implemented by any class whose instances
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.impl;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.EndOfBatchCommand;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.Role;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.base.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.base.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(AggregationWorker.class);
private DataCache dataCache;
private int messageNum;
public AggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
dataCache = new DataCache();
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected final void onWork(Object message) throws WorkerException {
if (message instanceof EndOfBatchCommand) {
sendToNext();
} else {
messageNum++;
aggregate(message);
if (messageNum >= 100) {
sendToNext();
messageNum = 0;
}
}
}
protected abstract WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException;
private void sendToNext() throws WorkerException {
dataCache.switchPointer();
while (dataCache.getLast().isWriting()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new WorkerException(e.getMessage(), e);
}
}
dataCache.getLast().asMap().forEach((id, data) -> {
try {
logger.debug(data.toString());
nextWorkRef(id).tell(data);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
});
dataCache.finishReadingLast();
}
protected final void aggregate(Object message) {
Data data = (Data)message;
dataCache.writing();
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(dataCache.get(data.id()), data);
} else {
dataCache.put(data.id(), data);
}
dataCache.finishWriting();
}
}
......@@ -16,11 +16,10 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker.impl;
/**
* @author peng-yongsheng
*/
public interface WorkerCreateListener<W> {
void onCreate(W workerx);
public class FlushAndSwitch {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.queue.base.EndOfBatchCommand;
import org.skywalking.apm.collector.storage.base.dao.DAOContainer;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.Role;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
private DataCache dataCache;
public PersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
dataCache = new DataCache();
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected final void onWork(Object message) throws WorkerException {
if (message instanceof FlushAndSwitch) {
try {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
}
} finally {
dataCache.trySwitchPointerFinally();
}
} else if (message instanceof EndOfBatchCommand) {
} else {
if (dataCache.currentCollectionSize() >= 5000) {
try {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
List<?> collection = buildBatchCollection();
IBatchDAO dao = (IBatchDAO)DAOContainer.INSTANCE.get(IBatchDAO.class.getName());
dao.batchPersistence(collection);
}
} finally {
dataCache.trySwitchPointerFinally();
}
}
aggregate(message);
}
}
public final List<?> buildBatchCollection() throws WorkerException {
List<?> batchCollection = new LinkedList<>();
try {
while (dataCache.getLast().isWriting()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn("thread wake up");
}
}
if (dataCache.getLast().asMap() != null) {
batchCollection = prepareBatch(dataCache.getLast().asMap());
}
} finally {
dataCache.finishReadingLast();
}
return batchCollection;
}
protected final List<Object> prepareBatch(Map<String, Data> dataMap) {
List<Object> insertBatchCollection = new LinkedList<>();
List<Object> updateBatchCollection = new LinkedList<>();
dataMap.forEach((id, data) -> {
if (needMergeDBData()) {
Data dbData = persistenceDAO().get(id, getRole().dataDefine());
if (ObjectUtils.isNotEmpty(dbData)) {
getRole().dataDefine().mergeData(data, dbData);
try {
updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(data));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
} else {
try {
insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
} else {
try {
insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
});
insertBatchCollection.addAll(updateBatchCollection);
return insertBatchCollection;
}
private void aggregate(Object message) {
dataCache.writing();
Data data = (Data)message;
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(dataCache.get(data.id()), data);
} else {
dataCache.put(data.id(), data);
}
dataCache.finishWriting();
}
protected abstract IPersistenceDAO persistenceDAO();
protected abstract boolean needMergeDBData();
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.ArrayList;
import java.util.List;
/**
* @author peng-yongsheng
*/
public enum PersistenceWorkerContainer {
INSTANCE;
private List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
public void addWorker(PersistenceWorker worker) {
persistenceWorkers.add(worker);
}
public List<PersistenceWorker> getPersistenceWorkers() {
return persistenceWorkers;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.impl.data;
import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public class DataCache extends Window {
private DataCollection lockedDataCollection;
public boolean containsKey(String id) {
return lockedDataCollection.containsKey(id);
}
public Data get(String id) {
return lockedDataCollection.get(id);
}
public void put(String id, Data data) {
lockedDataCollection.put(id, data);
}
public void writing() {
lockedDataCollection = getCurrentAndWriting();
}
public int currentCollectionSize() {
return getCurrent().size();
}
public void finishWriting() {
lockedDataCollection.finishWriting();
lockedDataCollection = null;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.impl.data;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public class DataCollection {
private Map<String, Data> data;
private volatile boolean writing;
private volatile boolean reading;
public DataCollection() {
this.data = new ConcurrentHashMap<>();
this.writing = false;
this.reading = false;
}
public void finishWriting() {
writing = false;
}
public void writing() {
writing = true;
}
public boolean isWriting() {
return writing;
}
public void finishReading() {
reading = false;
}
public void reading() {
reading = true;
}
public boolean isReading() {
return reading;
}
public boolean containsKey(String key) {
return data.containsKey(key);
}
public void put(String key, Data value) {
data.put(key, value);
}
public Data get(String key) {
return data.get(key);
}
public int size() {
return data.size();
}
public void clear() {
data.clear();
}
public Map<String, Data> asMap() {
return data;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.impl.data;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author peng-yongsheng
*/
public abstract class Window {
private AtomicInteger windowSwitch = new AtomicInteger(0);
private DataCollection pointer;
private DataCollection windowDataA;
private DataCollection windowDataB;
public Window() {
windowDataA = new DataCollection();
windowDataB = new DataCollection();
pointer = windowDataA;
}
public boolean trySwitchPointer() {
return windowSwitch.incrementAndGet() == 1 && !getLast().isReading();
}
public void trySwitchPointerFinally() {
windowSwitch.addAndGet(-1);
}
public void switchPointer() {
if (pointer == windowDataA) {
pointer = windowDataB;
} else {
pointer = windowDataA;
}
getLast().reading();
}
protected DataCollection getCurrentAndWriting() {
if (pointer == windowDataA) {
windowDataA.writing();
return windowDataA;
} else {
windowDataB.writing();
return windowDataB;
}
}
protected DataCollection getCurrent() {
return pointer;
}
public DataCollection getLast() {
if (pointer == windowDataA) {
return windowDataB;
} else {
return windowDataA;
}
}
public void finishReadingLast() {
getLast().clear();
getLast().finishReading();
}
}
#
# Copyright 2017, OpenSkywalking Organization All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.stream.StreamModuleProvider
\ No newline at end of file
......@@ -40,5 +40,20 @@
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-queue-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-storage-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-remote-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册