未验证 提交 7c918b96 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16266 from taosdata/3.0

release: merge from 3.0 to main
......@@ -2667,31 +2667,6 @@ typedef struct {
int32_t padding;
} SRSmaExecMsg;
typedef struct {
int64_t suid;
int8_t level;
} SRSmaFetchMsg;
static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->level) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->level) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
......
......@@ -201,7 +201,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL)
......
......@@ -76,6 +76,7 @@ void taosFreeQall(STaosQall *qall);
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall);
int32_t taosGetQitem(STaosQall *qall, void **ppItem);
void taosResetQitems(STaosQall *qall);
int32_t taosQallItemSize(STaosQall *qall);
STaosQset *taosOpenQset();
void taosCloseQset(STaosQset *qset);
......
......@@ -442,6 +442,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
if (req.rollup) {
req.rsmaParam.maxdelay[0] = pStb->maxdelay[0];
req.rsmaParam.maxdelay[1] = pStb->maxdelay[1];
req.rsmaParam.watermark[0] = pStb->watermark[0];
req.rsmaParam.watermark[1] = pStb->watermark[1];
if (pStb->ast1Len > 0) {
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) {
......
......@@ -32,7 +32,8 @@ extern "C" {
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define RSMA_TASK_INFO_HASH_SLOT 8
#define RSMA_TASK_INFO_HASH_SLOT (8)
#define RSMA_EXECUTOR_MAX (1)
typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaStat SSmaStat;
......@@ -90,14 +91,14 @@ struct SRSmaStat {
SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit
int64_t refId; // shared by fetch tasks
volatile int64_t qBufSize; // queue buffer size
volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int8_t nExecutor; // [1, max(half of query threads, 4)]
int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
int8_t execStat; // 0 not in exec , 1 in exec
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
SHashObj *infoHash; // key: suid, value: SRSmaInfo
SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2
tsem_t notEmpty; // has items in queue buffer
};
struct SSmaStat {
......@@ -111,26 +112,28 @@ struct SSmaStat {
#define SMA_STAT_TSMA(s) (&(s)->tsmaStat)
#define SMA_STAT_RSMA(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->infoHash)
#define RSMA_FETCH_HASH(r) ((r)->fetchHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem {
int8_t level;
int8_t level : 4;
int8_t fetchLevel : 4;
int8_t triggerStat;
uint16_t interval; // second
int32_t maxDelay;
uint16_t nSkipped;
int32_t maxDelay; // ms
tmr_h tmrId;
};
struct SRSmaInfo {
STSchema *pTSchema;
int64_t suid;
int64_t refId; // refId of SRSmaStat
uint64_t delFlag : 1;
uint64_t lastReceived : 63; // second
int64_t refId; // refId of SRSmaStat
int64_t lastRecv; // ms
int8_t assigned; // 0 idle, 1 assgined for exec
int8_t delFlag;
int16_t padding;
T_REF_DECLARE()
SRSmaInfoItem items[TSDB_RETENTION_L2];
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
......
......@@ -189,6 +189,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
int32_t smaInit();
void smaCleanUp();
int32_t smaOpen(SVnode* pVnode);
int32_t smaPreClose(SSma* pSma);
int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma);
int32_t smaSyncPreCommit(SSma* pSma);
......@@ -198,7 +199,6 @@ int32_t smaAsyncPreCommit(SSma* pSma);
int32_t smaAsyncCommit(SSma* pSma);
int32_t smaAsyncPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now);
int32_t smaProcessFetch(SSma* pSma, void* pMsg);
int32_t smaProcessExec(SSma* pSma, void* pMsg);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
......@@ -323,6 +323,7 @@ struct SVnode {
TdThreadMutex lock;
bool blocked;
bool restored;
bool inClose;
tsem_t syncSem;
SQHandle* pQuery;
};
......
......@@ -109,7 +109,7 @@ int32_t smaBegin(SSma *pSma) {
/**
* @brief pre-commit for rollup sma(sync commit).
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
* 2) wait all triggered fetch tasks finished
* 2) wait for all triggered fetch tasks to finish
* 3) perform persist task for qTaskInfo
*
* @param pSma
......@@ -127,14 +127,14 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
// step 1: set rsma stat paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
// step 2: wait all triggered fetch tasks finished
// step 2: wait for all triggered fetch tasks to finish
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
break;
} else {
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
......@@ -316,15 +316,17 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
// step 1: set rsma stat
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1);
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
ASSERT(pRSmaStat->commitAppliedVer > 0);
// step 2: wait all triggered fetch tasks finished
// step 2: wait for all triggered fetch tasks to finish
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma));
break;
} else {
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
smaDebug("vgId:%d, rsma commit, fetch tasks are not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
......@@ -338,30 +340,29 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
* 1) This is high cost task and should not put in asyncPreCommit originally.
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
*/
nLoops = 0;
smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) {
return TSDB_CODE_FAILED;
}
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1);
if (old == 0) break;
if (++nLoops > 1000) {
smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId());
nLoops = 0;
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
}
}
smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) {
atomic_store_8(&pRSmaStat->execStat, 0);
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
// taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat));
......@@ -376,13 +377,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
}
atomic_store_64(&pRSmaStat->qBufSize, 0);
atomic_store_8(&pRSmaStat->execStat, 0);
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 5: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif
return TSDB_CODE_SUCCESS;
}
......@@ -398,13 +395,14 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
if (!pSmaEnv) {
return TSDB_CODE_SUCCESS;
}
#if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
// perform persist task for qTaskInfo operator
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
#endif
return TSDB_CODE_SUCCESS;
}
......@@ -426,10 +424,10 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
// step 1: merge qTaskInfo and iQTaskInfo
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
// taosWLockLatch(SMA_ENV_LOCK(pEnv));
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
void *pIter = NULL;
while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
......@@ -447,14 +445,13 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SMA_VID(pSma), refVal, *pSuid);
}
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
continue;
}
#if 0
if (pRSmaInfo->taskInfo[0]) {
if (pRSmaInfo->iTaskInfo[0]) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
pRSmaInfo->iTaskInfo[0] = NULL;
}
} else {
......@@ -463,8 +460,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
#endif
}
for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
......@@ -480,10 +476,9 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
}
taosArrayDestroy(rsmaDeleted);
// TODO: remove suid in files?
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 2: cleanup outdated qtaskinfo files
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
......
......@@ -209,6 +209,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
pRSmaStat->pSma = (SSma *)pSma;
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
tsem_init(&pRSmaStat->notEmpty, 0, 0);
// init smaMgmt
smaInit();
......@@ -230,12 +231,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if (!RSMA_INFO_HASH(pRSmaStat)) {
return TSDB_CODE_FAILED;
}
RSMA_FETCH_HASH(pRSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_FETCH_HASH(pRSmaStat)) {
return TSDB_CODE_FAILED;
}
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO
} else {
......@@ -267,6 +262,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
// step 1: set rsma trigger stat cancelled
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
tsem_destroy(&(pStat->notEmpty));
// step 2: destroy the rsma info and associated fetch tasks
if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
......@@ -279,17 +275,14 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
}
taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 3: destroy the rsma fetch hash
taosHashCleanup(RSMA_FETCH_HASH(pStat));
// step 4: wait all triggered fetch tasks finished
// step 3: wait for all triggered fetch tasks to finish
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
break;
} else {
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
......
......@@ -146,6 +146,20 @@ int32_t smaClose(SSma *pSma) {
return 0;
}
int32_t smaPreClose(SSma *pSma) {
if (pSma && VND_IS_RSMA(pSma->pVnode)) {
SSmaEnv *pEnv = NULL;
SRSmaStat *pStat = NULL;
if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv))) {
return 0;
}
for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) {
tsem_post(&(pStat->notEmpty));
}
}
return 0;
}
/**
* @brief rsma env restore
*
......
......@@ -375,6 +375,9 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (!pInfo->iTaskInfo[i]) {
continue;
}
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
goto _err;
}
......
......@@ -94,7 +94,7 @@ typedef struct SLastBlockReader {
SVersionRange verRange;
int32_t order;
uint64_t uid;
int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
} SLastBlockReader;
typedef struct SFilesetIter {
......@@ -339,6 +339,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
pLReader->order = pReader->order;
pLReader->window = pReader->window;
pLReader->verRange = pReader->verRange;
pLReader->currentBlockIndex = -1;
int32_t code = tBlockDataCreate(&pLReader->lastBlockData);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2326,15 +2327,17 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* p
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
SReaderStatus* pStatus = &pReader->status;
pBlockNum->numOfBlocks = 0;
pBlockNum->numOfLastBlocks = 0;
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
SArray* pLastBlocks = pStatus->fileIter.pLastBlockReader->pBlockL;
taosArrayClear(pLastBlocks);
while (1) {
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
if (!hasNext) { // no data files on disk
taosArrayClear(pLastBlocks);
break;
}
......@@ -2380,29 +2383,38 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
return TSDB_CODE_SUCCESS;
}
// todo add elapsed time results
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader) {
SArray* pBlocks = pLastBlockReader->pBlockL;
SBlockL* pBlock = NULL;
uint64_t uid = pBlockScanInfo->uid;
int32_t totalLastBlocks = (int32_t)taosArrayGetSize(pBlocks);
initMemDataIterator(pBlockScanInfo, pReader);
pLastBlockReader->currentBlockIndex = -1;
// find the correct SBlockL
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
// find the correct SBlockL. todo binary search
int32_t index = -1;
for (int32_t i = 0; i < totalLastBlocks; ++i) {
SBlockL* p = taosArrayGet(pBlocks, i);
if (p->minUid <= uid && p->maxUid >= uid) {
pLastBlockReader->currentBlockIndex = i;
index = i;
pBlock = p;
break;
}
}
if (pLastBlockReader->currentBlockIndex == -1) {
if (index == -1) {
pLastBlockReader->currentBlockIndex = index;
tBlockDataReset(&pLastBlockReader->lastBlockData);
return TSDB_CODE_SUCCESS;
}
// the required last datablock has already loaded
if (index == pLastBlockReader->currentBlockIndex) {
return TSDB_CODE_SUCCESS;
}
int32_t code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr);
......@@ -2411,13 +2423,16 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbError(
"%p error occurs in loading last block into buffer, last block index:%d, total:%d rows:%d, minVer:%" PRId64
", maxVer:%" PRId64 ", code:%s %s",
pReader, pLastBlockReader->currentBlockIndex, (int32_t)taosArrayGetSize(pBlocks), pBlock->nRow, pBlock->minVer,
pBlock->maxVer, tstrerror(code), pReader->idStr);
tsdbError("%p error occurs in loading last block into buffer, last block index:%d, total:%d code:%s %s", pReader,
pLastBlockReader->currentBlockIndex, totalLastBlocks, tstrerror(code), pReader->idStr);
} else {
tsdbDebug("%p load last block completed, uid:%" PRIu64
" last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 " - %" PRId64 " %s",
pReader, uid, pLastBlockReader->currentBlockIndex, totalLastBlocks, pBlock->nRow, pBlock->minVer,
pBlock->maxVer, pBlock->minKey, pBlock->maxKey, pReader->idStr);
}
pLastBlockReader->currentBlockIndex = index;
return TSDB_CODE_SUCCESS;
}
......@@ -2618,6 +2633,9 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
}
SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader;
pLReader->currentBlockIndex = -1;
// set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter);
return code;
......
......@@ -220,13 +220,6 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
smaAsyncPreCommit(pVnode->pSma);
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
pVnode->state.commitTerm = pVnode->state.applyTerm;
// save info
......@@ -241,6 +234,16 @@ int vnodeCommit(SVnode *pVnode) {
}
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
if(smaAsyncPreCommit(pVnode->pSma) < 0){
ASSERT(0);
return -1;
}
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
// commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) {
ASSERT(0);
......@@ -248,7 +251,10 @@ int vnodeCommit(SVnode *pVnode) {
}
if (VND_IS_RSMA(pVnode)) {
smaAsyncCommit(pVnode->pSma);
if (smaAsyncCommit(pVnode->pSma) < 0) {
ASSERT(0);
return -1;
}
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
ASSERT(0);
......@@ -285,7 +291,10 @@ int vnodeCommit(SVnode *pVnode) {
// postCommit
// smaSyncPostCommit(pVnode->pSma);
smaAsyncPostCommit(pVnode->pSma);
if (smaAsyncPostCommit(pVnode->pSma) < 0) {
ASSERT(0);
return -1;
}
// apply the commit (TODO)
walEndSnapshot(pVnode->pWal);
......
......@@ -87,6 +87,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->msgCb = msgCb;
taosThreadMutexInit(&pVnode->lock, NULL);
pVnode->blocked = false;
pVnode->inClose = false;
tsem_init(&pVnode->syncSem, 0, 0);
tsem_init(&(pVnode->canCommit), 0, 1);
......@@ -181,6 +182,8 @@ _err:
void vnodePreClose(SVnode *pVnode) {
if (pVnode) {
syncLeaderTransfer(pVnode->sync);
pVnode->inClose = true;
smaPreClose(pVnode->pSma);
}
}
......
......@@ -301,8 +301,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSMA:
return smaProcessFetch(pVnode->pSma, pMsg);
case TDMT_VND_EXEC_RSMA:
return smaProcessExec(pVnode->pSma, pMsg);
default:
......
......@@ -3137,6 +3137,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
initResultRow(resultRow);
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
// releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId));
}
if (offset != length) {
......
......@@ -298,7 +298,8 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
return num;
}
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
STaosQset *taosOpenQset() {
STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
......
......@@ -35,6 +35,7 @@ sleep 7000
print =============== select * from retention level 2 from memory
sql select * from ct1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
......@@ -51,6 +52,7 @@ endi
print =============== select * from retention level 1 from memory
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
......@@ -89,6 +91,7 @@ system sh/exec.sh -n dnode1 -s start
print =============== select * from retention level 2 from file
sql select * from ct1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
......@@ -104,6 +107,7 @@ endi
print =============== select * from retention level 1 from file
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
......@@ -141,6 +145,7 @@ sleep 7000
print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery
sql select * from ct1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 2 file/mem rows $rows > 2
return -1
......@@ -163,6 +168,7 @@ endi
print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows > 2 then
print retention level 1 file/mem rows $rows > 2
return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册