提交 a63559cf 编写于 作者: H Hongze Cheng

refactor: sync

上级 167657c1
...@@ -38,11 +38,11 @@ typedef struct SRpcConnInfo { ...@@ -38,11 +38,11 @@ typedef struct SRpcConnInfo {
typedef struct SRpcMsg { typedef struct SRpcMsg {
tmsg_t msgType; tmsg_t msgType;
void * pCont; void *pCont;
int contLen; int contLen;
int32_t code; int32_t code;
void * handle; // rpc handle returned to app void *handle; // rpc handle returned to app
void * ahandle; // app handle set by client void *ahandle; // app handle set by client
int noResp; // has response or not(default 0, 0: resp, 1: no resp); int noResp; // has response or not(default 0, 0: resp, 1: no resp);
int persistHandle; // persist handle or not int persistHandle; // persist handle or not
...@@ -63,7 +63,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha ...@@ -63,7 +63,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha
typedef struct SRpcInit { typedef struct SRpcInit {
uint16_t localPort; // local port uint16_t localPort; // local port
char * label; // for debug purpose char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed int sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
...@@ -80,36 +80,37 @@ typedef struct SRpcInit { ...@@ -80,36 +80,37 @@ typedef struct SRpcInit {
RpcCfp cfp; RpcCfp cfp;
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
RpcAfp afp;; RpcAfp afp;
;
void *parent; void *parent;
} SRpcInit; } SRpcInit;
typedef struct { typedef struct {
void *val; void *val;
int32_t (*clone)(void *src, void **dst); int32_t (*clone)(void *src, void **dst);
void (*freeFunc)(const void *arg); void (*freeFunc)(const void *arg);
} SRpcCtxVal; } SRpcCtxVal;
typedef struct { typedef struct {
int32_t msgType; int32_t msgType;
void *val; void *val;
int32_t (*clone)(void *src, void **dst); int32_t (*clone)(void *src, void **dst);
void (*freeFunc)(const void *arg); void (*freeFunc)(const void *arg);
} SRpcBrokenlinkVal; } SRpcBrokenlinkVal;
typedef struct { typedef struct {
SHashObj * args; SHashObj *args;
SRpcBrokenlinkVal brokenVal; SRpcBrokenlinkVal brokenVal;
} SRpcCtx; } SRpcCtx;
int32_t rpcInit(); int32_t rpcInit();
void rpcCleanup(); void rpcCleanup();
void * rpcOpen(const SRpcInit *pRpc); void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void * rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void * rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
// Because taosd supports multi-process mode // Because taosd supports multi-process mode
// These functions should not be used on the server side // These functions should not be used on the server side
......
...@@ -116,7 +116,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -116,7 +116,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
} }
} }
#if 1 #if 0
int64_t version; int64_t version;
vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version); vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version);
...@@ -180,8 +180,10 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -180,8 +180,10 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
SRpcMsg rsp; SRpcMsg rsp;
static int64_t version = 0;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
#if 0 #if 1
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
...@@ -191,7 +193,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -191,7 +193,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp.pCont = NULL; rsp.pCont = NULL;
rsp.contLen = 0; rsp.contLen = 0;
if (vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &rsp) < 0) { if (vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, version++, &rsp) < 0) {
rsp.code = terrno; rsp.code = terrno;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册