提交 a0c34a4d 编写于 作者: S Steve Yurong Su

add checkers for InsertRowPlan and InsertRowsOfOneDevicePlan

上级 76ecb64e
......@@ -1050,6 +1050,7 @@ public class StorageGroupProcessor {
return;
}
insertRowPlan.checkForTianYuan("StorageGroupProcessor#insertToTsFileProcessor");
tsFileProcessor.insert(insertRowPlan);
// try to update the latest time of the device of this tsRecord
......@@ -2910,6 +2911,7 @@ public class StorageGroupProcessor {
writeLock("InsertRowsOfOneDevice");
try {
boolean isSequence = false;
insertRowsOfOneDevicePlan.checkForTianYuan("StorageGroupProcessor#insert");
InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans();
for (int i = 0, rowPlansLength = rowPlans.length; i < rowPlansLength; i++) {
......
......@@ -188,6 +188,8 @@ public class TsFileProcessor {
checkMemCostAndAddToTspInfo(insertRowPlan);
}
insertRowPlan.checkForTianYuan("TsFileProcessor#InsertRowPlan");
workMemTable.insert(insertRowPlan);
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
......
......@@ -1198,6 +1198,7 @@ public class PlanExecutor implements IPlanExecutor {
// we do not need to infer data type for insertRowsOfOneDevicePlan
}
// ok, we can begin to write data into the engine..
insertRowsOfOneDevicePlan.checkForTianYuan("PlanExecutor#insert");
StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
List<String> notExistedPaths = null;
......
......@@ -63,6 +63,21 @@ public class InsertRowPlan extends InsertPlan {
private List<Object> failedValues;
public void checkForTianYuan(String location) {
for (int j = 0; j < getMeasurements().length; j++) {
if (getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
String value = ((Binary) getValues()[j]).getStringValue().substring(0, 35);
if (!value.contains(getDeviceId().getMeasurement().substring(4))) {
logger.error(
"{}: receive error data,device:{}, value(first 100 bytes): {}",
location,
getDeviceId(),
value);
}
}
}
}
public InsertRowPlan() {
super(OperatorType.INSERT);
}
......
......@@ -23,6 +23,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
......@@ -31,15 +35,29 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
private static Logger logger = LoggerFactory.getLogger(InsertRowsOfOneDevicePlan.class);
boolean[] isExecuted;
private InsertRowPlan[] rowPlans;
public void checkForTianYuan(String location) {
for (int i = 0; i < rowPlans.length; ++i) {
for (int j = 0; j < rowPlans[i].getMeasurements().length; j++) {
if (rowPlans[i].getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0, 35);
if (!value.contains(rowPlans[i].getDeviceId().getMeasurement().substring(4))) {
logger.error(
"{}: receive error data,device:{}, value(first 100 bytes): {}",
location,
rowPlans[i].getDeviceId(),
value);
}
}
}
}
}
public InsertRowsOfOneDevicePlan(
PartialPath deviceId,
Long[] insertTimes,
......@@ -67,12 +85,15 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
+ ", time:"
+ insertTimes[i]);
}
//Just for Tianyuan debug
for(int j = 0; j < rowPlans[i].getMeasurements().length; j ++) {
// Just for Tianyuan debug
for (int j = 0; j < rowPlans[i].getMeasurements().length; j++) {
if (rowPlans[i].getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0,100);
String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0, 100);
if (!value.contains(rowPlans[i].getDeviceId().getMeasurement().substring(4))) {
logger.error("receive error data,device:{}, value(first 100 bytes): {}", rowPlans[i].getDeviceId(), value);
logger.error(
"receive error data,device:{}, value(first 100 bytes): {}",
rowPlans[i].getDeviceId(),
value);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册