From a63559cfef3d196fcd70885efe9517c12cb2a14b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Apr 2022 09:11:40 +0000 Subject: [PATCH] refactor: sync --- include/libs/transport/trpc.h | 29 +++++++++++---------- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 8 +++--- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 3e2f596784..614f358ac2 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -38,11 +38,11 @@ typedef struct SRpcConnInfo { typedef struct SRpcMsg { tmsg_t msgType; - void * pCont; + void *pCont; int contLen; int32_t code; - void * handle; // rpc handle returned to app - void * ahandle; // app handle set by client + void *handle; // rpc handle returned to app + void *ahandle; // app handle set by client int noResp; // has response or not(default 0, 0: resp, 1: no resp); int persistHandle; // persist handle or not @@ -63,7 +63,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha typedef struct SRpcInit { uint16_t localPort; // local port - char * label; // for debug purpose + char *label; // for debug purpose int numOfThreads; // number of threads to handle connections int sessions; // number of sessions allowed int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS @@ -80,36 +80,37 @@ typedef struct SRpcInit { RpcCfp cfp; // call back to retrieve the client auth info, for server app only - RpcAfp afp;; + RpcAfp afp; + ; void *parent; } SRpcInit; typedef struct { - void *val; + void *val; int32_t (*clone)(void *src, void **dst); - void (*freeFunc)(const void *arg); + void (*freeFunc)(const void *arg); } SRpcCtxVal; typedef struct { - int32_t msgType; - void *val; + int32_t msgType; + void *val; int32_t (*clone)(void *src, void **dst); - void (*freeFunc)(const void *arg); + void (*freeFunc)(const void *arg); } SRpcBrokenlinkVal; typedef struct { - SHashObj * args; + SHashObj *args; SRpcBrokenlinkVal brokenVal; } SRpcCtx; int32_t rpcInit(); void rpcCleanup(); -void * rpcOpen(const SRpcInit *pRpc); +void *rpcOpen(const SRpcInit *pRpc); void rpcClose(void *); -void * rpcMallocCont(int contLen); +void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); -void * rpcReallocCont(void *ptr, int contLen); +void *rpcReallocCont(void *ptr, int contLen); // Because taosd supports multi-process mode // These functions should not be used on the server side diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 17347eb59a..d15f97b042 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -116,7 +116,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } } -#if 1 +#if 0 int64_t version; vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version); @@ -180,8 +180,10 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SNodeMsg *pMsg = NULL; SRpcMsg rsp; + static int64_t version = 0; + for (int32_t i = 0; i < numOfMsgs; ++i) { -#if 0 +#if 1 taosGetQitem(qall, (void **)&pMsg); if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { @@ -191,7 +193,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.pCont = NULL; rsp.contLen = 0; - if (vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &rsp) < 0) { + if (vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, version++, &rsp) < 0) { rsp.code = terrno; tmsgSendRsp(&rsp); } -- GitLab