diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4454c061aef4e9f04087eace42a8bfcf25db0ec8..a283b7c9c1d04048a8b6cd84b3c489656e1c6dab 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2660,7 +2660,7 @@ typedef struct { } SVgEpSet; typedef struct { - int32_t padding; + // padding } SRSmaExecMsg; typedef struct { diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6462c7afbfa80a48219d64cbed3797317f705c16..2bf840fd0149427e4b9ef95f16e9f69550af8809 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -201,6 +201,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 3ca6978156bb99d40245bd89c09981786c3b8d46..2d41874912ec2dd716df60acb2daf743ad54b236 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -614,6 +614,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154) #define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) +#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7c6807ab87220b8cbaecddfd9b0278c0b13aa0fe..1b4efeca7a64ea7f10db635d9519675502d08ff3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -338,6 +338,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index fb8352d54398f934e2ff4cbc31ff463c827719c0..bc204e032d7b3a7f812d6a1bfbccff08eb88e743 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -57,9 +57,10 @@ typedef struct { void *tmrHandle; // shared by all fetch tasks } SSmaMgmt; -#define SMA_ENV_LOCK(env) (&(env)->lock) -#define SMA_ENV_TYPE(env) ((env)->type) -#define SMA_ENV_STAT(env) ((env)->pStat) +#define SMA_ENV_LOCK(env) (&(env)->lock) +#define SMA_ENV_TYPE(env) ((env)->type) +#define SMA_ENV_STAT(env) ((env)->pStat) +#define SMA_RSMA_STAT(sma) ((SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)(sma)->pRSmaEnv)) struct STSmaStat { int8_t state; // ETsdbSmaStat @@ -86,17 +87,19 @@ struct SQTaskFWriter { }; struct SRSmaStat { - SSma *pSma; - int64_t commitAppliedVer; // vnode applied version for async commit - int64_t refId; // shared by fetch tasks - SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) - int8_t triggerStat; // shared by fetch tasks - int8_t commitStat; // 0 not in committing, 1 in committing - SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) - SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; - SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash + SSma *pSma; + int64_t commitAppliedVer; // vnode applied version for async commit + int64_t refId; // shared by fetch tasks + volatile int64_t qBufSize; // queue buffer size + SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) + int8_t triggerStat; // shared by fetch tasks + int8_t commitStat; // 0 not in committing, 1 in committing + int8_t execStat; // 0 not in exec , 1 in exec + SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) + SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; }; + struct SSmaStat { union { STSmaStat tsmaStat; // time-range-wise sma @@ -105,10 +108,9 @@ struct SSmaStat { T_REF_DECLARE() }; -#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) -#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) +#define SMA_STAT_TSMA(s) (&(s)->tsmaStat) +#define SMA_STAT_RSMA(s) (&(s)->rsmaStat) #define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) -#define RSMA_IMU_INFO_HASH(r) ((r)->iRsmaInfoHash) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat) #define RSMA_REF_ID(r) ((r)->refId) @@ -122,23 +124,25 @@ struct SRSmaInfoItem { }; struct SRSmaInfo { - STSchema *pTSchema; - STaosQueue *queue; // buffer queue of SubmitReq - STaosQall *qall; - int64_t suid; - int64_t refId; // refId of SRSmaStat - int8_t delFlag; + STSchema *pTSchema; + int64_t suid; + int64_t refId; // refId of SRSmaStat + int8_t delFlag; T_REF_DECLARE() SRSmaInfoItem items[TSDB_RETENTION_L2]; void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t - void *iTaskInfo[TSDB_RETENTION_L2]; // immutable + STaosQueue *queue; // buffer queue of SubmitReq + STaosQall *qall; // buffer qall of SubmitReq + void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t + STaosQueue *iQueue; // immutable buffer queue of SubmitReq + STaosQall *iQall; // immutable buffer qall of SubmitReq }; #define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items) #define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) #define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) #define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i]) -#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i]) +#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i]) #define RSMA_INFO_ITEM(r, i) (&(r)->items[i]) enum { @@ -230,12 +234,13 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); -int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc); +int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); +int32_t tdRSmaProcessExecImpl(SSma *pSma); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1cc8e1bb27b8d91bfe59b4f0899244558d438ee9..f85a3d836183992a9efb5daf02476b804a955d6d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -188,6 +188,7 @@ int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t smaProcessFetch(SSma* pSma, void* pMsg); +int32_t smaProcessExec(SSma* pSma, void* pMsg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 373cfdfb47a47ceaba6be6ca0e5e9531f61e2441..807c0334893c23d3c6e9b33c2bdd6618b1cfad7a 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -83,8 +83,7 @@ int32_t smaBegin(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); int8_t rsmaTriggerStat = atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE); @@ -122,8 +121,8 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); // step 1: set rsma stat paused atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); @@ -289,8 +288,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(SMA_ENV_STAT(pSmaEnv)); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); // cleanup outdated qtaskinfo files tdCleanupQTaskInfoFiles(pSma, pRSmaStat); @@ -314,7 +312,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } SSmaStat *pStat = SMA_ENV_STAT(pEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); @@ -336,24 +334,30 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } - // step 3: swap rsmaInfoHash and iRsmaInfoHash + // step 3: consume the SubmitReq in buffer + if (tdRSmaProcessExecImpl(pSma) < 0) { + return TSDB_CODE_FAILED; + } + + // step 4: swap rsmaInfoHash and iRsmaInfoHash // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(RSMA_INFO_HASH(pRSmaStat)); - ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat)); - RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat); - RSMA_INFO_HASH(pRSmaStat) = - taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); - if (!RSMA_INFO_HASH(pRSmaStat)) { - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr()); - return TSDB_CODE_FAILED; + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + TSWAP(pInfo->iQall, pInfo->qall); + TSWAP(pInfo->iQueue, pInfo->queue); + TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]); + TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]); + pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); } + atomic_store_64(&pRSmaStat->qBufSize, 0); + // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); @@ -375,11 +379,9 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); - // perform persist task for qTaskInfo - tdRSmaPersistExecImpl(pRSmaStat, RSMA_IMU_INFO_HASH(pRSmaStat)); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); + tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); return TSDB_CODE_SUCCESS; } @@ -396,65 +398,68 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SArray *rsmaDeleted = NULL; - // step 1: merge rsmaInfoHash and iRsmaInfoHash + // step 1: merge qTaskInfo and iQTaskInfo // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); -#if 0 - if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { - // just switch the hash pointer if rsmaInfoHash is empty - if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) { - SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat); - RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat); - RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash; - } - } else { -#endif -#if 1 - void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); + + void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); while (pIter) { tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - - if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - int32_t refVal = T_REF_VAL_GET(pRSmaInfo); - if (refVal == 0) { - tdFreeRSmaInfo(pSma, pRSmaInfo, true); - smaDebug( - "vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for " - "table:%" PRIi64, - SMA_VID(pSma), *pSuid); - } else { - smaDebug( - "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " - "table:%" PRIi64, - SMA_VID(pSma), refVal, *pSuid); + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + int32_t refVal = T_REF_VAL_GET(pRSmaInfo); + if (refVal == 0) { + if(!rsmaDeleted) { + if((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))){ + taosArrayPush(rsmaDeleted, pSuid); + } } + } else { + smaDebug( + "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " + "table:%" PRIi64, + SMA_VID(pSma), refVal, *pSuid); + } + + pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); + continue; + } - pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); - continue; + if (pRSmaInfo->taskInfo[0]) { + if (pRSmaInfo->iTaskInfo[0]) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0]; + tdFreeRSmaInfo(pSma, pRSmaInfo, true); + pRSmaInfo->iTaskInfo[0] = NULL; } - taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); - smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), - *pSuid); } else { - // free the resources - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; - tdFreeRSmaInfo(pSma, pRSmaInfo, false); - smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma), - *pSuid); + TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]); } - pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); + taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); + smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); + + pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); } -#endif - // } - taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); - RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; + if (taosArrayGetSize(rsmaDeleted) > 0) { + for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) { + tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i); + void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); + if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + tdFreeRSmaInfo(pSma, pRSmaInfo, true); + smaDebug( + "vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for " + "table:%" PRIi64, + SMA_VID(pSma), *pSuid); + } + taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); + } + // remove suid in files + taosArrayDestroy(rsmaDeleted); + } // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 50db1123ae112198249e243aa01b1ff3c55f766f..73f8060559181a849194865be3e6575d4bf24a1f 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -315,9 +315,9 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (pSmaStat) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { - tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat)); + tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat)); } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat); + SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat; int32_t vid = SMA_VID(pRSmaStat->pSma); int64_t refId = RSMA_REF_ID(pRSmaStat); if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 1660223d77b89b328518e39d0c29d517f4c4370f..41393eb52f3866a6929fbb323141002a0eac9729 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -17,6 +17,7 @@ #define RSMA_QTASKINFO_BUFSIZE 32768 #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid +#define RSMA_QTASKEXEC_BUFSIZ 1 // * 1048576 // 8 MB SSmaMgmt smaMgmt = { .inited = 0, @@ -27,17 +28,18 @@ SSmaMgmt smaMgmt = { #define TD_RSMAINFO_DEL_FILE "rsmainfo.del" typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; +typedef struct SRSmaExecQItem SRSmaExecQItem; static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx); -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, - int8_t level); +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, + tb_uid_t suid, int8_t level); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid, int8_t blkType); + int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); @@ -76,6 +78,11 @@ struct SRSmaQTaskInfoIter { int32_t nBufPos; }; +struct SRSmaExecQItem { + void *pRSmaInfo; + void *qall; +}; + void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) { tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); } @@ -143,8 +150,12 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (isDeepFree) { if (pInfo->queue) taosCloseQueue(pInfo->queue); if (pInfo->qall) taosFreeQall(pInfo->qall); + if (pInfo->iQueue) taosCloseQueue(pInfo->iQueue); + if (pInfo->iQall) taosFreeQall(pInfo->iQall); pInfo->queue = NULL; pInfo->qall = NULL; + pInfo->iQueue = NULL; + pInfo->iQall = NULL; } taosMemoryFree(pInfo); @@ -362,9 +373,18 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con if (!(pRSmaInfo->queue = taosOpenQueue())) { goto _err; } + smaError("vgId:%d init bufSize:%" PRIi64 ", qMemSize:%" PRIi64, SMA_VID(pSma), atomic_load_64(&pStat->qBufSize), + taosQueueMemorySize(pRSmaInfo->queue)); + if (!(pRSmaInfo->qall = taosAllocateQall())) { goto _err; } + if (!(pRSmaInfo->iQueue = taosOpenQueue())) { + goto _err; + } + if (!(pRSmaInfo->iQall = taosAllocateQall())) { + goto _err; + } pRSmaInfo->suid = suid; pRSmaInfo->refId = RSMA_REF_ID(pStat); T_REF_INIT_VAL(pRSmaInfo, 1); @@ -433,8 +453,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid); @@ -619,7 +638,7 @@ _end: } static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid, int8_t blkType) { + int64_t suid) { SArray *pResList = taosArrayInit(1, POINTER_BYTES); if (pResList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -690,8 +709,50 @@ _err: return TSDB_CODE_FAILED; } -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, - int8_t level) { +/** + * @brief Copy msg to rsmaQueueBuffer + * + * @param pSma + * @param pMsg + * @param inputType + * @param pInfo + * @param suid + * @return int32_t + */ +static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, + tb_uid_t suid) { + const SSubmitReq *pReq = (const SSubmitReq *)pMsg; + + void *qItem = taosAllocateQitem(pReq->length, DEF_QITEM); + if (!qItem) { + return TSDB_CODE_FAILED; + } + + memcpy(qItem, pMsg, pReq->header.contLen); + + taosWriteQitem(pInfo->queue, qItem); + + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + int64_t size = atomic_fetch_add_64(&pRSmaStat->qBufSize, taosQueueMemorySize(pInfo->queue)); + smaError("vgId:%d originSize:%" PRIi64 ", after push size is:%" PRIi64, SMA_VID(pSma), size, + atomic_load_64(&pRSmaStat->qBufSize)); + return TSDB_CODE_SUCCESS; +} + +/** + * @brief sync mode + * + * @param pSma + * @param pMsg + * @param msgSize + * @param inputType + * @param pInfo + * @param suid + * @param level + * @return int32_t + */ +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, + tb_uid_t suid, int8_t level) { int32_t idx = level - 1; if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) { smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); @@ -705,14 +766,13 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), suid); - if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT + if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, msgSize, inputType) < 0) { // INPUT__DATA_SUBMIT smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, - STREAM_INPUT__DATA_SUBMIT); + tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); if (smaMgmt.tmrHandle) { @@ -752,8 +812,15 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } + if (!pRSmaInfo->taskInfo[0]) { + if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) { + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + return NULL; + } + } tdRefRSmaInfo(pSma, pRSmaInfo); taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + ASSERT(pRSmaInfo->suid == suid); return pRSmaInfo; } taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); @@ -762,41 +829,9 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat - SRSmaInfo *pCowRSmaInfo = NULL; - // lock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); - if (!(pCowRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)))) { // 2-phase lock - void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); - if (iRSmaInfo) { - SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo; - if (pIRSmaInfo && !RSMA_INFO_IS_DEL(pIRSmaInfo)) { - if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) { - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); - return NULL; - } - smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid); - if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) { - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); - return NULL; - } - } - } - } else { - pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo; - ASSERT(!pCowRSmaInfo); - } - - if (pCowRSmaInfo) { - tdRefRSmaInfo(pSma, pCowRSmaInfo); - } // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - return pCowRSmaInfo; + return pRSmaInfo; } static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { @@ -805,10 +840,47 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { } } +/** + * @brief async mode + * + * @param pSma + * @param pMsg + * @param inputType + * @param suid + * @return int32_t + */ +static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { + SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); + if (!pRSmaInfo) { + smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); + return TSDB_CODE_SUCCESS; + } + + if (inputType == STREAM_INPUT__DATA_SUBMIT) { + if (tdExecuteRSmaImplAsync(pSma, pMsg, inputType, pRSmaInfo, suid) < 0) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); + return TSDB_CODE_FAILED; + } + } + + tdReleaseRSmaInfo(pSma, pRSmaInfo); + return TSDB_CODE_SUCCESS; +} + +#if 0 +/** + * @brief sync mode + * + * @param pSma + * @param pMsg + * @param inputType + * @param suid + * @return int32_t + */ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); if (!pRSmaInfo) { - smaError("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); + smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } @@ -820,6 +892,47 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_SUCCESS; } +#endif + +static int32_t tdRSmaExecCheck(SSma *pSma) { + SRSmaStat *pRsmaStat = SMA_RSMA_STAT(pSma); + int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize); + + if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) { + smaError("vgId:%d, return directly as execStat:%" PRIi8 ", bufSize:%" PRIi64, SMA_VID(pSma), pRsmaStat->execStat, + bufSize); + return TSDB_CODE_SUCCESS; + } + smaError("vgId:%d, go on exec as execStat:%" PRIi8 ", bufSize:%" PRIi64, SMA_VID(pSma), pRsmaStat->execStat, bufSize); + + pRsmaStat->execStat = 1; + + SRSmaExecMsg fetchMsg; + int32_t contLen = sizeof(SMsgHead); + void *pBuf = rpcMallocCont(0 + contLen); + + ((SMsgHead *)pBuf)->vgId = SMA_VID(pSma); + ((SMsgHead *)pBuf)->contLen = sizeof(SMsgHead); + + SRpcMsg rpcMsg = { + .code = 0, + .msgType = TDMT_VND_EXEC_RSMA, + .pCont = pBuf, + .contLen = contLen, + }; + + if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { + smaError("vgId:%d, failed to put rsma exec msg into query-queue since %s", SMA_VID(pSma), terrstr()); + goto _err; + } + + smaDebug("vgId:%d, success to put rsma fetch msg into query-queue", SMA_VID(pSma)); + + return TSDB_CODE_SUCCESS; +_err: + pRsmaStat->execStat = 0; + return TSDB_CODE_FAILED; +} int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); @@ -839,16 +952,18 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { tdFetchSubmitReqSuids(pMsg, &uidStore); if (uidStore.suid != 0) { - tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid); + tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid); void *pIter = taosHashIterate(uidStore.uidHash, NULL); while (pIter) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid); + tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid); pIter = taosHashIterate(uidStore.uidHash, pIter); } tdUidStoreDestory(&uidStore); + + tdRSmaExecCheck(pSma); } } return TSDB_CODE_SUCCESS; @@ -1282,7 +1397,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i); + qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i); if (!taskInfo) { smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); continue; @@ -1452,7 +1567,7 @@ _end: * @param level * @return int32_t */ -int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { +static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level}; int32_t ret = 0; int32_t contLen = 0; @@ -1479,7 +1594,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { .code = 0, .msgType = TDMT_VND_FETCH_RSMA, .pCont = pBuf, - .contLen = contLen, + .contLen = contLen + sizeof(SMsgHead), }; if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { @@ -1541,7 +1656,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, STREAM_INPUT__DATA_BLOCK) < 0) { + if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { goto _err; } @@ -1558,3 +1673,125 @@ _err: smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; } + +static void tdFreeRSmaSubmitItems(SArray *pItems) { + for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { + taosFreeQitem(*(void **)taosArrayGet(pItems, i)); + } +} + +int32_t tdRSmaProcessExecImpl(SSma *pSma) { + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SHashObj *infoHash = NULL; + SArray *pSubmitQArr = NULL; + SArray *pSubmitArr = NULL; + + if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) { + terrno = TSDB_CODE_RSMA_INVALID_STAT; + goto _err; + } + + taosRLockLatch(SMA_ENV_LOCK(pEnv)); + if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZ) { + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + return TSDB_CODE_SUCCESS; + } + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + + if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + if (!(pSubmitArr = taosArrayInit(1024, POINTER_BYTES))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SRSmaExecQItem qItem = {0}; + taosWLockLatch(SMA_ENV_LOCK(pEnv)); + void *pIter = taosHashIterate(infoHash, NULL); + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + if (taosQueueItemSize(pInfo->queue)) { + taosReadAllQitems(pInfo->queue, pInfo->qall); + qItem.qall = &pInfo->qall; + qItem.pRSmaInfo = pIter; + taosArrayPush(pSubmitQArr, &qItem); + } + ASSERT(taosQueueItemSize(pInfo->queue) == 0); + pIter = taosHashIterate(infoHash, pIter); + } + + atomic_store_64(&pRSmaStat->qBufSize, 0); + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + smaError("vgId:%d after exec qBufSize is:%" PRIi64, SMA_VID(pSma), atomic_load_64(&pRSmaStat->qBufSize)); + + int32_t qSize = taosArrayGetSize(pSubmitQArr); + for (int32_t i = 0; i < qSize; ++i) { + SRSmaExecQItem *pItem = taosArrayGet(pSubmitQArr, i); + while (1) { + void *msg = NULL; + taosGetQitem(*(STaosQall **)pItem->qall, (void **)&msg); + if (msg) { + if (taosArrayPush(pSubmitArr, &msg) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } else { + break; + } + } + + int32_t size = taosArrayGetSize(pSubmitArr); + if (size > 0) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, *(SSubmitReq**)pSubmitArr->pData, size, STREAM_INPUT__DATA_SUBMIT, pInfo, pInfo->suid, i) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } + tdFreeRSmaSubmitItems(pSubmitArr); + taosArrayClear(pSubmitArr); + } + } + + taosArrayDestroy(pSubmitArr); + taosArrayDestroy(pSubmitQArr); + return TSDB_CODE_SUCCESS; +_err: + taosArrayDestroy(pSubmitArr); + taosArrayDestroy(pSubmitQArr); + return TSDB_CODE_FAILED; +} + +/** + * @brief exec rsma level 1data, fetch result of level 2/3 and submit + * + * @param pSma + * @param pMsg + * @return int32_t + */ +int32_t smaProcessExec(SSma *pSma, void *pMsg) { + SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; + SRSmaStat *pRsmaStat = SMA_RSMA_STAT(pSma); + + if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { + terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; + goto _err; + } + + if (tdRSmaProcessExecImpl(pSma) < 0) { + goto _err; + } + + pRsmaStat->execStat = 0; + smaWarn("vgId:%d, success to process rsma exec msg", SMA_VID(pSma)); + return TSDB_CODE_SUCCESS; +_err: + pRsmaStat->execStat = 0; + smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); + return TSDB_CODE_FAILED; +} diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index f46d9dc29c0435dedf901feae1fcc06e0b3fa3b7..a6302b9235b57a3db5bcb371f81b81b0f5b95dbc 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -175,7 +175,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { } tdRefSmaStat(pSma, pStat); - pTsmaStat = SMA_TSMA_STAT(pStat); + pTsmaStat = SMA_STAT_TSMA(pStat); if (!pTsmaStat->pTSma) { STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index d9f38ffd090fffcb6cf110265f1d2929714da669..da7022248526d7371bf40a0e91c7a0f7d49b8a16 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -350,49 +350,45 @@ _err: } /** - * @brief pTSchema is shared + * @brief Clone qTaskInfo of SRSmaInfo * * @param pSma - * @param pDest - * @param pSrc + * @param pInfo * @return int32_t */ -int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) { - SVnode *pVnode = pSma->pVnode; +int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { SRSmaParam *param = NULL; - if (!pSrc) { - *pDest = NULL; + if (!pInfo) { return TSDB_CODE_SUCCESS; } SMetaReader mr = {0}; metaReaderInit(&mr, SMA_META(pSma), 0); - smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid); - if (metaGetTableEntryByUid(&mr, pSrc->suid) < 0) { - smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, + smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid); + if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) { + smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr()); goto _err; } ASSERT(mr.me.type == TSDB_SUPER_TABLE); - ASSERT(mr.me.uid == pSrc->suid); + ASSERT(mr.me.uid == pInfo->suid); if (TABLE_IS_ROLLUP(mr.me.flags)) { param = &mr.me.stbEntry.rsmaParam; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) { + if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) { goto _err; } } - smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid); + smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid); + } else { + terrno = TSDB_CODE_RSMA_INVALID_SCHEMA; + goto _err; } metaReaderClear(&mr); - - *pDest = pSrc; // pointer copy - return TSDB_CODE_SUCCESS; _err: - *pDest = NULL; metaReaderClear(&mr); - smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr()); + smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr()); return TSDB_CODE_FAILED; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 3a25933ec453932e7bd5267a193d45a081c766c6..751cb21d089f89278e28b1bec8c9c1284d8a8e66 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -295,6 +295,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_VND_FETCH_RSMA: return smaProcessFetch(pVnode->pSma, pMsg); + case TDMT_VND_EXEC_RSMA: + return smaProcessExec(pVnode->pSma, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7115ad85a50884d21f496a900d21f0e954c07ea0..4c3d5cf7affbc291bcc9f62adcff688e149a414a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -55,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayClear(pInfo->pBlockLists); if (type == STREAM_INPUT__MERGED_SUBMIT) { - ASSERT(numOfBlocks > 1); + // ASSERT(numOfBlocks > 1); for (int32_t i = 0; i < numOfBlocks; i++) { SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); taosArrayPush(pInfo->pBlockLists, &pReq); diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index dbb50f958d5b01e7756a57772c0313aab2b7a43a..6b2edf0d5e6e1f41b5d354d110fb23892a864b33 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -15,6 +15,7 @@ #include "tsimplehash.h" #include "taoserror.h" +#include "tlog.h" #define SHASH_DEFAULT_LOAD_FACTOR 0.75 #define HASH_MAX_CAPACITY (1024 * 1024 * 16) @@ -106,27 +107,27 @@ static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *data return pNewNode; } -static void taosHashTableResize(SSHashObj *pHashObj) { +static void tSimpleHashTableResize(SSHashObj *pHashObj) { if (!SHASH_NEED_RESIZE(pHashObj)) { return; } int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u); if (newCapacity > HASH_MAX_CAPACITY) { - // uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached", - // pHashObj->capacity, HASH_MAX_CAPACITY); + uDebug("current capacity:%zu, maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached", + pHashObj->capacity, HASH_MAX_CAPACITY); return; } int64_t st = taosGetTimestampUs(); void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity); if (!pNewEntryList) { - // qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); + uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); return; } size_t inc = newCapacity - pHashObj->capacity; - memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc); + memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc * sizeof(void *)); pHashObj->hashList = pNewEntryList; pHashObj->capacity = newCapacity; @@ -179,7 +180,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons // need the resize process, write lock applied if (SHASH_NEED_RESIZE(pHashObj)) { - taosHashTableResize(pHashObj); + tSimpleHashTableResize(pHashObj); } int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6e3067d44edc67f10944cdde2ffc72fbd4b57fea..3f6d3421ec5bbf38e76ecd880aee8d4d01633eb5 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -616,6 +616,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")