未验证 提交 0103d8ad 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Fix no stream register. (#2873)

* Fix no stream register.

* Make sure list order.

* Refactor codes.

* Fix wrong revert.
上级 0f9c27d4
......@@ -160,6 +160,7 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.scan(() -> {
});
streamDataMapping.init();
} catch (IOException | IllegalAccessException | InstantiationException e) {
throw new ModuleStartException(e.getMessage(), e);
}
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*;
......@@ -66,6 +67,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
streamDataMappingSetter.putIfAbsent(metricsClass);
MetricsPersistentWorker hourPersistentWorker = null;
MetricsPersistentWorker dayPersistentWorker = null;
MetricsPersistentWorker monthPersistentWorker = null;
......
......@@ -22,6 +22,7 @@ import java.util.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*;
......@@ -57,6 +58,9 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
streamDataMappingSetter.putIfAbsent(inventoryClass);
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
......
......@@ -25,12 +25,12 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
* @author peng-yongsheng
*/
public class StreamDataMapping implements StreamDataMappingGetter, StreamDataMappingSetter {
private int id = 0;
private List<Class<? extends StreamData>> streamClassList;
private final Map<Class<? extends StreamData>, Integer> classMap;
private final Map<Integer, Class<? extends StreamData>> idMap;
public StreamDataMapping() {
streamClassList = new ArrayList<>();
this.classMap = new HashMap<>();
this.idMap = new HashMap<>();
}
......@@ -40,9 +40,26 @@ public class StreamDataMapping implements StreamDataMappingGetter, StreamDataMap
return;
}
id++;
classMap.put(streamDataClass, id);
idMap.put(id, streamDataClass);
streamClassList.add(streamDataClass);
}
public void init() {
/**
* The stream protocol use this list order to assign the ID,
* which is used in across node communication. This order must be certain.
*/
Collections.sort(streamClassList, new Comparator<Class>() {
@Override public int compare(Class streamClass1, Class streamClass2) {
return streamClass1.getName().compareTo(streamClass2.getName());
}
});
for (int i = 0; i < streamClassList.size(); i++) {
Class<? extends StreamData> streamClass = streamClassList.get(i);
int streamId = i + 1;
classMap.put(streamClass, streamId);
idMap.put(streamId, streamClass);
}
}
@Override public int findIdByClass(Class<? extends StreamData> streamDataClass) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册