diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index fb6bab0cf29d50f196a18eeff990b1fc62dac9ea..1637c4832b7e2ac9d4be52b7f94dd301ba4eb617 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -328,11 +328,10 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } int tdAllocMemForCol(SDataCol *pCol, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); -void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); +int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); -void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); // Get the data pointer from a column-wised data static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) { @@ -357,13 +356,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { } typedef struct { - int maxRowSize; - int maxCols; // max number of columns - int maxPoints; // max number of points - - int numOfRows; - int numOfCols; // Total number of cols - int sversion; // TODO: set sversion + int maxCols; // max number of columns + int maxPoints; // max number of points + int numOfRows; + int numOfCols; // Total number of cols + int sversion; // TODO: set sversion SDataCol *cols; } SDataCols; @@ -407,7 +404,7 @@ static FORCE_INLINE TSKEY dataColsKeyLast(SDataCols *pCols) { } } -SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); +SDataCols *tdNewDataCols(int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 3f0ab7f93ead2bbba62e8c75035530614798bd28..c793d241f6f175268802830c6a96b35c689e69d1 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -19,10 +19,10 @@ #include "wchar.h" #include "tarray.h" +static void dataColSetNEleNull(SDataCol *pCol, int nEle); static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows, bool forceSetNull); -//TODO: change caller to use return val int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { int spaceNeeded = pCol->bytes * maxPoints; if(IS_VAR_DATA_TYPE(pCol->type)) { @@ -31,7 +31,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { if(pCol->spaceSize < spaceNeeded) { void* ptr = realloc(pCol->pData, spaceNeeded); if(ptr == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)pCol->spaceSize, + uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded, strerror(errno)); return -1; } else { @@ -239,20 +239,19 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { pDataCol->len = 0; } // value from timestamp should be TKEY here instead of TSKEY -void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { +int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { ASSERT(pCol != NULL && value != NULL); if (isAllRowsNull(pCol)) { if (isNull(value, pCol->type)) { // all null value yet, just return - return; + return 0; } + if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1; if (numOfRows > 0) { // Find the first not null value, fill all previouse values as NULL - dataColSetNEleNull(pCol, numOfRows, maxPoints); - } else { - tdAllocMemForCol(pCol, maxPoints); + dataColSetNEleNull(pCol, numOfRows); } } @@ -268,12 +267,21 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); pCol->len += pCol->bytes; } + return 0; +} + +static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) { + if (IS_VAR_DATA_TYPE(pCol->type)) { + return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); + } else { + return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row); + } } bool isNEleNull(SDataCol *pCol, int nEle) { if(isAllRowsNull(pCol)) return true; for (int i = 0; i < nEle; i++) { - if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false; + if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false; } return true; } @@ -290,9 +298,7 @@ static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { } } -void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { - tdAllocMemForCol(pCol, maxPoints); - +static void dataColSetNEleNull(SDataCol *pCol, int nEle) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->len = 0; for (int i = 0; i < nEle; i++) { @@ -318,7 +324,7 @@ void dataColSetOffset(SDataCol *pCol, int nEle) { } } -SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { +SDataCols *tdNewDataCols(int maxCols, int maxRows) { SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols)); if (pCols == NULL) { uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno)); @@ -326,6 +332,9 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { } pCols->maxPoints = maxRows; + pCols->maxCols = maxCols; + pCols->numOfRows = 0; + pCols->numOfCols = 0; if (maxCols > 0) { pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol)); @@ -342,13 +351,8 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->cols[i].pData = NULL; pCols->cols[i].dataOff = NULL; } - - pCols->maxCols = maxCols; } - pCols->maxRowSize = maxRowSize; - - return pCols; } @@ -357,8 +361,9 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { int oldMaxCols = pCols->maxCols; if (schemaNCols(pSchema) > oldMaxCols) { pCols->maxCols = schemaNCols(pSchema); - pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); - if (pCols->cols == NULL) return -1; + void* ptr = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); + if (ptr == NULL) return -1; + pCols->cols = ptr; for(i = oldMaxCols; i < pCols->maxCols; i++) { pCols->cols[i].pData = NULL; pCols->cols[i].dataOff = NULL; @@ -366,10 +371,6 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { } } - if (schemaTLen(pSchema) > pCols->maxRowSize) { - pCols->maxRowSize = schemaTLen(pSchema); - } - tdResetDataCols(pCols); pCols->numOfCols = schemaNCols(pSchema); @@ -398,7 +399,7 @@ SDataCols *tdFreeDataCols(SDataCols *pCols) { } SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { - SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints); + SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints); if (pRet == NULL) return NULL; pRet->numOfCols = pDataCols->numOfCols; @@ -413,7 +414,10 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { if (keepData) { if (pDataCols->cols[i].len > 0) { - tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints); + if(tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) { + tdFreeDataCols(pRet); + return NULL; + } pRet->cols[i].len = pDataCols->cols[i].len; memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { @@ -584,9 +588,12 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) { for (int i = 0; i < src2->numOfCols; i++) { ASSERT(target->cols[i].type == src2->cols[i].type); - if (src2->cols[i].len > 0 && (forceSetNull || (!forceSetNull && !isNull(src2->cols[i].pData, src2->cols[i].type)))) { + if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) { dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, target->maxPoints); + } else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { + dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, + target->maxPoints); } } target->numOfRows++; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 6c98283189b8a3e83ff888bfb9530bb85127c27d..8f5f885d692f723bc1690a708571093bbe0a2717 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -722,7 +722,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) { return -1; } - pCommith->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock); + pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); if (pCommith->pDataCols == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyCommitH(pCommith); @@ -920,7 +920,6 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo SDataCol * pDataCol = pDataCols->cols + ncol; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; - // if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it continue; } @@ -1277,6 +1276,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt if (key1 < key2) { for (int i = 0; i < pDataCols->numOfCols; i++) { + //TODO: dataColAppendVal may fail dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, pTarget->maxPoints); } @@ -1308,6 +1308,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ASSERT(!isRowDel); for (int i = 0; i < pDataCols->numOfCols; i++) { + //TODO: dataColAppendVal may fail dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, pTarget->maxPoints); } diff --git a/src/tsdb/src/tsdbCompact.c b/src/tsdb/src/tsdbCompact.c index 62f9e41119ef7ed1b2f0151deeae6e6c4d4eb75e..5ccb9e90f2407561709d36a85ac3e992e5d5a8ba 100644 --- a/src/tsdb/src/tsdbCompact.c +++ b/src/tsdb/src/tsdbCompact.c @@ -296,7 +296,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { return -1; } - pComph->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock); + pComph->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); if (pComph->pDataCols == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyCompactH(pComph); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 8bb2d1c44e17d7409c5e290adf3cf0e7d1e79528..e766d97a97a5905db87691426d282a219eef9d68 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -702,11 +702,12 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { } //row1 has higher priority -static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, STSchema **ppSchema1, STSchema **ppSchema2, STable* pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) { +static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, + STSchema **ppSchema1, STSchema **ppSchema2, + STable* pTable, int32_t* pPoints, SMemRow* pLastRow) { //for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows! if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) { - (*pAffectedRows)++; (*pPoints)++; return NULL; } @@ -715,7 +716,6 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1)); if(pMem == NULL) return NULL; memRowCpy(pMem, row1); - (*pAffectedRows)++; (*pPoints)++; *pLastRow = pMem; return pMem; @@ -750,18 +750,16 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep if(pMem == NULL) return NULL; memRowCpy(pMem, tmp); - (*pAffectedRows)++; (*pPoints)++; - *pLastRow = pMem; return pMem; } static void* tsdbInsertDupKeyMergePacked(void** args) { - return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7], args[8]); + return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7]); } -static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) { +static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pPoints, SMemRow* pLastRow) { if(pSkipList->insertHandleFn == NULL) { tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9); @@ -769,17 +767,16 @@ static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STa dupHandleSavedFunc->args[3] = NULL; dupHandleSavedFunc->args[4] = NULL; dupHandleSavedFunc->args[5] = pTable; - dupHandleSavedFunc->args[6] = pAffectedRows; - dupHandleSavedFunc->args[7] = pPoints; - dupHandleSavedFunc->args[8] = pLastRow; pSkipList->insertHandleFn = dupHandleSavedFunc; } + pSkipList->insertHandleFn->args[6] = pPoints; + pSkipList->insertHandleFn->args[7] = pLastRow; } static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) { STsdbMeta *pMeta = pRepo->tsdbMeta; - int64_t points = 0; + int32_t points = 0; STable *pTable = NULL; SSubmitBlkIter blkIter = {0}; SMemTable *pMemTable = NULL; @@ -830,9 +827,10 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t * SMemRow lastRow = NULL; int64_t osize = SL_SIZE(pTableData->pData); - tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, pAffectedRows, &points, &lastRow); + tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow); tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext); int64_t dsize = SL_SIZE(pTableData->pData) - osize; + (*pAffectedRows) += points; if(lastRow != NULL) { diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 619b32b3d92cc27f93855e3c5ffe42d327bc4695..21150c66e21cf2488981bd9d475f333073e7aa22 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -17,7 +17,6 @@ #define TSDB_SUPER_TABLE_SL_LEVEL 5 #define DEFAULT_TAG_INDEX_COLUMN 0 -static int tsdbCompareSchemaVersion(const void *key1, const void *key2); static char * getTagIndexKey(const void *pData); static STable *tsdbNewTable(); static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pSTable); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c578555df274aa970c1f107463977c61c0a60d16..716f82d1545d5aa3adb257dc3ed8ea3047acb3ca 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -466,7 +466,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC STsdbMeta* pMeta = tsdbGetMeta(tsdb); assert(pMeta != NULL); - pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock); + pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock); if (pQueryHandle->pDataCols == NULL) { tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -1446,7 +1446,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { return midPos; } -int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { +static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { char* pData = NULL; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1; @@ -1481,7 +1481,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity pData = (char*)pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; } - if (pColInfo->info.colId == src->colId) { + if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) { if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { memmove(pData, (char*)src->pData + bytes * start, bytes * num); } else { // handle the var-string diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 711c32535bb00bae55d7faafadc115a22c76a949..74d41cce194f9921ee0c521de9e329bad5eeb3f9 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -42,14 +42,14 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { return -1; } - pReadh->pDCols[0] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock); + pReadh->pDCols[0] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); if (pReadh->pDCols[0] == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyReadH(pReadh); return -1; } - pReadh->pDCols[1] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock); + pReadh->pDCols[1] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); if (pReadh->pDCols[1] == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyReadH(pReadh); @@ -463,7 +463,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat SDataCol *pDataCol = &(pDataCols->cols[dcol]); if (dcol != 0 && ccol >= pBlockData->numOfCols) { // Set current column as NULL and forward - dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints); + dataColReset(pDataCol); dcol++; continue; } @@ -503,7 +503,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat ccol++; } else { // Set current column as NULL and forward - dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints); + dataColReset(pDataCol); dcol++; } } @@ -608,7 +608,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * } if (pBlockCol == NULL) { - dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints); + dataColReset(pDataCol); continue; }