diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 84b2c297d0e0cd3e35c2b62564300e0a9bd9c5b6..b1977fd5d95905d2a2d94b53e7555402ca5a01ab 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -40,15 +40,14 @@ typedef struct { int32_t vgId; + int32_t master; + int32_t num; // number of continuous streams char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_NAME_LEN]; FCqWrite cqWrite; - void *ahandle; - int32_t num; // number of continuous streams struct SCqObj *pHead; void *dbConn; - int32_t master; void *tmrCtrl; pthread_mutex_t mutex; } SCqContext; @@ -90,7 +89,6 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { tstrncpy(pContext->db, db, sizeof(pContext->db)); pContext->vgId = pCfg->vgId; pContext->cqWrite = pCfg->cqWrite; - pContext->ahandle = ahandle; tscEmbedded = 1; pthread_mutex_init(&pContext->mutex, NULL); @@ -342,7 +340,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { pHead->version = 0; // write into vnode write queue - pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL); + pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL); free(buffer); } diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index e1114fc024054f7a4af32ca1497a63f7da942147..41380f0d86ed31b2466596993b8ce1a1d435f8cf 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -24,7 +24,7 @@ int64_t ver = 0; void *pCq = NULL; -int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { +int writeToQueue(int32_t vgId, void *data, int type, void *pMsg) { return 0; } diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 7a0727f1b8dd7cdea1a815c174b9f85004eedf87..afa744a9c4856521075fb9427d0ed1bb412aa7d5 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -21,7 +21,7 @@ extern "C" { #include "tdataformat.h" -typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); +typedef int32_t (*FCqWrite)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg); typedef struct { int32_t vgId; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 3d3abde3c7dcb2cd503086aeb4cadf5e720572f4..0b29fcc9088195384d42318e0e71e69535571976 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -272,7 +272,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.db, pVnode->db); cqCfg.vgId = vnode; - cqCfg.cqWrite = vnodeWriteToWQueue; + cqCfg.cqWrite = vnodeWriteToCache; pVnode->cq = cqOpen(pVnode, &cqCfg); if (pVnode->cq == NULL) { vnodeCleanUp(pVnode);