diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 8beca06615ca77127bf8d13743661eb78981cba0..cecb5640fd299e7096c31e0123edef897d375c00 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -1050,6 +1050,7 @@ public class StorageGroupProcessor { return; } + insertRowPlan.checkForTianYuan("StorageGroupProcessor#insertToTsFileProcessor"); tsFileProcessor.insert(insertRowPlan); // try to update the latest time of the device of this tsRecord @@ -2910,6 +2911,7 @@ public class StorageGroupProcessor { writeLock("InsertRowsOfOneDevice"); try { boolean isSequence = false; + insertRowsOfOneDevicePlan.checkForTianYuan("StorageGroupProcessor#insert"); InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans(); for (int i = 0, rowPlansLength = rowPlans.length; i < rowPlansLength; i++) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 964227175b30daee96fe6018d75bec253b8bfd0c..fd2aaa8a248ff02065941dcb432ebc486a9f05e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -188,6 +188,8 @@ public class TsFileProcessor { checkMemCostAndAddToTspInfo(insertRowPlan); } + insertRowPlan.checkForTianYuan("TsFileProcessor#InsertRowPlan"); + workMemTable.insert(insertRowPlan); if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index b07750fe9919264ea74ef887a0e3f5c59a3ae7b1..ce2ddf1122cc813c89bc9ea1527e6b7656563792 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -1198,6 +1198,7 @@ public class PlanExecutor implements IPlanExecutor { // we do not need to infer data type for insertRowsOfOneDevicePlan } // ok, we can begin to write data into the engine.. + insertRowsOfOneDevicePlan.checkForTianYuan("PlanExecutor#insert"); StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan); List notExistedPaths = null; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java index 8399527e0110e0df79b78b3cc9f7ce053a7a469b..f7caf0b0c1b47494386733f209c9cdfb5840b5f6 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java @@ -63,6 +63,21 @@ public class InsertRowPlan extends InsertPlan { private List failedValues; + public void checkForTianYuan(String location) { + for (int j = 0; j < getMeasurements().length; j++) { + if (getMeasurements()[j].equals("TY_0001_Raw_Packet")) { + String value = ((Binary) getValues()[j]).getStringValue().substring(0, 35); + if (!value.contains(getDeviceId().getMeasurement().substring(4))) { + logger.error( + "{}: receive error data,device:{}, value(first 100 bytes): {}", + location, + getDeviceId(), + value); + } + } + } + } + public InsertRowPlan() { super(OperatorType.INSERT); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java index 46da4e011e518d01e7648e40664240437aa4ae4d..a861adf56e32a305c73c11ea51cdc952d4cb06d6 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java @@ -23,6 +23,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.physical.BatchPlan; +import org.apache.iotdb.tsfile.utils.Binary; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.DataOutputStream; import java.io.IOException; @@ -31,15 +35,29 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.iotdb.tsfile.utils.Binary; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan { private static Logger logger = LoggerFactory.getLogger(InsertRowsOfOneDevicePlan.class); boolean[] isExecuted; private InsertRowPlan[] rowPlans; + public void checkForTianYuan(String location) { + for (int i = 0; i < rowPlans.length; ++i) { + for (int j = 0; j < rowPlans[i].getMeasurements().length; j++) { + if (rowPlans[i].getMeasurements()[j].equals("TY_0001_Raw_Packet")) { + String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0, 35); + if (!value.contains(rowPlans[i].getDeviceId().getMeasurement().substring(4))) { + logger.error( + "{}: receive error data,device:{}, value(first 100 bytes): {}", + location, + rowPlans[i].getDeviceId(), + value); + } + } + } + } + } + public InsertRowsOfOneDevicePlan( PartialPath deviceId, Long[] insertTimes, @@ -67,12 +85,15 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan { + ", time:" + insertTimes[i]); } - //Just for Tianyuan debug - for(int j = 0; j < rowPlans[i].getMeasurements().length; j ++) { + // Just for Tianyuan debug + for (int j = 0; j < rowPlans[i].getMeasurements().length; j++) { if (rowPlans[i].getMeasurements()[j].equals("TY_0001_Raw_Packet")) { - String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0,100); + String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0, 100); if (!value.contains(rowPlans[i].getDeviceId().getMeasurement().substring(4))) { - logger.error("receive error data,device:{}, value(first 100 bytes): {}", rowPlans[i].getDeviceId(), value); + logger.error( + "receive error data,device:{}, value(first 100 bytes): {}", + rowPlans[i].getDeviceId(), + value); } } }