diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index db6ca65cf05976a33e74a5d497df6a44aead5bc1..a0d30cb12be18b6e2c9ed20b908d68a3a6cabf30 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -266,7 +266,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index a4a185e5ee0895a17ec04a337512a3e45f58b33c..c69a864d3fa83e80fc7eef2d9fe6ed3d565f7e5a 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -19,7 +19,7 @@ #include "tlog.h" #include "tname.h" -#define MALLOC_ALIGN_BYTES 32 +#define MALLOC_ALIGN_BYTES 32 int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { ASSERT(pColumnInfoData != NULL); @@ -38,7 +38,8 @@ int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t num if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows; } else { - return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) + BitmapLen(numOfRows); + return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) + + BitmapLen(numOfRows); } } @@ -279,7 +280,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int pColumnInfoData->varmeta.allocLen = len + oldLen; } - if (pColumnInfoData->pData && pSource->pData) { // TD-20382 + if (pColumnInfoData->pData && pSource->pData) { // TD-20382 memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); } pColumnInfoData->varmeta.length = len + oldLen; @@ -1157,7 +1158,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { } // todo temporarily disable it -static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) { +static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, + bool clearPayload) { ASSERT(numOfRows > 0 /*&& pBlockInfo->capacity >= pBlockInfo->rows*/); if (numOfRows <= pBlockInfo->capacity) { return TSDB_CODE_SUCCESS; @@ -2009,9 +2011,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) * @param suid * */ -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, +#if 0 +int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { - int32_t bufSize = sizeof(SSubmitReq); + int32_t bufSize = sizeof(SSubmitReq2); int32_t sz = 1; for (int32_t i = 0; i < sz; ++i) { const SDataBlockInfo* pBlkInfo = &pDataBlock->info; @@ -2174,6 +2177,154 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB return TSDB_CODE_SUCCESS; } +#endif + +int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, + int32_t vgId, tb_uid_t suid) { + SSubmitReq2* pReq = *ppReq; + SArray* pVals = NULL; + int32_t bufSize = sizeof(SSubmitReq2); + int32_t numOfBlks = 0; + int32_t sz = 1; + + if (!(pReq = taosMemoryMalloc(bufSize))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + + if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { + goto _end; + } + + for (int32_t i = 0; i < sz; ++i) { + int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); + int32_t rows = pDataBlock->info.rows; + + if (colNum <= 1) { // invalid if only with TS col + continue; + } + + SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); + + if (!pTbData) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)); + pTbData->suid = suid; + pTbData->uid = pDataBlock->info.id.groupId; + pTbData->sver = pTSchema->version; + + if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + + for (int32_t j = 0; j < rows; ++j) { // iterate by row + + taosArrayClear(pVals); + + bool isStartKey = false; + int32_t offset = 0; + for (int32_t k = 0; k < colNum; ++k) { // iterate by column + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + STColumn* pCol = &pTSchema->columns[k]; + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + + switch (pColInfoData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + ASSERT(pColInfoData->info.type == pCol->type); + if (!isStartKey) { + isStartKey = true; + SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, pCol->type, (SValue){.val = *(TSKEY*)var}); + taosArrayPush(pVals, &cv); + } else if (colDataIsNull_s(pColInfoData, j)) { + SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type); + taosArrayPush(pVals, &cv); + } else { + SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, (SValue){.val = *(int64_t*)var}); + taosArrayPush(pVals, &cv); + } + break; + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY + ASSERT(pColInfoData->info.type == pCol->type); + if (colDataIsNull_s(pColInfoData, j)) { + SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type); + taosArrayPush(pVals, &cv); + } else { + void* data = colDataGetVarData(pColInfoData, j); + SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value + SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, pCol->type, sv); + taosArrayPush(pVals, &cv); + } + break; + } + case TSDB_DATA_TYPE_VARBINARY: + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + case TSDB_DATA_TYPE_JSON: + case TSDB_DATA_TYPE_MEDIUMBLOB: + uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); + ASSERT(0); + break; + default: + if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { + if (colDataIsNull_s(pColInfoData, j)) { + SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type); // should use pCol->type + taosArrayPush(pVals, &cv); + } else { + SValue sv; + if (pCol->type == pColInfoData->info.type) { + memcpy(&sv.val, var, tDataTypes[pCol->type].bytes); + } else { + /** + * 1. sum/avg would convert to int64_t/uint64_t/double during aggregation + * 2. below conversion may lead to overflow or loss, the app should select the right data type. + */ + char tv[8] = {0}; + if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } else { + uint64_t v = 0; + GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } + memcpy(&sv.val, tv, tDataTypes[pCol->type].bytes); + } + SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, sv); + taosArrayPush(pVals, &cv); + } + } else { + uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); + ASSERT(0); + } + break; + } + } + SRow* pRow = NULL; + tRowBuild(pVals, pTSchema, &pRow); + if (pRow) { + taosArrayPush(pTbData->aRowP, &pRow); + } + } + } +_end: + if (terrno != 0) { + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { ASSERT(stbFullName[0] != 0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index eb11e4ac22629ddd73d2e732843421feae5c5897..bba960667dc1cd60c7276d8a28bd0fe0a59850fd 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -158,8 +158,8 @@ int32_t tsdbCommit(STsdb* pTsdb); int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb); int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); -int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); -int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); +int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); +int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); @@ -220,7 +220,7 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); -int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); +int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t len, int32_t inputType); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6bd2ae3435349dfc3ab81838424cccc06ec4811e..ec91af8780d1724dc815d51472f45355daf12587 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -601,8 +601,8 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { return TSDB_CODE_FAILED; } - SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; - // TODO: spin lock for race conditiond + SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq; + // spin lock for race condition during insert data if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { return TSDB_CODE_FAILED; } @@ -610,29 +610,19 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { return TSDB_CODE_SUCCESS; } -static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { - SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock = NULL; - SSubmitBlkIter blkIter = {0}; - STSRow *row = NULL; +static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) { + SArray *pSubmitTbData = pMsg ? pMsg->aSubmitTbData : NULL; + int32_t size = taosArrayGetSize(pSubmitTbData); terrno = TSDB_CODE_SUCCESS; - if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) { - return -1; - } - while (true) { - if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) { + for (int32_t i = 0; i < size; ++i) { + SSubmitTbData *pData = TARRAY_GET_ELEM(pSubmitTbData, i); + if (terrno = tdUidStorePut(pStore, pData->suid, NULL) < 0) { return -1; } - - if (!pBlock) break; - tdUidStorePut(pStore, msgIter.suid, NULL); } - if (terrno != TSDB_CODE_SUCCESS) { - return -1; - } return 0; } @@ -712,7 +702,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma output->info.rows); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); - SSubmitReq *pReq = NULL; + SSubmitReq2 *pReq = NULL; // TODO: the schema update should be handled later(TD-17965) if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { @@ -730,10 +720,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma goto _err; } - smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64 - " len %" PRIu32, - SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version, - htonl(pReq->header.contLen)); + smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, + SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); taosMemoryFreeClear(pReq); } @@ -754,21 +742,22 @@ _err: * * @param pSma * @param pMsg + * @param len * @param inputType * @param pInfo * @param suid * @return int32_t */ -static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, +static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { - const SSubmitReq *pReq = (const SSubmitReq *)pMsg; + const SSubmitReq2 *pReq = (const SSubmitReq2 *)pMsg; - void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM); + void *qItem = taosAllocateQitem(len, DEF_QITEM); if (!qItem) { return TSDB_CODE_FAILED; } - memcpy(qItem, pMsg, pReq->header.contLen); + memcpy(qItem, pMsg, len); taosWriteQitem(pInfo->queue, qItem); @@ -1015,7 +1004,7 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { * @param suid * @return int32_t */ -static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { +static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, tb_uid_t suid) { SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); if (!pRSmaInfo) { smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); @@ -1023,7 +1012,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp } if (inputType == STREAM_INPUT__DATA_SUBMIT) { - if (tdExecuteRSmaImplAsync(pSma, pMsg, inputType, pRSmaInfo, suid) < 0) { + if (tdExecuteRSmaImplAsync(pSma, pMsg, len, inputType, pRSmaInfo, suid) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_FAILED; } @@ -1045,7 +1034,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp return TSDB_CODE_SUCCESS; } -int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { +int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t len, int32_t inputType) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); if (!pEnv) { // only applicable when rsma env exists @@ -1060,18 +1049,21 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { if (inputType == STREAM_INPUT__DATA_SUBMIT) { if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) { + smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr()); goto _err; } if (uidStore.suid != 0) { - if (tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid) < 0) { + if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, uidStore.suid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); goto _err; } void *pIter = NULL; while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) { + if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, *pTbSuid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr()); goto _err; } } @@ -1081,7 +1073,6 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { return TSDB_CODE_SUCCESS; _err: tdUidStoreDestory(&uidStore); - smaError("vgId:%d, failed to process rsma submit since: %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 0118d658c29e2356e6507e38c9a276a8faca83b5..0786fe71d018408e5014faf6ee936cc7fbbc190a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -26,14 +26,17 @@ static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 921 // static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); -int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) { - SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock = NULL; - int32_t affectedrows = 0; - int32_t numOfRows = 0; +int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 *pRsp) { + int32_t arrSize = 0; + int32_t affectedrows = 0; + int32_t numOfRows = 0; ASSERT(pTsdb->mem != NULL); + if (pMsg) { + arrSize = taosArrayGetSize(pMsg->aSubmitTbData); + } + // scan and convert if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) { if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { @@ -43,22 +46,10 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * } // loop to insert - if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) { - return -1; - } - while (true) { - SSubmitBlkRsp r = {0}; - tGetSubmitMsgNext(&msgIter, &pBlock); - if (pBlock == NULL) break; -#if 0 - if ((terrno = tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r)) < 0) { + for (int32_t i = 0; i < arrSize; ++i) { + if ((terrno = tsdbInsertTableData(pTsdb, version, taosArrayGet(pMsg->aSubmitTbData, i), &affectedrows)) < 0) { return -1; } -#else - ASSERT(0); -#endif - - numOfRows += msgIter.numOfRows; } if (pRsp != NULL) { @@ -86,9 +77,8 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow * } #endif -static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *row, TSKEY minKey, TSKEY maxKey, +static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey, TSKEY now) { - TSKEY rowKey = TD_ROW_KEY(row); if (rowKey < minKey || rowKey > maxKey) { tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 " maxKey %" PRId64 " row key %" PRId64, @@ -100,6 +90,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro return 0; } +#if 0 int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ASSERT(pMsg != NULL); // STsdbMeta * pMeta = pTsdb->tsdbMeta; @@ -159,6 +150,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { } } #endif +// pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) { @@ -167,6 +159,46 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { } } + if (terrno != TSDB_CODE_SUCCESS) return -1; + return 0; +} +#endif + +int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) { + ASSERT(pMsg != NULL); + STsdbKeepCfg *pCfg = &pTsdb->keepCfg; + TSKEY now = taosGetTimestamp(pCfg->precision); + TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; + TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision]; + int32_t size = taosArrayGetSize(pMsg->aSubmitTbData); + + terrno = TSDB_CODE_SUCCESS; + + for (int32_t i = 0; i < size; ++i) { + SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i); + if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + uint64_t nColData = TARRAY_SIZE(pData->aCol); + SColData *aColData = (SColData *)TARRAY_DATA(pData->aCol); + if (nColData > 0) { + int32_t nRows = aColData[0].nVal; + TSKEY *aKey = (TSKEY *)aColData[0].pData; + for (int32_t r = 0; r < nRows; ++r) { + if (tsdbCheckRowRange(pTsdb, pData->uid, aKey[r], minKey, maxKey, now) < 0) { + return -1; + } + } + } + } else { + int32_t nRows = taosArrayGetSize(pData->aRowP); + for (int32_t r = 0; r < nRows; ++r) { + SRow *pRow = (SRow *)taosArrayGetP(pData->aRowP, r); + if (tsdbCheckRowRange(pTsdb, pData->uid, pRow->ts, minKey, maxKey, now) < 0) { + return -1; + } + } + } + } + if (terrno != TSDB_CODE_SUCCESS) return -1; return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 27e35e89f78b085c02078f26ee880213e6ba993d..3c36a4e5345eefeb3e0b46a8298523b6407bcaf8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -236,7 +236,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; /* TSDB */ case TDMT_VND_SUBMIT: - if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; + if (vnodeProcessSubmitReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DELETE: if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; @@ -868,7 +868,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq // decode SDecoder dc = {0}; - tDecoderInit(&dc, (char *)pReq + sizeof(SMsgHead), len - sizeof(SMsgHead)); + tDecoderInit(&dc, pReq, len); if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; @@ -876,6 +876,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq tDecoderClear(&dc); // check + code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq); + if (code) { + goto _exit; + } + for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) { SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); @@ -982,6 +987,7 @@ _exit: atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); if (code == 0) { atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1); + tdProcessRSmaSubmit(pVnode->pSma, pReq, len, STREAM_INPUT__DATA_SUBMIT); } // clear