diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 1a99a84b8ebfe8ed503213299646da39b5b1d27a..9efa517ac33b2d241dbe605c2386378203099a75 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -334,7 +334,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { pHead->version = 0; // write into vnode write queue - pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); + pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL); free(buffer); } diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index 1daee644a7effc5fa50fbc340ad7df8e2b5d70b0..e1114fc024054f7a4af32ca1497a63f7da942147 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -24,7 +24,7 @@ int64_t ver = 0; void *pCq = NULL; -int writeToQueue(void *pVnode, void *data, int type) { +int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { return 0; } diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 3a820f180c73b856a4693d668f91d675deb5b75b..aaf1098d40ad04a1e3e31ca3e0d23b249f6f9b6a 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -31,14 +31,6 @@ typedef struct { pthread_t thread; // thread } SWriteWorker; -typedef struct { - SRspRet rspRet; - SRpcMsg rpcMsg; - int32_t processedCount; - int32_t code; - int32_t contLen; - void * pCont; -} SWriteMsg; typedef struct { int32_t max; // max number of workers @@ -86,39 +78,38 @@ void dnodeCleanupVWrite() { dInfo("dnode vwrite is closed"); } -void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) { - char *pCont = pMsg->pCont; +void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { + int32_t code; + char *pCont = pRpcMsg->pCont; - if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { + if (pRpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { SMsgDesc *pDesc = (SMsgDesc *)pCont; pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); pCont += sizeof(SMsgDesc); } - SMsgHead *pHead = (SMsgHead *) pCont; - pHead->vgId = htonl(pHead->vgId); - pHead->contLen = htonl(pHead->contLen); + SMsgHead *pMsg = (SMsgHead *)pCont; + pMsg->vgId = htonl(pMsg->vgId); + pMsg->contLen = htonl(pMsg->contLen); - taos_queue queue = vnodeAcquireWqueue(pHead->vgId); - if (queue) { - // put message into queue - SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg)); - pWrite->rpcMsg = *pMsg; - pWrite->pCont = pCont; - pWrite->contLen = pHead->contLen; - - taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite); + void *pVnode = vnodeAcquire(pMsg->vgId); + if (pVnode == NULL) { + code = TSDB_CODE_VND_INVALID_VGROUP_ID; } else { - SRpcMsg rpcRsp = { - .handle = pMsg->handle, - .pCont = NULL, - .contLen = 0, - .code = TSDB_CODE_VND_INVALID_VGROUP_ID, - .msgType = 0 - }; + SWalHead *pHead = (SWalHead *)(pCont - sizeof(SWalHead)); + pHead->msgType = pRpcMsg->msgType; + pHead->version = 0; + pHead->len = pMsg->contLen; + code = vnodeWriteToQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg); + } + + if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = code}; rpcSendResponse(&rpcRsp); - rpcFreeCont(pMsg->pCont); } + + vnodeRelease(pVnode); + rpcFreeCont(pRpcMsg->pCont); } void *dnodeAllocVWriteQueue(void *pVnode) { @@ -179,7 +170,7 @@ void dnodeFreeVWriteQueue(void *wqueue) { void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { if (param == NULL) return; - SWriteMsg *pWrite = param; + SVWriteMsg *pWrite = param; if (code < 0) pWrite->code = code; int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); @@ -187,26 +178,22 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { if (count <= 1) return; SRpcMsg rpcRsp = { - .handle = pWrite->rpcMsg.handle, + .handle = pWrite->rpcHandle, .pCont = pWrite->rspRet.rsp, .contLen = pWrite->rspRet.len, .code = pWrite->code, }; rpcSendResponse(&rpcRsp); - rpcFreeCont(pWrite->rpcMsg.pCont); taosFreeQitem(pWrite); vnodeRelease(pVnode); } static void *dnodeProcessWriteQueue(void *param) { - SWriteWorker *pWorker = (SWriteWorker *)param; - SWriteMsg * pWrite; - SWalHead * pHead; - SRspRet * pRspRet; + SWriteWorker *pWorker = param; + SVWriteMsg * pWrite; void * pVnode; - void * pItem; int32_t numOfMsgs; int32_t qtype; @@ -220,36 +207,14 @@ static void *dnodeProcessWriteQueue(void *param) { } for (int32_t i = 0; i < numOfMsgs; ++i) { - pWrite = NULL; - pRspRet = NULL; - taosGetQitem(pWorker->qall, &qtype, &pItem); - if (qtype == TAOS_QTYPE_RPC) { - pWrite = pItem; - pRspRet = &pWrite->rspRet; - pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead)); - pHead->msgType = pWrite->rpcMsg.msgType; - pHead->version = 0; - pHead->len = pWrite->contLen; - dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, - taosMsg[pWrite->rpcMsg.msgType]); - } else if (qtype == TAOS_QTYPE_CQ) { - pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead)); - dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], - pHead->version); - } else { - pHead = pItem; - dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], - pHead->version); - } + taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); + dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite, + taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version); - int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet); - dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType], - pHead->version, tstrerror(code)); + pWrite->code = vnodeProcessWrite(pVnode, qtype, pWrite); + if (pWrite->code <= 0) pWrite->processedCount = 1; - if (pWrite) { - pWrite->rpcMsg.code = code; - if (code <= 0) pWrite->processedCount = 1; - } + dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code)); } walFsync(vnodeGetWal(pVnode)); @@ -257,17 +222,15 @@ static void *dnodeProcessWriteQueue(void *param) { // browse all items, and process them one by one taosResetQitems(pWorker->qall); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(pWorker->qall, &qtype, &pItem); + taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); if (qtype == TAOS_QTYPE_RPC) { - pWrite = pItem; - dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code); + dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); } else if (qtype == TAOS_QTYPE_FWD) { - pHead = pItem; - vnodeConfirmForward(pVnode, pHead->version, 0); - taosFreeQitem(pItem); + vnodeConfirmForward(pVnode, pWrite->pHead->version, 0); + taosFreeQitem(pWrite); vnodeRelease(pVnode); } else { - taosFreeQitem(pItem); + taosFreeQitem(pWrite); vnodeRelease(pVnode); } } diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 32b75674c3278b3273fd4b98dd645f4168543155..4a23695a1a851d861d23da6142c6bd909fa73d7c 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -21,7 +21,7 @@ extern "C" { #include "tdataformat.h" -typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); +typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef struct { int vgId; diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 671adefab83fed42f7bc3fb0ad9591050f118e73..ca11e0ae0bbfe56c2aad7fb91e280ba056f4209a 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -70,8 +70,8 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId); -// when a forward pkt is received, call this to handle data -typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type); +// when a forward pkt is received, call this to handle data +typedef int32_t (*FWriteToCache)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); // when forward is confirmed by peer, master call this API to notify app typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); diff --git a/src/inc/twal.h b/src/inc/twal.h index 3a229ed8350624e1c69772abbe9b2f30ac232b80..d9b5fb26ffd388ae1cd0540ae7286b126391cd37 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -43,8 +43,8 @@ typedef struct { int8_t keep; // keep the wal file when closed } SWalCfg; -typedef void* twalh; // WAL HANDLE -typedef int (*FWalWrite)(void *ahandle, void *pHead, int type); +typedef void * twalh; // WAL HANDLE +typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg); int32_t walInit(); void walCleanUp(); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 8c387065ccd8c8502df35ef2f576ad738049a421..670f4f599c17d120a641a985a04b126721e27468 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "twal.h" + typedef enum _VN_STATUS { TAOS_VN_STATUS_INIT, TAOS_VN_STATUS_READY, @@ -29,9 +31,9 @@ typedef enum _VN_STATUS { } EVnStatus; typedef struct { - int len; - void *rsp; - void *qhandle; //used by query and retrieve msg + int32_t len; + void * rsp; + void * qhandle; // used by query and retrieve msg } SRspRet; typedef struct { @@ -41,6 +43,16 @@ typedef struct { SRpcMsg rpcMsg; } SReadMsg; +typedef struct { + int32_t code; + int32_t processedCount; + void * rpcHandle; + void * rpcAhandle; + SRspRet rspRet; + char reserveForSync[16]; + SWalHead pHead[]; +} SVWriteMsg; + extern char *vnodeStatus[]; int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg); @@ -55,7 +67,9 @@ void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); -int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); + +int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg); +int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite); int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); void vnodeBuildStatusMsg(void *param); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 6f5e3be8ab4965331f23d2e2b7307823981205a5..a504ded657e23210d0dc066158d8801f07a8567e 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -863,7 +863,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { // nodeVersion = pHead->version; - (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); + (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL); } else { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { syncSaveIntoBuffer(pPeer, pHead); diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 19a5d3ba41b8fb0752f5849b8504059accdae485..9216567bc5e798f61da380c30c0409fe585353bc 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -154,7 +154,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) { if (ret < 0) break; sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version); - (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL); + (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL); } if (code < 0) { @@ -169,7 +169,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) { SSyncNode *pNode = pPeer->pSyncNode; SWalHead * pHead = (SWalHead *)offset; - (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); + (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL); offset += pHead->len + sizeof(SWalHead); return offset; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 169334c6119a35f53fd0a69dd1f7cb6952fbb727..935da5436778c3aea2beb71acd75e11fe4965dcb 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -61,8 +61,6 @@ typedef struct { char db[TSDB_DB_NAME_LEN]; } SVnodeObj; -int vnodeWriteToQueue(void *param, void *pHead, int type); -int vnodeWriteCqMsgToQueue(void *param, void *pHead, int type); void vnodeInitWriteFp(void); void vnodeInitReadFp(void); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index e2069331168b57af64155b968453a3e4eb120056..7cbbf0feb86f2e99f058b4d5401109f0d30443d0 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.db, pVnode->db); cqCfg.vgId = vnode; - cqCfg.cqWrite = vnodeWriteCqMsgToQueue; + cqCfg.cqWrite = vnodeWriteToQueue; pVnode->cq = cqOpen(pVnode, &cqCfg); if (pVnode->cq == NULL) { vnodeCleanUp(pVnode); @@ -365,6 +365,7 @@ int32_t vnodeClose(int32_t vgId) { } void vnodeRelease(void *pVnodeRaw) { + if (pVnodeRaw == NULL) return; SVnodeObj *pVnode = pVnodeRaw; int32_t vgId = pVnode->vgId; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index c2771c73c695244ff78fe3293dc7a91f940fed6f..4432f98db00b733ac5af079d2fed874c224bc40c 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -46,10 +46,10 @@ void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; } -int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { +int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { int32_t code = 0; - SVnodeObj *pVnode = (SVnodeObj *)param1; - SWalHead * pHead = param2; + SVnodeObj *pVnode = param; + SWalHead * pHead = pWrite->pHead; if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]); @@ -80,7 +80,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { // forward to peers, even it is WAL/FWD, it shall be called to update version in sync int32_t syncCode = 0; - syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); + syncCode = syncForwardToPeer(pVnode->sync, pHead, &pWrite->rspRet, qtype); if (syncCode < 0) return syncCode; // write into WAL @@ -90,7 +90,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { pVnode->version = pHead->version; // write data locally - code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); + code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, &pWrite->rspRet); if (code < 0) return code; return syncCode; @@ -204,35 +204,31 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR return TSDB_CODE_SUCCESS; } -int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { - SVnodeObj *pVnode = param; - SWalHead * pHead = data; - - int size = sizeof(SWalHead) + pHead->len; - SSyncHead *pSync = (SSyncHead*) taosAllocateQitem(size + sizeof(SSyncHead)); - SWalHead *pWal = (SWalHead *)(pSync + 1); - memcpy(pWal, pHead, size); +int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) { + SVnodeObj *pVnode = vparam; + SWalHead * pHead = wparam; - atomic_add_fetch_32(&pVnode->refCount, 1); - vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); - - taosWriteQitem(pVnode->wqueue, type, pSync); + if (qtype == TAOS_QTYPE_RPC && vnodeCheckWrite(pVnode) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_VND_INVALID_VGROUP_ID; + } - return 0; -} + int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len; + SVWriteMsg *pWrite = taosAllocateQitem(size); + if (pWrite == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } -int vnodeWriteToQueue(void *param, void *data, int type) { - SVnodeObj *pVnode = param; - SWalHead * pHead = data; + if (pMsg != NULL) { + SRpcMsg *pRpcMsg = pMsg; + pWrite->rpcHandle = pRpcMsg->handle; + pWrite->rpcAhandle = pRpcMsg->ahandle; + } - int size = sizeof(SWalHead) + pHead->len; - SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); - memcpy(pWal, pHead, size); + memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len); atomic_add_fetch_32(&pVnode->refCount, 1); vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); - taosWriteQitem(pVnode->wqueue, type, pWal); - - return 0; + taosWriteQitem(pVnode->wqueue, qtype, pWrite); + return TSDB_CODE_SUCCESS; } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index e57cb0e042f321ed4d4d3c463a84e6917d4e0fc7..d85f740597ad2a5c38bbe8564c6835136d0c3d66 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -122,7 +122,7 @@ void walFsync(void *handle) { } } -int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) { +int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { if (handle == NULL) return -1; SWal * pWal = handle; @@ -307,7 +307,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch if (pWal->keep) pWal->version = pHead->version; - (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); + (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); } tclose(fd); diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c index 186f2ef5ffe9fed68f9c9205b251cbb3e10dfeab..14e439c0727508c603e5e0ed0a5b2eba39ab631d 100644 --- a/src/wal/test/waltest.c +++ b/src/wal/test/waltest.c @@ -23,7 +23,7 @@ int64_t ver = 0; void *pWal = NULL; -int writeToQueue(void *pVnode, void *data, int type) { +int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { // do nothing SWalHead *pHead = data;