未验证 提交 28a6b0c5 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #5109 from taosdata/hotfix/TD-1976

[TD-1976]optimize read rows from cache
......@@ -1176,13 +1176,15 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
}
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
int32_t numOfCols, STable* pTable) {
int32_t numOfCols, STable* pTable, STSchema* pSchema) {
char* pData = NULL;
// the schema version info is embeded in SDataRow
STSchema* pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
int32_t numOfRowCols = schemaNCols(pSchema);
if (pSchema == NULL) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
}
int32_t i = 0, j = 0;
while(i < numOfCols && j < numOfRowCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
......@@ -1199,10 +1201,38 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
if (pSchema->columns[j].colId == pColInfo->info.colId) {
void* value = tdGetRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value));
} else {
memcpy(pData, value, pColInfo->info.bytes);
switch (pColInfo->info.type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy(pData, value, varDataTLen(value));
break;
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t *)pData = *(uint8_t *)value;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t *)pData = *(uint16_t *)value;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t *)pData = *(uint32_t *)value;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
*(uint64_t *)pData = *(uint64_t *)value;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, value);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, value);
break;
default:
memcpy(pData, value, pColInfo->info.bytes);
}
j++;
......@@ -1401,6 +1431,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t numOfRows = 0;
int16_t rv = -1;
STSchema* pSchema = NULL;
int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER;
......@@ -1429,7 +1462,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row);
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
......@@ -1442,7 +1480,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
if (pCfg->update) {
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row);
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
......@@ -1987,6 +2030,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
int64_t st = taosGetTimestampUs();
STable* pTable = pCheckInfo->pTableObj;
int16_t rv = -1;
STSchema* pSchema = NULL;
do {
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
......@@ -2007,7 +2052,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
win->ekey = key;
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable);
if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row);
}
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable, pSchema);
if (++numOfRows >= maxRowsToRead) {
moveToNextRowInMem(pCheckInfo);
......@@ -2090,7 +2139,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj);
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
tfree(pRow);
// update the last key value
......@@ -2164,7 +2213,7 @@ bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj);
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
tfree(pRow);
// update the last key value
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册