diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index dea90811ad83206b05e29b0e2468b8057ac8d7a8..5608cfd6d16826e399f9ea41dc026d2ab2459610 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -4,6 +4,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(inc) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 311bebc788bf2ef66343bd44cfdd878d38a45d91..e56bae0d7e84953af493176a585ba3e7060b751b 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -24,8 +24,10 @@ #include "twal.h" #include "tdataformat.h" #include "tglobal.h" +#include "tsync.h" #include "vnode.h" #include "dnodeInt.h" +#include "syncInt.h" #include "dnodeVWrite.h" #include "dnodeMgmt.h" @@ -239,6 +241,10 @@ static void *dnodeProcessWriteQueue(void *param) { pHead->len = pWrite->contLen; dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); + } else if (type == TAOS_QTYPE_CQ) { + pHead = (SWalHead *)((char*)item + sizeof(SSyncHead)); + dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], + pHead->version); } else { pHead = (SWalHead *)item; dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 09cfa632fbfaaa77b32c09b49f051c0714b33564..de0cdb028b5f9231f57c78fa3411714c93de85c7 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -4,6 +4,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 38f7c8e6056644591993027b410283b41fb8560c..169334c6119a35f53fd0a69dd1f7cb6952fbb727 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -62,6 +62,7 @@ typedef struct { } SVnodeObj; int vnodeWriteToQueue(void *param, void *pHead, int type); +int vnodeWriteCqMsgToQueue(void *param, void *pHead, int type); void vnodeInitWriteFp(void); void vnodeInitReadFp(void); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 0c1f9d103b6f355a77521a3f9c20372f24db29d0..a9bcf948b6cb00c25a0b283a0cb63126925abd8c 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -259,7 +259,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.db, pVnode->db); cqCfg.vgId = vnode; - cqCfg.cqWrite = vnodeWriteToQueue; + cqCfg.cqWrite = vnodeWriteCqMsgToQueue; pVnode->cq = cqOpen(pVnode, &cqCfg); if (pVnode->cq == NULL) { vnodeCleanUp(pVnode); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 70b08b9669d0a66f3f06596305d10091e7c1d36a..c4924f312f4bf16b6c536d58dba75d92d1f75c85 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -22,9 +22,11 @@ #include "tutil.h" #include "tsdb.h" #include "twal.h" +#include "tsync.h" #include "tdataformat.h" #include "vnode.h" #include "vnodeInt.h" +#include "syncInt.h" #include "tcq.h" static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); @@ -189,6 +191,25 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR return TSDB_CODE_SUCCESS; } + +int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { + SVnodeObj *pVnode = param; + SWalHead * pHead = data; + + int size = sizeof(SWalHead) + pHead->len; + SSyncHead *pSync = (SSyncHead*) taosAllocateQitem(size + sizeof(SSyncHead)); + SWalHead *pWal = (SWalHead *)(pSync + 1); + memcpy(pWal, pHead, size); + + atomic_add_fetch_32(&pVnode->refCount, 1); + vDebug("CQ: vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount); + + taosWriteQitem(pVnode->wqueue, type, pSync); + + return 0; +} + + int vnodeWriteToQueue(void *param, void *data, int type) { SVnodeObj *pVnode = param; SWalHead * pHead = data;