From a9c54ae70155a6fc3ac5e7b1bc9ee7a1e4c63b45 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 25 May 2022 15:54:49 +0800 Subject: [PATCH] enh(tmq): support restart --- source/dnode/vnode/src/tq/tq.c | 123 ++++++++++++++++++++------------- 1 file changed, 75 insertions(+), 48 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 979db9169e..0e8835357a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -51,6 +51,47 @@ int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_ return strcmp(pKey1, pKey2); } +int32_t tqStoreExec(STQ* pTq, const char* key, const STqExec* pExec) { + int32_t code; + int32_t vlen; + tEncodeSize(tEncodeSTqExec, pExec, vlen, code); + ASSERT(code == 0); + + void* buf = taosMemoryCalloc(1, vlen); + if (buf == NULL) { + ASSERT(0); + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, vlen); + + if (tEncodeSTqExec(&encoder, pExec) < 0) { + ASSERT(0); + } + + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) { + ASSERT(0); + } + + if (tdbCommit(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + + tEncoderClear(&encoder); + taosMemoryFree(buf); + return 0; +} + STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); if (pTq == NULL) { @@ -96,8 +137,31 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { int vLen; tdbTbcMoveToFirst(pCur); + SDecoder decoder; while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - // create, put into execsj + STqExec exec; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSTqExec(&decoder, &exec); + exec.pWalReader = walOpenReadHandle(pTq->pVnode->pWal); + if (exec.subType == TOPIC_SUB_TYPE__TABLE) { + for (int32_t i = 0; i < 5; i++) { + exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + + SReadHandle handle = { + .reader = exec.pExecReader[i], + .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, + }; + exec.task[i] = qCreateStreamExecTaskInfo(exec.qmsg, &handle); + ASSERT(exec.task[i]); + } + } else { + for (int32_t i = 0; i < 5; i++) { + exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + } + exec.pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + } + taosHashPut(pTq->execs, pKey, kLen, &exec, sizeof(STqExec)); } if (tdbTxnClose(&txn) < 0) { @@ -604,7 +668,9 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { ASSERT(0); } - tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn); + if (tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn) < 0) { + /*ASSERT(0);*/ + } if (tdbCommit(pTq->pMetaStore, &txn) < 0) { ASSERT(0); @@ -659,60 +725,21 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { } taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); - int32_t code; - int32_t vlen; - tEncodeSize(tEncodeSTqExec, pExec, vlen, code); - ASSERT(code == 0); - - void* buf = taosMemoryCalloc(1, vlen); - if (buf == NULL) { - ASSERT(0); - } - - SEncoder encoder; - tEncoderInit(&encoder, buf, vlen); - - if (tEncodeSTqExec(&encoder, pExec) < 0) { - ASSERT(0); + if (tqStoreExec(pTq, req.subKey, pExec) < 0) { + // TODO } - - TXN txn; - - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); - } - - if (tdbBegin(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - if (tdbTbUpsert(pTq->pExecStore, req.subKey, (int)strlen(req.subKey), buf, vlen, &txn) < 0) { - ASSERT(0); - } - - if (tdbCommit(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - tEncoderClear(&encoder); - taosMemoryFree(buf); - return 0; } else { - /*if (req.newConsumerId != -1) {*/ - /*taosWLockLatch(&pExec->lock);*/ - ASSERT(pExec->consumerId == req.oldConsumerId); + /*ASSERT(pExec->consumerId == req.oldConsumerId);*/ // TODO handle qmsg and exec modification atomic_store_32(&pExec->epoch, -1); atomic_store_64(&pExec->consumerId, req.newConsumerId); atomic_add_fetch_32(&pExec->epoch, 1); - /*taosWUnLockLatch(&pExec->lock);*/ + + if (tqStoreExec(pTq, req.subKey, pExec) < 0) { + // TODO + } return 0; - /*} else {*/ - // TODO - /*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/ - /*return 0;*/ - /*}*/ } } -- GitLab