未验证 提交 f4342a8b 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #15832 from taosdata/feature/TD-11274-3.0

enh: rsma fetch logic optimization
......@@ -246,7 +246,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
......
......@@ -2658,7 +2658,6 @@ typedef struct {
} SVgEpSet;
typedef struct {
int64_t refId;
int64_t suid;
int8_t level;
} SRSmaFetchMsg;
......@@ -2666,7 +2665,6 @@ typedef struct {
static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->refId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->level) < 0) return -1;
......@@ -2677,7 +2675,6 @@ static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFe
static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->level) < 0) return -1;
......
......@@ -200,6 +200,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
......
......@@ -610,6 +610,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
#define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
......@@ -1874,21 +1874,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
* @brief TODO: Assume that the final generated result it less than 3M
*
* @param pReq
* @param pDataBlocks
* @param pDataBlock
* @param vgId
* @param suid // TODO: check with Liao whether suid response is reasonable
* @param suid
*
* TODO: colId should be set
*/
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid) {
int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq);
int32_t sz = 1;
for (int32_t i = 0; i < sz; ++i) {
SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info;
const SDataBlockInfo* pBlkInfo = &pDataBlock->info;
int32_t numOfCols = taosArrayGetSize(pDataBlocks);
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols));
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum));
bufSize += sizeof(SSubmitBlk);
}
......@@ -1905,7 +1904,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
tdSRowInit(&rb, pTSchema->version);
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
// int32_t rowSize = pDataBlock->info.rowSize;
......
......@@ -347,6 +347,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -187,6 +187,7 @@ int32_t smaAsyncPreCommit(SSma* pSma);
int32_t smaAsyncCommit(SSma* pSma);
int32_t smaAsyncPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now);
int32_t smaProcessFetch(SSma *pSma, void* pMsg);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
......
......@@ -36,19 +36,17 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputT
int8_t level);
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid,
SRSmaStat *pStat, int8_t blkType);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid, int8_t blkType);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
// adapt accordingly if definition of SRSmaInfo update
......@@ -604,11 +602,8 @@ _end:
return code;
}
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid,
SRSmaStat *pStat, int8_t blkType) {
SArray *pResult = NULL;
SSma *pSma = pStat->pSma;
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid, int8_t blkType) {
while (1) {
SSDataBlock *output = NULL;
uint64_t ts;
......@@ -619,30 +614,20 @@ static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *p
pItem->level, terrstr(code));
goto _err;
}
if (!output) {
break;
}
if (!pResult) {
pResult = taosArrayInit(1, sizeof(SSDataBlock));
if (!pResult) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
taosArrayPush(pResult, output);
if (taosArrayGetSize(pResult) > 0) {
#if 1
if (output) {
#if 0
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level);
SArray *pResult = taosArrayInit(1, sizeof(SSDataBlock));
taosArrayPush(pResult, output);
blockDebugShowDataBlocks(pResult, flag);
taosArrayDestroy(pResult);
#endif
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
......@@ -659,18 +644,17 @@ static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *p
SMA_VID(pSma), suid, pItem->level, output->info.version);
taosMemoryFreeClear(pReq);
taosArrayClear(pResult);
} else if (terrno == 0) {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
break;
} else {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
goto _err;
}
}
tdDestroySDataBlockArray(pResult);
return TSDB_CODE_SUCCESS;
_err:
tdDestroySDataBlockArray(pResult);
return TSDB_CODE_FAILED;
}
......@@ -694,11 +678,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
return TSDB_CODE_FAILED;
}
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
tdRSmaFetchAndSubmitResult(RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, pStat,
tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid,
STREAM_INPUT__DATA_SUBMIT);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
......@@ -724,11 +706,13 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
SRSmaInfo *pRSmaInfo = NULL;
if (!pEnv) {
terrno = TSDB_CODE_RSMA_INVALID_ENV;
return NULL;
}
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
if (!pStat || !RSMA_INFO_HASH(pStat)) {
terrno = TSDB_CODE_RSMA_INVALID_STAT;
return NULL;
}
......@@ -743,12 +727,12 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return pRSmaInfo;
}
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
SRSmaInfo *pCowRSmaInfo = NULL;
......@@ -779,7 +763,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
ASSERT(!pCowRSmaInfo);
}
if(pCowRSmaInfo) {
if (pCowRSmaInfo) {
tdRefRSmaInfo(pSma, pCowRSmaInfo);
}
// unlock
......@@ -1323,7 +1307,7 @@ _err:
}
/**
* @brief trigger to get rsma result
* @brief trigger to get rsma result in async mode
*
* @param param
* @param tmrId
......@@ -1357,8 +1341,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
" refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle,
&pItem->tmrId);
taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
}
return;
}
......@@ -1372,16 +1355,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
pItem->level, pRSmaInfo->suid);
// sync procedure => async process
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qTaskInfo_t taskInfo = pRSmaInfo->taskInfo[pItem->level - 1];
qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
tdRSmaFetchAndSubmitResult(taskInfo, pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat,
STREAM_INPUT__DATA_BLOCK);
tdCleanupStreamInputDataBlock(taskInfo);
// async process
tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level);
} break;
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
......@@ -1404,3 +1379,118 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
_end:
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
}
/**
* @brief put rsma fetch msg to fetch queue
*
* @param pSma
* @param pInfo
* @param level
* @return int32_t
*/
int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
SRSmaFetchMsg fetchMsg = { .suid = pInfo->suid, .level = level};
int32_t ret = 0;
int32_t contLen = 0;
SEncoder encoder = {0};
tEncodeSize(tEncodeSRSmaFetchMsg, &fetchMsg, contLen, ret);
if (ret < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tEncoderClear(&encoder);
goto _err;
}
void *pBuf = rpcMallocCont(contLen + sizeof(SMsgHead));
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), contLen);
if (tEncodeSRSmaFetchMsg(&encoder, &fetchMsg) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tEncoderClear(&encoder);
}
tEncoderClear(&encoder);
((SMsgHead *)pBuf)->vgId = SMA_VID(pSma);
((SMsgHead *)pBuf)->contLen = contLen + sizeof(SMsgHead);
SRpcMsg rpcMsg = {
.code = 0,
.msgType = TDMT_VND_FETCH_RSMA,
.pCont = pBuf,
.contLen = contLen,
};
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, FETCH_QUEUE, &rpcMsg)) != 0) {
smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s",
SMA_VID(pSma), pInfo->suid, level, terrstr());
goto _err;
}
smaDebug("vgId:%d, success to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma),
pInfo->suid, level);
return TSDB_CODE_SUCCESS;
_err:
return TSDB_CODE_FAILED;
}
/**
* @brief fetch rsma data of level 2/3 and submit
*
* @param pSma
* @param pMsg
* @return int32_t
*/
int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
SRSmaFetchMsg req = {0};
SDecoder decoder = {0};
void *pBuf = NULL;
SRSmaInfo *pInfo = NULL;
SRSmaInfoItem *pItem = NULL;
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
return -1;
}
pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead));
tDecoderInit(&decoder, pBuf, pRpcMsg->contLen);
if (tDecodeSRSmaFetchMsg(&decoder, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _err;
}
pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid);
if (!pInfo) {
if (terrno == TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_RSMA_EMPTY_INFO;
}
smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma),
req.suid, req.level, terrstr());
goto _err;
}
pItem = RSMA_INFO_ITEM(pInfo, req.level - 1);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, req.level - 1);
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err;
}
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, STREAM_INPUT__DATA_BLOCK) < 0) {
goto _err;
}
tdCleanupStreamInputDataBlock(taskInfo);
tdReleaseRSmaInfo(pSma, pInfo);
tDecoderClear(&decoder);
smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid,
req.level);
return TSDB_CODE_SUCCESS;
_err:
tdReleaseRSmaInfo(pSma, pInfo);
tDecoderClear(&decoder);
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
}
......@@ -325,6 +325,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetTableCfg(pVnode, pMsg, true);
case TDMT_VND_BATCH_META:
return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_FETCH_RSMA:
return smaProcessFetch(pVnode->pSma, pMsg);
case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN:
......
......@@ -614,6 +614,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state"
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
//index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
......@@ -29,8 +29,8 @@ sql insert into ct1 values(now, 10);
sql insert into ct1 values(now+1s, 1);
sql insert into ct1 values(now+2s, 100);
print =============== wait maxdelay 15+1 seconds for results
sleep 16000
print =============== wait maxdelay 15+2 seconds for results
sleep 17000
print =============== select * from retention level 2 from memory
sql select * from ct1;
......
......@@ -29,8 +29,8 @@ sql insert into ct1 values(now, 10, 10.0);
sql insert into ct1 values(now+1s, 1, 1.0);
sql insert into ct1 values(now+2s, 100, 100.0);
print =============== wait maxdelay 5+1 seconds for results
sleep 6000
print =============== wait maxdelay 5+2 seconds for results
sleep 7000
print =============== select * from retention level 2 from memory
sql select * from ct1;
......@@ -135,8 +135,8 @@ print =============== insert after rsma qtaskinfo recovery
sql insert into ct1 values(now, 50, 500.0);
sql insert into ct1 values(now+1s, 40, 40.0);
print =============== wait maxdelay 5+1 seconds for results
sleep 6000
print =============== wait maxdelay 5+2 seconds for results
sleep 7000
print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery
sql select * from ct1;
......
......@@ -187,7 +187,7 @@ class TDTestCase:
tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )')
def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, rsma=False, rsma_type="sum"):
tdLog.printNoPrefix("==========step: start inser data into tables now.....")
tdLog.printNoPrefix("==========step: start insert data into tables now.....")
# from ...pytest.util.common import DataSet
data = DataSet()
data.get_order_set(rows)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册