提交 9147fec6 编写于 作者: wu-sheng's avatar wu-sheng

Merge branch 'master' of https://github.com/wu-sheng/sky-walking

* 'master' of https://github.com/wu-sheng/sky-walking:
  fix some question
......@@ -29,38 +29,34 @@ public class RouteAckSpanBufferEventHandler extends AbstractRouteSpanEventHandle
@Override
public void onEvent(AckSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
try {
buffer.add(event.getAckSpan());
buffer.add(event.getAckSpan());
if (stop) {
try {
for (AckSpan ackSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan);
spanDisruptor.saveSpan(ackSpan);
}
} finally {
buffer.clear();
if (stop) {
try {
for (AckSpan ackSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan);
spanDisruptor.saveSpan(ackSpan);
}
return;
} finally {
buffer.clear();
}
wait2Finish();
return;
}
if (endOfBatch || buffer.size() == bufferSize) {
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendACKSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
wait2Finish();
if (endOfBatch || buffer.size() == bufferSize) {
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendACKSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
} finally {
event.setAckSpan(null);
}
}
}
......@@ -32,38 +32,34 @@ public class RouteSendRequestSpanEventHandler extends AbstractRouteSpanEventHand
@Override
public void onEvent(RequestSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
try {
buffer.add(event.getRequestSpan());
buffer.add(event.getRequestSpan());
if (stop) {
try {
for (RequestSpan requestSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(requestSpan);
spanDisruptor.saveSpan(requestSpan);
}
} finally {
buffer.clear();
if (stop) {
try {
for (RequestSpan requestSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(requestSpan);
spanDisruptor.saveSpan(requestSpan);
}
return;
} finally {
buffer.clear();
}
wait2Finish();
return;
}
wait2Finish();
if (endOfBatch || buffer.size() == bufferSize) {
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendRequestSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("RequestSpan messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
if (endOfBatch || buffer.size() == bufferSize) {
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendRequestSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("RequestSpan messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
} finally {
event.setRequestSpan(null);
}
}
}
package com.a.eye.skywalking.storage.data.file;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.storage.data.spandata.SpanData;
import com.a.eye.skywalking.storage.data.index.IndexMetaCollection;
......@@ -20,14 +22,23 @@ public class DataFileWriter {
}
IndexMetaCollection collections = new IndexMetaCollection();
int failedCount = 0;
try {
for (SpanData data : spanData) {
collections.add(dataFile.write(data));
try {
collections.add(dataFile.write(data));
}catch (Throwable e){
failedCount++;
}
}
}finally {
dataFile.flush();
}
if (failedCount > 0) {
HealthCollector.getCurrentHeathReading("DataFileWriter").updateData(HeathReading.ERROR ,"Failed to write %s span to data file.", Integer.valueOf(failedCount));
}
return collections;
}
......
......@@ -35,24 +35,20 @@ public class StoreAckSpanEventHandler implements EventHandler<AckSpanData> {
@Override
public void onEvent(AckSpanData event, long sequence, boolean endOfBatch) throws Exception {
try {
buffer.add(event);
buffer.add(event);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
} finally {
event.setAckSpan(null);
}
}
}
......@@ -35,25 +35,22 @@ public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanDat
@Override
public void onEvent(RequestSpanData event, long sequence, boolean endOfBatch) throws Exception {
try {
buffer.add(event);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
buffer.add(event);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
} finally {
event.setRequestSpan(null);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册