diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 8124075e6cb8ada229a5cc53c7301ed4b9ae8bb0..f6683e48434e7b5d88028713e9f5957c2c4a6e0f 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -193,7 +193,7 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i if (offset == 0) { ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP); TKEY tvalue = tdGetTKEY(*(TSKEY *)value); - memcpy(POINTER_SHIFT(row, toffset), &tvalue, TYPE_BYTES[type]); + memcpy(POINTER_SHIFT(row, toffset), (void *)(&tvalue), TYPE_BYTES[type]); } else { memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]); } @@ -227,7 +227,6 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints); -void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows); void dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); @@ -235,28 +234,20 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); // Get the data pointer from a column-wised data static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { - switch (pCol->type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); - break; - - default: - return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row); - break; + if (IS_VAR_DATA_TYPE(pCol->type)) { + return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); + } else { + return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row); } } static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { ASSERT(rows > 0); - switch (pDataCol->type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1)); - break; - default: - return TYPE_BYTES[pDataCol->type] * rows; + if (IS_VAR_DATA_TYPE(pDataCol->type)) { + return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1)); + } else { + return TYPE_BYTES[pDataCol->type] * rows; } } @@ -277,11 +268,11 @@ typedef struct { #define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] #define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx)) #define dataColsTKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, 0) -#define dataColsKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TSDB_DATA_BIGINT_NULL : dataColsKeyAt(pCols, 0) +#define dataColsKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TSDB_DATA_TIMESTAMP_NULL : dataColsKeyAt(pCols, 0) #define dataColsTKeyLast(pCols) \ (((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, (pCols)->numOfRows - 1)) #define dataColsKeyLast(pCols) \ - (((pCols)->numOfRows == 0) ? TSDB_DATA_BIGINT_NULL : dataColsKeyAt(pCols, (pCols)->numOfRows - 1)) + (((pCols)->numOfRows == 0) ? TSDB_DATA_TIMESTAMP_NULL : dataColsKeyAt(pCols, (pCols)->numOfRows - 1)) SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); @@ -289,7 +280,6 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); -void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); // ----------------- K-V data row structure diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 4596adaf144dfcb0235d5acc6d144e644bc943f7..49d21d927525a83fb1f93b02fdf44817d6be1743 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -205,7 +205,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; pDataCol->len = 0; - if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(pDataCol->type)) { pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints); pDataCol->spaceSize = pDataCol->bytes * maxPoints; @@ -218,60 +218,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) } } +// value from timestamp should be TKEY here instead of TSKEY void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { ASSERT(pCol != NULL && value != NULL); - switch (pCol->type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - // set offset - pCol->dataOff[numOfRows] = pCol->len; - // Copy data - memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); - // Update the length - pCol->len += varDataTLen(value); - break; - default: - ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); - memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); - pCol->len += pCol->bytes; - break; - } -} - -void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) { - int pointsLeft = numOfRows - pointsToPop; - - ASSERT(pointsLeft > 0); - - if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) { - ASSERT(pCol->len > 0); - VarDataOffsetT toffset = pCol->dataOff[pointsToPop]; - pCol->len = pCol->len - toffset; - ASSERT(pCol->len > 0); - memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len); - dataColSetOffset(pCol, pointsLeft); + if (IS_VAR_DATA_TYPE(pCol->type)) { + // set offset + pCol->dataOff[numOfRows] = pCol->len; + // Copy data + memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); + // Update the length + pCol->len += varDataTLen(value); } else { ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); - pCol->len = TYPE_BYTES[pCol->type] * pointsLeft; - memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len); + memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); + pCol->len += pCol->bytes; } } bool isNEleNull(SDataCol *pCol, int nEle) { - switch (pCol->type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - for (int i = 0; i < nEle; i++) { - if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false; - } - return true; - default: - for (int i = 0; i < nEle; i++) { - if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false; - } - return true; + for (int i = 0; i < nEle; i++) { + if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false; } + return true; } void dataColSetNullAt(SDataCol *pCol, int index) { @@ -393,7 +362,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); - if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { ASSERT(pDataCols->cols[i].dataOff != NULL); pRet->cols[i].dataOff = (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf))); @@ -403,7 +372,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].len = pDataCols->cols[i].len; if (pDataCols->cols[i].len > 0) { memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); - if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); } } @@ -463,21 +432,6 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) pCols->numOfRows++; } -// Pop pointsToPop points from the SDataCols -void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { - int pointsLeft = pCols->numOfRows - pointsToPop; - if (pointsLeft <= 0) { - tdResetDataCols(pCols); - return; - } - - for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { - SDataCol *pCol = pCols->cols + iCol; - dataColPopPoints(pCol, pointsToPop, pCols->numOfRows); - } - pCols->numOfRows = pointsLeft; -} - int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); @@ -549,9 +503,9 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i target->maxPoints); } } + target->numOfRows++; } - target->numOfRows++; (*iter2)++; if (key1 == key2) (*iter1)++; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 7adf16443e30dfde0ec991191563cf2b5c3865b8..2bdb95163d5b8077cc3228ae66e1e77a0562c075 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -105,8 +105,14 @@ int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { if (tSkipListPut(pTableData->pData, pRow) == NULL) { tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); } else { - // TODO: may need to refact here int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize; + if (isRowDelete) { + if (TABLE_LASTKEY(pTable) == key) { + // TODO: need to update table last key here (may from file) + } + } else { + if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; + } if ((!isRowDelete) && (TABLE_LASTKEY(pTable) < key)) TABLE_LASTKEY(pTable) = key; if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; @@ -301,7 +307,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead * 4. operations in pCols not exceeds its max capacity if pCols is given * - * The function try to move as mush as possible. + * The function tries to procceed AS MUSH AS POSSIBLE. */ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { @@ -321,12 +327,13 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey row = tsdbNextIterRow(pIter); if (row == NULL || dataRowKey(row) > maxKey) { rowKey = INT64_MAX; + isRowDel = false; } else { rowKey = dataRowKey(row); isRowDel = dataRowDeleted(row); } - if (nFilterKeys == 0 || filterIter >= nFilterKeys) { + if (filterIter >= nFilterKeys) { fKey = INT64_MAX; } else { fKey = tdGetKey(filterKeys[filterIter]); @@ -362,6 +369,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey row = tsdbNextIterRow(pIter); if (row == NULL || dataRowKey(row) > maxKey) { rowKey = INT64_MAX; + isRowDel = false; } else { rowKey = dataRowKey(row); isRowDel = dataRowDeleted(row); @@ -391,6 +399,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey row = tsdbNextIterRow(pIter); if (row == NULL || dataRowKey(row) > maxKey) { rowKey = INT64_MAX; + isRowDel = false; } else { rowKey = dataRowKey(row); isRowDel = dataRowDeleted(row); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 2fff744b5cd83106d9bfe77ac34bc31bf96187a0..eb1193110d258f3f9c072698a32108585776d48a 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -1507,6 +1507,8 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, if (pDataCols->numOfRows + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; + pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, pCompBlock->keyFirst); + pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, pCompBlock->keyLast); if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, pMergeInfo) < 0) return -1; } else { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; @@ -1553,6 +1555,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols * pDataCols0 = pHelper->pDataCols[0]; SMergeInfo mergeInfo = {0}; SMergeInfo *pMergeInfo = &mergeInfo; + SCompBlock oBlock = {0}; SSkipListIterator slIter = {0}; @@ -1562,14 +1565,14 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE); ASSERT(pCompBlock != NULL); int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); + oBlock = *pCompBlock; - ASSERT((!TSDB_IS_LAST_BLOCK(pCompBlock)) || (tblkIdx == pIdx->numOfBlocks - 1)); + ASSERT((!TSDB_IS_LAST_BLOCK(&oBlock)) || (tblkIdx == pIdx->numOfBlocks - 1)); - if ((!TSDB_IS_LAST_BLOCK(pCompBlock)) && keyFirst < pCompBlock->keyFirst) { - // Loop to write data until pCompBlock->keyFirst-1 + if ((!TSDB_IS_LAST_BLOCK(&oBlock)) && keyFirst < pCompBlock->keyFirst) { while (true) { tdResetDataCols(pDataCols); - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, pCompBlock->keyLast - 1, defaultRowsInBlock, pDataCols, NULL, 0, + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, oBlock.keyFirst, defaultRowsInBlock, pDataCols, NULL, 0, pCfg->update, pMergeInfo); ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows); if (pDataCols->numOfRows == 0) break; @@ -1582,7 +1585,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); } else { int16_t colId = 0; - if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + if (tsdbLoadBlockDataCols(pHelper, &oBlock, NULL, &colId, 1) < 0) return -1; TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (blockAtIdx(pHelper, tblkIdx + 1)->keyFirst - 1); @@ -1595,19 +1598,19 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ASSERT(pMergeInfo->rowsDeleteFailed >= 0); *(pCommitIter->pIter) = slIter; tblkIdx++; - } else if (pCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) { + } else if (oBlock.numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) { // Delete the block and do some stuff ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN); if (tsdbDeleteSuperBlock(pHelper, tblkIdx) < 0) return -1; *pCommitIter->pIter = slIter; - if (pCompBlock->last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; - } else if (tsdbCheckAddSubBlockCond(pHelper, pCompBlock, pMergeInfo, pDataCols->maxPoints)) { + if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + } else if (tsdbCheckAddSubBlockCond(pHelper, &oBlock, pMergeInfo, pDataCols->maxPoints)) { // Append as a sub-block of the searched block tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, INT_MAX, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows, pCfg->update, pMergeInfo); ASSERT(memcmp(pCommitIter->pIter, &slIter, sizeof(slIter)) == 0); - if (tsdbWriteBlockToFile(pHelper, pCompBlock->last ? helperLastF(pHelper) : helperDataF(pHelper), pDataCols, - &compBlock, pCompBlock->last, false) < 0) { + if (tsdbWriteBlockToFile(pHelper, oBlock.last ? helperLastF(pHelper) : helperDataF(pHelper), pDataCols, + &compBlock, oBlock.last, false) < 0) { return -1; } if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, pMergeInfo) < 0) { @@ -1616,7 +1619,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, tblkIdx++; } else { // load the block data, merge with the memory data - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + if (tsdbLoadBlockData(pHelper, &oBlock, NULL) < 0) return -1; int round = 0; int dIter = 0; while (true) { @@ -1631,7 +1634,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, } if (round == 0) { - if (pCompBlock->last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; + if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; } else { if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 1c5b02a50284b6078394a50c23d9e3c84ce0f138..65b6482520c445fd65433300a5fcb569506c85e2 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -28,9 +28,9 @@ extern "C" { #define SKIP_LIST_RECORD_PERFORMANCE 0 // For key property setting -#define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists -#define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key -#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert +#define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists (for tag index usage) +#define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key (for data update=0 case) +#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert (for data update=1 case) // For thread safety setting #define SL_THREAD_SAFE (uint8_t)0x4