diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileLoader.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileLoader.java index 1fd7971bde4d913578df76166a132114e195d325..feac53fac9474eee143a0b75922971638581ed33 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileLoader.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/file/DataFileLoader.java @@ -21,7 +21,8 @@ public class DataFileLoader { List allDataFile = new ArrayList(); for (File fileEntry : dataFileDir.listFiles()) { - allDataFile.add(new DataFile(fileEntry)); + if (fileEntry.getName().split("_").length == 8) + allDataFile.add(new DataFile(fileEntry)); } return allDataFile; } diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/listener/StorageListener.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/listener/StorageListener.java index 15b9976936a2c7f7b62743a5e006ab677d3be890..eceede1ead84f183efee50e259b228309c504898 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/listener/StorageListener.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/listener/StorageListener.java @@ -31,10 +31,12 @@ public class StorageListener implements SpanStorageListener { public StorageListener() { requestSpanDisruptor = new Disruptor(new RequestSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE); requestSpanDisruptor.handleEventsWith(new StoreRequestSpanEventHandler()); + requestSpanDisruptor.start(); requestSpanRingBuffer = requestSpanDisruptor.getRingBuffer(); ackSpanDisruptor = new Disruptor(new AckSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE); ackSpanDisruptor.handleEventsWith(new StoreAckSpanEventHandler()); + ackSpanDisruptor.start(); ackSpanRingBuffer = ackSpanDisruptor.getRingBuffer(); } @@ -51,7 +53,7 @@ public class StorageListener implements SpanStorageListener { logger.error("RequestSpan trace-id[{}] store failure..", requestSpan.getTraceId(), e); HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.ERROR, "RequestSpan store failure."); return false; - } finally{ + } finally { requestSpanRingBuffer.publish(sequence); } } @@ -69,8 +71,8 @@ public class StorageListener implements SpanStorageListener { logger.error("AckSpan trace-id[{}] store failure..", ackSpan.getTraceId(), e); HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.ERROR, "AckSpan store failure."); return false; - } finally{ - requestSpanRingBuffer.publish(sequence); + } finally { + ackSpanRingBuffer.publish(sequence); } } }