diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index a8376fc824ecc0b76e689b2088fabc6823a87b8f..cd421032cd963f1738ba7bcc480033ce50a3330c 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1176,13 +1176,15 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity } static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, - int32_t numOfCols, STable* pTable) { + int32_t numOfCols, STable* pTable, STSchema* pSchema) { char* pData = NULL; // the schema version info is embeded in SDataRow - STSchema* pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); int32_t numOfRowCols = schemaNCols(pSchema); - + if (pSchema == NULL) { + pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + } + int32_t i = 0, j = 0; while(i < numOfCols && j < numOfRowCols) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); @@ -1199,10 +1201,38 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, if (pSchema->columns[j].colId == pColInfo->info.colId) { void* value = tdGetRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - memcpy(pData, value, varDataTLen(value)); - } else { - memcpy(pData, value, pColInfo->info.bytes); + switch (pColInfo->info.type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + memcpy(pData, value, varDataTLen(value)); + break; + case TSDB_DATA_TYPE_NULL: + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + *(uint8_t *)pData = *(uint8_t *)value; + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + *(uint16_t *)pData = *(uint16_t *)value; + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + *(uint32_t *)pData = *(uint32_t *)value; + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: + *(uint64_t *)pData = *(uint64_t *)value; + break; + case TSDB_DATA_TYPE_FLOAT: + SET_FLOAT_PTR(pData, value); + break; + case TSDB_DATA_TYPE_DOUBLE: + SET_DOUBLE_PTR(pData, value); + break; + default: + memcpy(pData, value, pColInfo->info.bytes); } j++; @@ -1401,6 +1431,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* // compared with the data from in-memory buffer, to generate the correct timestamp array list int32_t numOfRows = 0; + int16_t rv = -1; + STSchema* pSchema = NULL; + int32_t pos = cur->pos; cur->win = TSWINDOW_INITIALIZER; @@ -1429,7 +1462,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable); + if (rv != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + rv = dataRowVersion(row); + } + + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1442,7 +1480,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* moveToNextRowInMem(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it if (pCfg->update) { - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable); + if (rv != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + rv = dataRowVersion(row); + } + + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1987,6 +2030,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int int64_t st = taosGetTimestampUs(); STable* pTable = pCheckInfo->pTableObj; + int16_t rv = -1; + STSchema* pSchema = NULL; do { SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); @@ -2007,7 +2052,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } win->ekey = key; - copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable); + if (rv != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + rv = dataRowVersion(row); + } + copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable, pSchema); if (++numOfRows >= maxRowsToRead) { moveToNextRowInMem(pCheckInfo); @@ -2090,7 +2139,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { return false; } - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj); + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL); tfree(pRow); // update the last key value @@ -2164,7 +2213,7 @@ bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) { return false; } - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj); + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL); tfree(pRow); // update the last key value