diff --git a/src/client/inc/tscLog.h b/src/client/inc/tscLog.h index 346e1a67957597a4fac8e289aab3691a29b864da..5e57847daa1dddb6f95bb8ce30e99b7803d2ec43 100644 --- a/src/client/inc/tscLog.h +++ b/src/client/inc/tscLog.h @@ -30,7 +30,7 @@ extern int32_t cDebugFlag; } #define tscWarn(...) \ if (cDebugFlag & DEBUG_WARN) { \ - taosPrintLog("WARN TSC ", cDebugFlag, __VA_ARGS__); \ + taosPrintLog("WARN TSC ", cDebugFlag, __VA_ARGS__); \ } #define tscTrace(...) \ if (cDebugFlag & DEBUG_TRACE) { \ diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 7cfa923f9eacd154ba3f2ec20c6433f3b3524cf4..13f5ebc86ee567e773b1e18c4ba41d6d6b5b6a58 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -53,11 +53,7 @@ typedef struct STableComInfo { } STableComInfo; typedef struct STableMeta { - // super table if it is created according to super table, otherwise, tableInfo is used - union { - struct STableMeta *pSTable; - STableComInfo tableInfo; - }; + STableComInfo tableInfo; uint8_t tableType; int16_t sversion; SCMVgroupInfo vgroupInfo; diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 956121086ca5f8fc13c716428156d2754f9fb12b..8990b69c5aee57dfcfad696e06dc982574aac6b4 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -154,8 +154,8 @@ typedef struct SDataCol { static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); -void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints); -void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints); +void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints); +void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows); void dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); @@ -195,7 +195,7 @@ typedef struct { int maxPoints; // max number of points int bufSize; - int numOfPoints; + int numOfRows; int numOfCols; // Total number of cols int sversion; // TODO: set sversion void * buf; @@ -205,7 +205,7 @@ typedef struct { #define keyCol(pCols) (&((pCols)->cols[0])) // Key column #define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)] #define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) -#define dataColsKeyLast(pCols) ((pCols->numOfPoints == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfPoints - 1)) +#define dataColsKeyLast(pCols) ((pCols->numOfRows == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfRows - 1)) SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index cf1b77d12ce671d3d0f31bc9c4a1c8a535bc6113..81ea801c944011443b4d0f341427dadfd85f8153 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -187,29 +187,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) } -void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints) { +void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { ASSERT(pCol != NULL && value != NULL); switch (pCol->type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: // set offset - pCol->dataOff[numOfPoints] = pCol->len; + pCol->dataOff[numOfRows] = pCol->len; // Copy data memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); // Update the length pCol->len += varDataTLen(value); break; default: - ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints); + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); pCol->len += pCol->bytes; break; } } -void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) { - int pointsLeft = numOfPoints - pointsToPop; +void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) { + int pointsLeft = numOfRows - pointsToPop; ASSERT(pointsLeft > 0); @@ -221,7 +221,7 @@ void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) { memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len); dataColSetOffset(pCol, pointsLeft); } else { - ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints); + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); pCol->len = TYPE_BYTES[pCol->type] * pointsLeft; memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len); } @@ -322,7 +322,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->numOfCols = pDataCols->numOfCols; pRet->sversion = pDataCols->sversion; - if (keepData) pRet->numOfPoints = pDataCols->numOfPoints; + if (keepData) pRet->numOfRows = pDataCols->numOfRows; for (int i = 0; i < pDataCols->numOfCols; i++) { pRet->cols[i].type = pDataCols->cols[i].type; @@ -352,7 +352,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { } void tdResetDataCols(SDataCols *pCols) { - pCols->numOfPoints = 0; + pCols->numOfRows = 0; for (int i = 0; i < pCols->maxCols; i++) { dataColReset(pCols->cols + i); } @@ -365,14 +365,14 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { SDataCol *pCol = pCols->cols + i; void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset); - dataColAppendVal(pCol, value, pCols->numOfPoints, pCols->maxPoints); + dataColAppendVal(pCol, value, pCols->numOfRows, pCols->maxPoints); } - pCols->numOfPoints++; + pCols->numOfRows++; } // Pop pointsToPop points from the SDataCols void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { - int pointsLeft = pCols->numOfPoints - pointsToPop; + int pointsLeft = pCols->numOfRows - pointsToPop; if (pointsLeft <= 0) { tdResetDataCols(pCols); return; @@ -380,14 +380,14 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { SDataCol *pCol = pCols->cols + iCol; - dataColPopPoints(pCol, pointsToPop, pCols->numOfPoints); + dataColPopPoints(pCol, pointsToPop, pCols->numOfRows); } - pCols->numOfPoints = pointsLeft; + pCols->numOfRows = pointsLeft; } int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { - ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints); - ASSERT(target->numOfPoints + rowsToMerge <= target->maxPoints); + ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); + ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); ASSERT(target->numOfCols == source->numOfCols); SDataCols *pTarget = NULL; @@ -395,10 +395,10 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap for (int i = 0; i < rowsToMerge; i++) { for (int j = 0; j < source->numOfCols; j++) { - dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfPoints, + dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows, target->maxPoints); } - target->numOfPoints++; + target->numOfRows++; } } else { pTarget = tdDupDataCols(target, true); @@ -406,7 +406,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { int iter1 = 0; int iter2 = 0; - tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge); + tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfRows + rowsToMerge); } tdFreeDataCols(pTarget); @@ -421,30 +421,30 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol // TODO: add resolve duplicate key here tdResetDataCols(target); - while (target->numOfPoints < tRows) { - if (*iter1 >= src1->numOfPoints && *iter2 >= src2->numOfPoints) break; + while (target->numOfRows < tRows) { + if (*iter1 >= src1->numOfRows && *iter2 >= src2->numOfRows) break; - TSKEY key1 = (*iter1 >= src1->numOfPoints) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; - TSKEY key2 = (*iter2 >= src2->numOfPoints) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; + TSKEY key1 = (*iter1 >= src1->numOfRows) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; + TSKEY key2 = (*iter2 >= src2->numOfRows) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; if (key1 <= key2) { for (int i = 0; i < src1->numOfCols; i++) { ASSERT(target->cols[i].type == src1->cols[i].type); - dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfPoints, + dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, target->maxPoints); } - target->numOfPoints++; + target->numOfRows++; (*iter1)++; if (key1 == key2) (*iter2)++; } else { for (int i = 0; i < src2->numOfCols; i++) { ASSERT(target->cols[i].type == src2->cols[i].type); - dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfPoints, + dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, target->maxPoints); } - target->numOfPoints++; + target->numOfRows++; (*iter2)++; } } diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index d45ceb2a4b42cf6e8084f7a24d29dfed62d7031f..654cb65ec309beb38fcf9a445dca46b8b599882f 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -273,6 +273,46 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num #endif } +static void getStatics_bin(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max, + int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) { + const char* data = pData; + ASSERT(numOfRow <= INT16_MAX); + + for (int32_t i = 0; i < numOfRow; ++i) { + if (isNull((const char*) varDataVal(data), TSDB_DATA_TYPE_BINARY)) { + (*numOfNull) += 1; + } + + data += varDataLen(data); + } + + *sum = 0; + *max = 0; + *min = 0; + *minIndex = 0; + *maxIndex = 0; +} + +static void getStatics_nchr(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max, + int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) { + const char* data = pData; + ASSERT(numOfRow <= INT16_MAX); + + for (int32_t i = 0; i < numOfRow; ++i) { + if (isNull((const char*) varDataVal(data), TSDB_DATA_TYPE_NCHAR)) { + (*numOfNull) += 1; + } + + data += varDataLen(data); + } + + *sum = 0; + *max = 0; + *min = 0; + *minIndex = 0; + *maxIndex = 0; +} + tDataTypeDescriptor tDataTypeDesc[11] = { {TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL, NULL}, {TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool, getStatics_i8}, @@ -282,9 +322,9 @@ tDataTypeDescriptor tDataTypeDesc[11] = { {TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint, getStatics_i64}, {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat, getStatics_f}, {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble, getStatics_d}, - {TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString, NULL}, + {TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString, getStatics_bin}, {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64}, - {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString, NULL}, + {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString, getStatics_nchr}, }; char tTokenTypeSwitcher[13] = { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 0609ed1870531c88c3e6bf151a0cc95d7b220aa7..0bd6f2ab34949f7254185cccd01228a98b68eec9 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -200,7 +200,7 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter * return false; } else { // prev has been located if (pQuery->fileId >= 0) { - pQuery->pos = pQuery->pBlock[pQuery->slot].numOfPoints - 1; + pQuery->pos = pQuery->pBlock[pQuery->slot].numOfRows - 1; getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), @@ -210,11 +210,11 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter * assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); pBlock = &pRuntimeEnv->cacheBlock; - pQuery->pos = pBlock->numOfPoints - 1; + pQuery->pos = pBlock->numOfRows - 1; getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), - pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos); + pQuery->fileId, pQuery->slot, pBlock->numOfRows - 1, pQuery->pos); } } } @@ -603,9 +603,9 @@ static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int return &pWindowResInfo->pResult[slot].status; } -static int32_t getForwardStepsInBlock(int32_t numOfPoints, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos, +static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos, int16_t order, int64_t *pData) { - int32_t endPos = searchFn((char *)pData, numOfPoints, ekey, order); + int32_t endPos = searchFn((char *)pData, numOfRows, ekey, order); int32_t forwardStep = 0; if (endPos >= 0) { @@ -2329,7 +2329,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { int32_t midPos = -1; - int32_t numOfPoints; + int32_t numOfRows; if (num <= 0) { return -1; @@ -2348,8 +2348,8 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { if (key == keyList[firstPos]) return firstPos; if (key < keyList[firstPos]) return firstPos - 1; - numOfPoints = lastPos - firstPos + 1; - midPos = (numOfPoints >> 1) + firstPos; + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; if (key < keyList[midPos]) { lastPos = midPos - 1; @@ -2374,8 +2374,8 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { return lastPos; } - numOfPoints = lastPos - firstPos + 1; - midPos = (numOfPoints >> 1) + firstPos; + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; if (key < keyList[midPos]) { lastPos = midPos - 1; diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 4225602292726bc3f75253d2e2fa93bc6bd774e7..20a704db49975a0fc32f8b9aff1e5db4d8379bbc 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -74,7 +74,7 @@ void tsdbCloseMetaFile(SMetaFile *mfh); typedef struct { TSKEY keyFirst; TSKEY keyLast; - int32_t numOfPoints; + int32_t numOfRows; void * pData; } SMemTable; @@ -173,7 +173,7 @@ typedef struct { typedef struct { TSKEY keyFirst; TSKEY keyLast; - int64_t numOfPoints; + int64_t numOfRows; SList * list; } SCacheMem; @@ -294,7 +294,7 @@ typedef struct { int64_t last : 1; // If the block in data file or last file int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks int32_t algorithm : 8; // Compression algorithm - int32_t numOfPoints : 24; // Number of total points + int32_t numOfRows : 24; // Number of total points int32_t sversion; // Schema version int32_t len; // Data block length or nothing int16_t numOfSubBlocks; // Number of sub-blocks; diff --git a/src/tsdb/src/tsdbCache.c b/src/tsdb/src/tsdbCache.c index be339d28166938a4d9d762555e6651680358b641..edc8472b34661fa616dcd352258249902e711a71 100644 --- a/src/tsdb/src/tsdbCache.c +++ b/src/tsdb/src/tsdbCache.c @@ -82,7 +82,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) { memset(ptr, 0, bytes); if (key < pCache->mem->keyFirst) pCache->mem->keyFirst = key; if (key > pCache->mem->keyLast) pCache->mem->keyLast = key; - pCache->mem->numOfPoints++; + pCache->mem->numOfRows++; return ptr; } @@ -127,7 +127,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { if (pCache->mem == NULL) return -1; pCache->mem->keyFirst = INT64_MAX; pCache->mem->keyLast = 0; - pCache->mem->numOfPoints = 0; + pCache->mem->numOfRows = 0; pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *)); } diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index dcea2737fff235dbba4ea592500107c1b3daeac9..bd5c20bd7a28d0f3a1f857d3561566a3b2b49f93 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -233,10 +233,10 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { // SCompBlock *pBlock = pStartBlock; // for (int i = 0; i < numOfBlocks; i++) { // if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1; -// pCols->numOfPoints += (pCompData->cols[0].len / 8); +// pCols->numOfRows += (pCompData->cols[0].len / 8); // for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) { // SCompCol *pCompCol = &(pCompData->cols[iCol]); -// // pCols->numOfPoints += pBlock->numOfPoints; +// // pCols->numOfRows += pBlock->numOfRows; // int k = 0; // for (; k < pCols->numOfCols; k++) { // if (pCompCol->colId == pCols->cols[k].colId) break; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 7db61db429089ae6a9b2e0acc7612879d3a28e2c..55342ffd38add8f8975945addaedf466955af969 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -830,7 +830,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable tSkipListNewNodeInfo(pTable->mem->pData, &level, &headSize); TSKEY key = dataRowKey(row); - // printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints); + // printf("insert:%lld, size:%d\n", key, pTable->mem->numOfRows); // Copy row into the memory SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key); @@ -854,7 +854,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; if (key > pTable->lastKey) pTable->lastKey = key; - pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData); + pTable->mem->numOfRows = tSkipListGetSize(pTable->mem->pData); tsdbTrace("vgId:%d, tid:%d, uid:%" PRId64 ", table:%s a row is inserted to table! key:%" PRId64, pRepo->config.tsdbId, pTable->tableId.tid, pTable->tableId.uid, varDataVal(pTable->name), dataRowKey(row)); @@ -1063,7 +1063,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters while (true) { int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols); assert(rowsRead >= 0); - if (pDataCols->numOfPoints == 0) break; + if (pDataCols->numOfRows == 0) break; nLoop++; ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); @@ -1072,13 +1072,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); ASSERT(rowsWritten != 0); if (rowsWritten < 0) goto _err; - ASSERT(rowsWritten <= pDataCols->numOfPoints); + ASSERT(rowsWritten <= pDataCols->numOfRows); tdPopDataColsPoints(pDataCols, rowsWritten); - maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints; + maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows; } - ASSERT(pDataCols->numOfPoints == 0); + ASSERT(pDataCols->numOfRows == 0); // Move the last block to the new .l file if neccessary if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 82584fc5df7e846d7a4ad5eed0e3f07c657cb51e..eebe0b6b4614d2ba0b1420772bdedc1b758065d3 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -307,7 +307,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { */ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); - ASSERT(pDataCols->numOfPoints > 0); + ASSERT(pDataCols->numOfRows > 0); SCompBlock compBlock; int rowsToWrite = 0; @@ -322,7 +322,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block ASSERT(pHelper->hasOldLastBlock == false); - rowsToWrite = pDataCols->numOfPoints; + rowsToWrite = pDataCols->numOfRows; SFile *pWFile = NULL; bool isLast = false; @@ -380,10 +380,10 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { if (pCompBlock->numOfSubBlocks > 1) { if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 && - pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock); + ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && + pHelper->pDataCols[0]->numOfRows < pHelper->config.minRowsPerFileBlock); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], - pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0) + pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; @@ -625,13 +625,13 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, for (int i = 1; i < numOfSubBlocks; i++) { pStartBlock++; if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1; - tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints); + tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows); } return 0; } -static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfPoints, +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, int maxPoints, char *buffer, int bufferSize) { // Verify by checksum if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1; @@ -640,16 +640,16 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 if (comp) { // // Need to decompress pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))( - content, len - sizeof(TSCKSUM), numOfPoints, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize); + content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize); if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - dataColSetOffset(pDataCol, numOfPoints); + dataColSetOffset(pDataCol, numOfRows); } } else { // No need to decompress, just memcpy it pDataCol->len = len - sizeof(TSCKSUM); memcpy(pDataCol->pData, content, pDataCol->len); if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - dataColSetOffset(pDataCol, numOfPoints); + dataColSetOffset(pDataCol, numOfRows); } } return 0; @@ -673,7 +673,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err; - pDataCols->numOfPoints = pCompBlock->numOfPoints; + pDataCols->numOfRows = pCompBlock->numOfRows; // Recover the data int ccol = 0; @@ -682,7 +682,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa SDataCol *pDataCol = &(pDataCols->cols[dcol]); if (ccol >= pCompData->numOfCols) { // Set current column as NULL and forward - dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints); + dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); dcol++; continue; } @@ -691,15 +691,15 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa if (pCompCol->colId == pDataCol->colId) { if (pCompBlock->algorithm == TWO_STAGE_COMP) { - int zsize = pDataCol->bytes * pCompBlock->numOfPoints + COMP_OVERFLOW_BYTES; + int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES; if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) { - zsize += (sizeof(VarDataLenT) * pCompBlock->numOfPoints); + zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows); } pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize); if (pHelper->compBuffer == NULL) goto _err; } if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len, - pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints, + pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints, pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) goto _err; dcol++; @@ -708,7 +708,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ccol++; } else { // Set current column as NULL and forward - dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints); + dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); dcol++; } } @@ -732,7 +732,7 @@ int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *tar tdResetDataCols(pHelper->pDataCols[1]); pCompBlock++; if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[1]) < 0) goto _err; - if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err; + if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err; } // if (target) TODO @@ -753,7 +753,7 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) { - ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && + ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pHelper->config.maxRowsPerFileBlock); SCompData *pCompData = (SCompData *)(pHelper->pBuffer); @@ -840,7 +840,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa pCompBlock->last = isLast; pCompBlock->offset = offset; pCompBlock->algorithm = pHelper->config.compress; - pCompBlock->numOfPoints = rowsToWrite; + pCompBlock->numOfRows = rowsToWrite; pCompBlock->sversion = pHelper->tableInfo.sversion; pCompBlock->len = (int32_t)lsize; pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; @@ -877,7 +877,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa int rowsWritten = 0; SCompBlock compBlock = {0}; - ASSERT(pDataCols->numOfPoints > 0); + ASSERT(pDataCols->numOfRows > 0); TSKEY keyFirst = dataColsKeyFirst(pDataCols); SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; @@ -889,32 +889,32 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append - ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks-1); + ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks-1); int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface - rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints); + rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows); if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) && - (blockAtIdx(pHelper, blkIdx)->numOfPoints + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) { + (blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) { if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } else { // Load if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; - ASSERT(pHelper->pDataCols[0]->numOfPoints == blockAtIdx(pHelper, blkIdx)->numOfPoints); + ASSERT(pHelper->pDataCols[0]->numOfRows == blockAtIdx(pHelper, blkIdx)->numOfRows); // Merge if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; // Write SFile *pWFile = NULL; bool isLast = false; - if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.minRowsPerFileBlock) { + if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.minRowsPerFileBlock) { pWFile = &(pHelper->files.dataF); } else { isLast = true; pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); } if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], - pHelper->pDataCols[0]->numOfPoints, &compBlock, isLast, true) < 0) + pHelper->pDataCols[0]->numOfRows, &compBlock, isLast, true) < 0) goto _err; if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; } @@ -931,7 +931,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa // rows1: number of rows must merge in this block int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast); // rows2: max nuber of rows the block can have more - int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfPoints; + int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows; // rows3: number of rows between this block and the next block int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit); @@ -939,7 +939,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if ((rows2 >= rows1) && (( blockAtIdx(pHelper, blkIdx)->last) || - ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) { + ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) { rowsWritten = rows1; bool isLast = false; SFile *pFile = NULL; @@ -965,11 +965,11 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa int round = 0; // tdResetDataCols(pHelper->pDataCols[1]); while (true) { - if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) break; + if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break; tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5); - ASSERT(pHelper->pDataCols[1]->numOfPoints > 0); + ASSERT(pHelper->pDataCols[1]->numOfRows > 0); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1], - pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) + pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) goto _err; if (round == 0) { tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx); @@ -980,17 +980,17 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa blkIdx++; // TODO: the blkIdx here is not correct - // if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) { - // if (pHelper->pDataCols[1]->numOfPoints > 0) { + // if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) { + // if (pHelper->pDataCols[1]->numOfRows > 0) { // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], - // pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) + // pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) // goto _err; // // TODO: the blkIdx here is not correct - // tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints); + // tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfRows); // } // } - // TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints + // TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfRows // ? INT64_MAX // : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1]; // TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2]; @@ -998,11 +998,11 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa // if (key1 < key2) { // for (int i = 0; i < pDataCols->numOfCols; i++) { // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; - // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), + // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows), // ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1), // TYPE_BYTES[pDataCol->type]); // } - // pHelper->pDataCols[1]->numOfPoints++; + // pHelper->pDataCols[1]->numOfRows++; // iter1++; // } else if (key1 == key2) { // // TODO: think about duplicate key cases @@ -1010,17 +1010,17 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa // } else { // for (int i = 0; i < pDataCols->numOfCols; i++) { // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; - // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), + // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows), // ((char *)pDataCols->cols[i].pData + // TYPE_BYTES[pDataCol->type] * iter2), // TYPE_BYTES[pDataCol->type]); // } - // pHelper->pDataCols[1]->numOfPoints++; + // pHelper->pDataCols[1]->numOfRows++; // iter2++; // } - // if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) { - // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err; + // if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.maxRowsPerFileBlock * 4 / 5) { + // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) goto _err; // // TODO: blkIdx here is not correct, fix it // tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); @@ -1133,7 +1133,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId pSCompBlock->numOfSubBlocks++; ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); pSCompBlock->len += sizeof(SCompBlock); - pSCompBlock->numOfPoints += rowsAdded; + pSCompBlock->numOfRows += rowsAdded; pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst); pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast); pIdx->len += sizeof(SCompBlock); @@ -1164,7 +1164,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ((SCompBlock *)ptr)[1] = *pCompBlock; pSCompBlock->numOfSubBlocks = 2; - pSCompBlock->numOfPoints += rowsAdded; + pSCompBlock->numOfRows += rowsAdded; pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); pSCompBlock->len = sizeof(SCompBlock) * 2; pSCompBlock->keyFirst = MIN(((SCompBlock *)ptr)[0].keyFirst, ((SCompBlock *)ptr)[1].keyFirst); @@ -1219,7 +1219,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int // Get the number of rows in range [minKey, maxKey] static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) { - if (pDataCols->numOfPoints == 0) return 0; + if (pDataCols->numOfRows == 0) return 0; ASSERT(minKey <= maxKey); TSKEY keyFirst = dataColsKeyFirst(pDataCols); @@ -1228,11 +1228,11 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) if (minKey > keyLast || maxKey < keyFirst) return 0; - void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), + void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY), compTSKEY, TD_GE); ASSERT(ptr1 != NULL); - void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), + void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY), compTSKEY, TD_LE); ASSERT(ptr2 != NULL); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 7a703122594f76136825ec250f1cf9d7ff8b8eaf..0e70ab2d7e338965cee1f2e968852cfe18115a49 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -436,7 +436,7 @@ static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlo SDataBlockInfo info = { .window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast}, .numOfCols = pBlock->numOfCols, - .rows = pBlock->numOfPoints, + .rows = pBlock->numOfRows, .tid = pCheckInfo->tableId.tid, .uid = pCheckInfo->tableId.uid, }; @@ -608,11 +608,11 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock } SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; - assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfPoints == pBlock->numOfPoints); + assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows); if (pCheckInfo->lastKey > pBlock->keyFirst) { cur->pos = - binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); + binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); } else { cur->pos = 0; } @@ -630,9 +630,9 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock SDataCols* pDataCols = pCheckInfo->pDataCols; if (pCheckInfo->lastKey < pBlock->keyLast) { cur->pos = - binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); + binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); } else { - cur->pos = pBlock->numOfPoints - 1; + cur->pos = pBlock->numOfRows - 1; } doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa); @@ -647,7 +647,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { int firstPos, lastPos, midPos = -1; - int numOfPoints; + int numOfRows; TSKEY* keyList; assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); @@ -665,8 +665,8 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { if (key == keyList[firstPos]) return firstPos; if (key < keyList[firstPos]) return firstPos - 1; - numOfPoints = lastPos - firstPos + 1; - midPos = (numOfPoints >> 1) + firstPos; + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; if (key < keyList[midPos]) { lastPos = midPos - 1; @@ -691,8 +691,8 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { return lastPos; } - numOfPoints = lastPos - firstPos + 1; - midPos = (numOfPoints >> 1) + firstPos; + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; if (key < keyList[midPos]) { lastPos = midPos - 1; @@ -810,7 +810,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* cur->mixBlock = (cur->pos != blockInfo.rows - 1); } else { int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; - endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order); + endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); cur->mixBlock = true; } @@ -904,7 +904,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* } int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order); + int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it tSkipListIterNext(pCheckInfo->iter); } @@ -1002,7 +1002,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { int firstPos, lastPos, midPos = -1; - int numOfPoints; + int numOfRows; TSKEY* keyList; if (num <= 0) return -1; @@ -1018,8 +1018,8 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { if (key == keyList[firstPos]) return firstPos; if (key < keyList[firstPos]) return firstPos - 1; - numOfPoints = lastPos - firstPos + 1; - midPos = (numOfPoints >> 1) + firstPos; + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; if (key < keyList[midPos]) { lastPos = midPos - 1; @@ -1044,8 +1044,8 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { return lastPos; } - numOfPoints = lastPos - firstPos + 1; - midPos = (numOfPoints >> 1) + firstPos; + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; if (key < keyList[midPos]) { lastPos = midPos - 1; @@ -1066,7 +1066,6 @@ static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t n for (int32_t i = 0; i < numOfTables; ++i) { STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i]; -// tfree(pBlockInfo->statis); tfree(pBlockInfo); } @@ -1539,9 +1538,19 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot]; tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); - tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, QH_GET_NUM_OF_COLS(pHandle)); + size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); + memset(pHandle->statis, 0, sizeof(SDataStatis) * numOfCols); + tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols); + *pBlockStatis = pHandle->statis; + //update the number of NULL data rows + for(int32_t i = 0; i < numOfCols; ++i) { + if (pHandle->statis[i].numOfNull == -1) { + pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; + } + } + return TSDB_CODE_SUCCESS; } @@ -1575,7 +1584,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { doLoadFileDataBlock(pHandle, pBlock, pCheckInfo); // todo refactor - int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfPoints - 1); + int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1); // if the buffer is not full in case of descending order query, move the data in the front of the buffer if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) { diff --git a/tests/pytest/import_merge/importToCommit.py b/tests/pytest/import_merge/importToCommit.py index dd2c27918ab7f52d799715b2c14ac02c05fc7aca..9a17ae95faf039c4ceee29f949f6d0e47a2b2d28 100644 --- a/tests/pytest/import_merge/importToCommit.py +++ b/tests/pytest/import_merge/importToCommit.py @@ -46,7 +46,6 @@ class TDTestCase: tdLog.info('insert data until the first commit') dnodesDir = tdDnodes.getDnodesRootDir() dataDir = dnodesDir + '/dnode1/data/vnode' - tdLog.info('CBD: dataDir=%s' % dataDir) startTime = self.startTime rid0 = 1 while (True):