提交 6f30ae25 编写于 作者: D dapan1121

fix bug

上级 71357a21
...@@ -1176,13 +1176,15 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity ...@@ -1176,13 +1176,15 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
} }
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
int32_t numOfCols, STable* pTable) { int32_t numOfCols, STable* pTable, STSchema* pSchema) {
char* pData = NULL; char* pData = NULL;
// the schema version info is embeded in SDataRow // the schema version info is embeded in SDataRow
STSchema* pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
int32_t numOfRowCols = schemaNCols(pSchema); int32_t numOfRowCols = schemaNCols(pSchema);
if (pSchema == NULL) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
}
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while(i < numOfCols && j < numOfRowCols) { while(i < numOfCols && j < numOfRowCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
...@@ -1199,10 +1201,38 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -1199,10 +1201,38 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
if (pSchema->columns[j].colId == pColInfo->info.colId) { if (pSchema->columns[j].colId == pColInfo->info.colId) {
void* value = tdGetRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); void* value = tdGetRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { switch (pColInfo->info.type) {
memcpy(pData, value, varDataTLen(value)); case TSDB_DATA_TYPE_BINARY:
} else { case TSDB_DATA_TYPE_NCHAR:
memcpy(pData, value, pColInfo->info.bytes); memcpy(pData, value, varDataTLen(value));
break;
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t *)pData = *(uint8_t *)value;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t *)pData = *(uint16_t *)value;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t *)pData = *(uint32_t *)value;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
*(uint64_t *)pData = *(uint64_t *)value;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, value);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, value);
break;
default:
memcpy(pData, value, pColInfo->info.bytes);
} }
j++; j++;
...@@ -1401,6 +1431,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1401,6 +1431,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// compared with the data from in-memory buffer, to generate the correct timestamp array list // compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t numOfRows = 0; int32_t numOfRows = 0;
int16_t rv = -1;
STSchema* pSchema = NULL;
int32_t pos = cur->pos; int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER; cur->win = TSWINDOW_INITIALIZER;
...@@ -1429,7 +1462,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1429,7 +1462,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable); if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row);
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
...@@ -1442,7 +1480,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1442,7 +1480,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
if (pCfg->update) { if (pCfg->update) {
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable); if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row);
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
...@@ -1987,6 +2030,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1987,6 +2030,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
int16_t rv = -1;
STSchema* pSchema = NULL;
do { do {
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
...@@ -2007,7 +2052,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -2007,7 +2052,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} }
win->ekey = key; win->ekey = key;
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable); if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row);
}
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable, pSchema);
if (++numOfRows >= maxRowsToRead) { if (++numOfRows >= maxRowsToRead) {
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
...@@ -2090,7 +2139,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -2090,7 +2139,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return false; return false;
} }
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj); copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
tfree(pRow); tfree(pRow);
// update the last key value // update the last key value
...@@ -2164,7 +2213,7 @@ bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) { ...@@ -2164,7 +2213,7 @@ bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) {
return false; return false;
} }
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj); copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
tfree(pRow); tfree(pRow);
// update the last key value // update the last key value
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册