diff --git a/include/util/tencode.h b/include/util/tencode.h index af38d694e2e0b505c69aa9e44e41e4c78718622d..cbacd59fa7873c4cb05b8fdaefb321ae3f854e5b 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -82,7 +82,7 @@ typedef struct { do { \ SEncoder coder = {0}; \ tEncoderInit(&coder, NULL, 0); \ - if ((E)(&coder, S) == 0) { \ + if ((E)(&coder, S) >= 0) { \ SIZE = coder.pos; \ RET = 0; \ } else { \ diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index f89df4a96f17da92c1c60fa54b7474efb7b54a5d..40b490da47b08744d3d9e3c1457ebcd63db26411 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -20,9 +20,9 @@ #include "executor.h" #include "os.h" -#include "tcache.h" #include "thash.h" #include "tmsg.h" +#include "tqueue.h" #include "trpc.h" #include "ttimer.h" #include "wal.h" @@ -86,6 +86,9 @@ typedef struct { qTaskInfo_t task[5]; } STqExec; +int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec); +int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec); + struct STQ { char* path; SHashObj* pushMgr; // consumerId -> STqExec* @@ -93,7 +96,7 @@ struct STQ { SHashObj* pStreamTasks; SVnode* pVnode; SWal* pWal; - // TDB* pTdb; + TDB* pTdb; }; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6ca523b58025a27596d88315262d9228a018f9eb..f7b4ba93a6bddf898778c01bb62b1c075a75c875 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,14 +14,25 @@ */ #include "tq.h" -#include "tqueue.h" int32_t tqInit() { - // + int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); + if (old == 0) { + tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ"); + if (tqMgmt.timer == NULL) { + atomic_store_8(&tqMgmt.inited, 0); + return -1; + } + } return 0; } -void tqCleanUp() {} +void tqCleanUp() { + int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2); + if (old != 1) return; + taosTmrCleanUp(tqMgmt.timer); + atomic_store_8(&tqMgmt.inited, 0); +} STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); @@ -32,9 +43,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->path = strdup(path); pTq->pVnode = pVnode; pTq->pWal = pWal; - /*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/ - /*ASSERT(0);*/ - /*}*/ + if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) { + ASSERT(0); + } pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -51,11 +62,45 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->execs); taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pushMgr); + tdbClose(pTq->pTdb); taosMemoryFree(pTq); } // TODO } +int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1; + if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1; + if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1; + if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1; + if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1; + if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1; + if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1; + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1; + // TODO encode modified exec + } + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1; + if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1; + if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1; + if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1; + if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1; + if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1; + if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1; + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1; + // TODO decode modified exec + } + tEndDecode(pDecoder); + return 0; +} int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { void* pIter = NULL; while (1) { diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index ada1f599f231a9a9e2092fbc68637d13e33aa8ff..f2f423ddf53816a7c94cb396f993a8b5337c4f73 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -14,17 +14,17 @@ */ #define _DEFAULT_SOURCE -#include "tcompare.h" #include "os.h" #include "taoserror.h" +#include "tcompare.h" #include "tref.h" #include "walInt.h" typedef struct { - int8_t stop; - int8_t inited; - uint32_t seq; - int32_t refSetId; + int8_t stop; + int8_t inited; + uint32_t seq; + int32_t refSetId; TdThread thread; } SWalMgmt; @@ -53,13 +53,14 @@ int32_t walInit() { } void walCleanUp() { - int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0); - if (old == 0) { + int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 2); + if (old != 1) { return; } walStopThread(); taosCloseRef(tsWal.refSetId); wInfo("wal module is cleaned up"); + atomic_store_8(&tsWal.inited, 0); } SWal *walOpen(const char *path, SWalCfg *pCfg) { @@ -126,7 +127,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } if (walCheckAndRepairIdx(pWal) < 0) { - } wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,