From c29ff049f2365f3988fabb5418ea0a05321ecf6b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 9 Jun 2020 17:19:30 +0800 Subject: [PATCH] [td-90] support multi-version SDataRow in cache --- src/common/src/tdataformat.c | 3 +- src/tsdb/src/tsdbRead.c | 241 +++++++++++++---------------------- 2 files changed, 87 insertions(+), 157 deletions(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 7880a4b302..15877ff541 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -269,8 +269,7 @@ void dataColSetNullAt(SDataCol *pCol, int index) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->dataOff[index] = pCol->len; char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); - varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE; - setNull(varDataVal(ptr), pCol->type, pCol->bytes); + setVardataNull(ptr, pCol->type); pCol->len += varDataTLen(ptr); } else { setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 86ce44ad7b..7dc87363e4 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -233,8 +233,6 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo); pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; -// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded - changeQueryHandleForInterpQuery(pQueryHandle); return pQueryHandle; } @@ -618,54 +616,19 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); + /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); - - TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL; - if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); - - SDataRow row = SL_GET_NODE_DATA(node); - k1 = dataRowKey(row); - - if (k1 == binfo.window.skey) { - if (tSkipListIterNext(pCheckInfo->iter)) { - node = tSkipListIterGet(pCheckInfo->iter); - row = SL_GET_NODE_DATA(node); - k1 = dataRowKey(row); - } else { - k1 = TSKEY_INITIAL_VAL; - } - } - } - - if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); - - SDataRow row = SL_GET_NODE_DATA(node); - k2 = dataRowKey(row); - - if (k2 == binfo.window.skey) { - if (tSkipListIterNext(pCheckInfo->iiter)) { - node = tSkipListIterGet(pCheckInfo->iiter); - row = SL_GET_NODE_DATA(node); - k2 = dataRowKey(row); - } else { - k2 = TSKEY_INITIAL_VAL; - } - } - } - + SDataRow row = getSDataRowInTableMem(pCheckInfo); + + TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1); - if ((ASCENDING_TRAVERSE(pQueryHandle->order) && - ((k1 != TSKEY_INITIAL_VAL && k1 <= binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 <= binfo.window.ekey))) || - (!ASCENDING_TRAVERSE(pQueryHandle->order) && - ((k1 != TSKEY_INITIAL_VAL && k1 >= binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 >= binfo.window.skey)))) { + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || + (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { - if ((ASCENDING_TRAVERSE(pQueryHandle->order) && - ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.skey))) || - (!ASCENDING_TRAVERSE(pQueryHandle->order) && - (((k1 != TSKEY_INITIAL_VAL && k1 > binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 > binfo.window.skey))))) { + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || + (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { + // do not load file block into buffer int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; @@ -756,7 +719,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock return pQueryHandle->realNumOfRows > 0; } -static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { +static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { int firstPos, lastPos, midPos = -1; int numOfRows; TSKEY* keyList; @@ -868,37 +831,63 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap return numOfRows + num; } -static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t capacity, - int32_t numOfRows, SDataRow row, STSchema* pSchema) { - int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); - int32_t numOfTableCols = schemaNCols(pSchema); - +static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, + STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) { char* pData = NULL; - for (int32_t i = 0; i < numOfCols; ++i) { + + // the schema version info is embeded in SDataRow + STSchema* pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row)); + int32_t numOfRowCols = schemaNCols(pSchema); + + int32_t i = 0, j = 0; + while(i < numOfCols && j < numOfRowCols) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - + if (pSchema->columns[j].colId < pColInfo->info.colId) { + j++; + continue; + } + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; } else { pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; } - - int32_t offset = 0; - for (int32_t j = 0; j < numOfTableCols; ++j) { - if (pColInfo->info.colId == pSchema->columns[j].colId) { - offset = pSchema->columns[j].offset; - break; + + if (pSchema->columns[j].colId == pColInfo->info.colId) { + void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + memcpy(pData, value, varDataTLen(value)); + } else { + memcpy(pData, value, pColInfo->info.bytes); + } + + j++; + i++; + } else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pData, pColInfo->info.type); + } else { + setNull(pData, pColInfo->info.type, pColInfo->info.bytes); } + i++; } - - assert(offset != -1); // todo handle error - void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset); - + } + + while (i < numOfCols) { // the remain columns are all null data + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - memcpy(pData, value, varDataTLen(value)); + setVardataNull(pData, pColInfo->info.type); } else { - memcpy(pData, value, pColInfo->info.bytes); + setNull(pData, pColInfo->info.type, pColInfo->info.bytes); } + + i++; } } @@ -911,7 +900,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* initTableMemIterator(pQueryHandle, pCheckInfo); SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; - + + // for search the endPos, so the order needs to reverse + int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + + int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; + int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); + + STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); + STable* pTable = pCheckInfo->pTableObj; + int32_t endPos = cur->pos; if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { endPos = blockInfo.rows - 1; @@ -920,8 +918,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* endPos = 0; cur->mixBlock = (cur->pos != blockInfo.rows - 1); } else { - int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; - endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); + assert(pCols->numOfRows > 0); + endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); cur->mixBlock = true; } @@ -933,8 +931,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* int32_t numOfRows = 0; pQueryHandle->cur.win = TSWINDOW_INITIALIZER; - int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; - + // no data in buffer, load data from file directly if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { int32_t start = cur->pos; @@ -950,12 +947,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* // todo opt in case of no data in buffer numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); - // if the buffer is not full in case of descending order query, move the data in the front of the buffer + // if the buffer is not full in case of descending order query, move the data in the front of the buffer if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) { int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; - int32_t reqNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); - - for(int32_t i = 0; i < reqNumOfCols; ++i) { + + for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); } @@ -969,20 +965,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pQueryHandle->realNumOfRows = numOfRows; cur->rows = numOfRows; return; - } else if (pCheckInfo->iter != NULL && pCheckInfo->iiter == NULL) { - // } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) { - // } else { // iter and iiter are all not NULL, three-way merge data block - STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj); + } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; - do { - node = tSkipListIterGet(pCheckInfo->iter); - if (node == NULL) { + SDataRow row = getSDataRowInTableMem(pCheckInfo); + if (row == NULL) { break; } - SDataRow row = SL_GET_NODE_DATA(node); - TSKEY key = dataRowKey(row); + TSKEY key = dataRowKey(row); if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { break; @@ -995,7 +986,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { - copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema); + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1005,17 +996,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* cur->lastKey = key + step; cur->mixBlock = true; - tSkipListIterNext(pCheckInfo->iter); + moveToNextRow(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it - tSkipListIterNext(pCheckInfo->iter); + moveToNextRow(pCheckInfo); } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; } - int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); + int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it tSkipListIterNext(pCheckInfo->iter); } @@ -1093,9 +1083,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* // if the buffer is not full in case of descending order query, move the data in the front of the buffer if (numOfRows < pQueryHandle->outputCapacity) { int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; - - int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); - for(int32_t i = 0; i < requiredNumOfCols; ++i) { + for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); } @@ -1567,9 +1555,6 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { for(int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); - if (pCheckInfo->pTableObj->tableId.uid == 12094628167747) { - printf("abc\n"); - } if (pCheckInfo->pTableObj->lastKey > key) { key = pCheckInfo->pTableObj->lastKey; index = i; @@ -1652,9 +1637,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int *skey = TSKEY_INITIAL_VAL; int64_t st = taosGetTimestampUs(); - STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj); - int32_t numOfTableCols = schemaNCols(pSchema); - + STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); + STable* pTable = pCheckInfo->pTableObj; + do { SDataRow row = getSDataRowInTableMem(pCheckInfo); if (row == NULL) { @@ -1662,10 +1647,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } TSKEY key = dataRowKey(row); - - if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || - (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { - + if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { tsdbTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey, pQueryHandle->window.ekey); @@ -1677,59 +1659,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } *ekey = key; - char* pData = NULL; - - int32_t i = 0, j = 0; - while(i < numOfCols && j < numOfTableCols) { - SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - if (pSchema->columns[j].colId < pColInfo->info.colId) { - j++; - continue; - } - - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; - } else { - pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes; - } - - if (pSchema->columns[j].colId == pColInfo->info.colId) { - void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - memcpy(pData, value, varDataTLen(value)); - } else { - memcpy(pData, value, pColInfo->info.bytes); - } - - j++; - i++; - } else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pData, pColInfo->info.type); - } else { - setNull(pData, pColInfo->info.type, pColInfo->info.bytes); - } - i++; - } - } - - while (i < numOfCols) { // the remain columns are all null data - SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; - } else { - pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes; - } - - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pData, pColInfo->info.type); - } else { - setNull(pData, pColInfo->info.type, pColInfo->info.bytes); - } - - i++; - } - + copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable); + if (++numOfRows >= maxRowsToRead) { moveToNextRow(pCheckInfo); break; -- GitLab