diff --git a/src/inc/trpc.h b/src/inc/trpc.h index a34d10747441757cb29bf07963702dc1585d9b40..5845823b380f5cd93fe472ee9198ffc3acdf9972 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -76,16 +76,16 @@ typedef struct { int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); } SRpcInit; -void *rpcOpen(SRpcInit *pRpc); +void *rpcOpen(const SRpcInit *pRpc); void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg); -void rpcSendResponse(SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); +void rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg); +void rpcSendResponse(const SRpcMsg *pMsg); +void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pOut, SRpcMsg *pRsp); +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/src/inc/tsync.h b/src/inc/tsync.h index cdc92b236643616c65911417f1b24c342451946d..bc28831ecca4ef14906106f74dc251c055164a33 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -55,6 +55,24 @@ typedef struct { int role[TAOS_SYNC_MAX_REPLICA]; } SNodesRole; +// if name is null, get the file from index or after, used by master +// if name is provided, get the named file at the specified index, used by unsynced node +// it returns the file magic number and size, if file not there, magic shall be 0. +typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size); + +// get the wal file from index or after +// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file +typedef int (*FGetWalInfo)(void *ahandle, char *name, uint32_t *index); + +// when a forward pkt is received, call this to handle data +typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type); + +// when forward is confirmed by peer, master call this API to notify app +typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); + +// when role is changed, call this to notify app +typedef void (*FNotifyRole)(void *ahandle, int8_t role); + typedef struct { int32_t vgId; // vgroup ID uint64_t version; // initial version @@ -62,31 +80,19 @@ typedef struct { char path[128]; // path to the file void *ahandle; // handle provided by APP + FGetFileInfo getFileInfo; + FGetWalInfo getWalInfo; + FWriteToCache writeToCache; + FConfirmForward confirmForward; + FNotifyRole notifyRole; - // if name is null, get the file from index or after, used by master - // if name is provided, get the named file at the specified index, used by unsynced node - // it returns the file magic number and size, if file not there, magic shall be 0. - uint32_t (*getFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size); - - // get the wal file from index or after - // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file - int (*getWalInfo)(void *ahandle, char *name, uint32_t *index); - - // when a forward pkt is received, call this to handle data - int (*writeToCache)(void *ahandle, void *pHead, int type); - - // when forward is confirmed by peer, master call this API to notify app - void (*confirmForward)(void *ahandle, void *mhandle, int32_t code); - - // when role is changed, call this to notify app - void (*notifyRole)(void *ahandle, int8_t role); } SSyncInfo; typedef void* tsync_h; -tsync_h syncStart(SSyncInfo *); +tsync_h syncStart(const SSyncInfo *); void syncStop(tsync_h shandle); -int syncReconfig(tsync_h shandle, SSyncCfg *); +int syncReconfig(tsync_h shandle, const SSyncCfg *); int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle); void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncRecover(tsync_h shandle); // recover from other nodes: diff --git a/src/inc/twal.h b/src/inc/twal.h index 53d4f835b0e121e4af1ca71ef02bad99e4e43b0d..e90e2e54ce7e7ea6d088379f5c187b26c847ff27 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -38,15 +38,16 @@ typedef struct { int8_t wals; // number of WAL files; } SWalCfg; -typedef void* twal_h; // WAL HANDLE - -twal_h walOpen(char *path, SWalCfg *pCfg); -void walClose(twal_h); -int walRenew(twal_h); -int walWrite(twal_h, SWalHead *); -void walFsync(twal_h); -int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pHead, int type)); -int walGetWalFile(twal_h, char *name, uint32_t *index); +typedef void* twalh; // WAL HANDLE +typedef int (*FWalWrite)(void *ahandle, void *pHead, int type); + +twalh walOpen(const char *path, const SWalCfg *pCfg); +void walClose(twalh); +int walRenew(twalh); +int walWrite(twalh, SWalHead *); +void walFsync(twalh); +int walRestore(twalh, void *pVnode, FWalWrite writeFp); +int walGetWalFile(twalh, char *name, uint32_t *index); extern int wDebugFlag; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f1c6deb1d07458585ebb8a069fc182f7b53e1312..bec9621e3b417196a205cbba39a796cac0d18298 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -202,7 +202,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); static void rpcLockConn(SRpcConn *pConn); static void rpcUnlockConn(SRpcConn *pConn); -void *rpcOpen(SRpcInit *pInit) { +void *rpcOpen(const SRpcInit *pInit) { SRpcInfo *pRpc; tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime; @@ -344,22 +344,22 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) { +void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; - pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext->ahandle = pMsg->handle; pContext->pRpc = (SRpcInfo *)shandle; pContext->ipSet = *pIpSet; - pContext->contLen = pMsg->contLen; + pContext->contLen = contLen; pContext->pCont = pMsg->pCont; pContext->msgType = pMsg->msgType; pContext->oldInUse = pIpSet->inUse; pContext->connType = RPC_CONN_UDPC; - if (pMsg->contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC; + if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC; // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection @@ -374,11 +374,14 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) { return; } -void rpcSendResponse(SRpcMsg *pMsg) { +void rpcSendResponse(const SRpcMsg *pRsp) { int msgLen = 0; - SRpcConn *pConn = (SRpcConn *)pMsg->handle; + SRpcConn *pConn = (SRpcConn *)pRsp->handle; SRpcInfo *pRpc = pConn->pRpc; + SRpcMsg rpcMsg = *pRsp; + SRpcMsg *pMsg = &rpcMsg; + if ( pMsg->pCont == NULL ) { pMsg->pCont = rpcMallocCont(0); pMsg->contLen = 0; @@ -429,7 +432,7 @@ void rpcSendResponse(SRpcMsg *pMsg) { return; } -void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { +void rpcSendRedirectRsp(void *thandle, const SRpcIpSet *pIpSet) { SRpcMsg rpcMsg; rpcMsg.contLen = sizeof(SRpcIpSet); @@ -458,7 +461,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { return 0; } -void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg *pRsp) { SRpcReqContext *pContext; pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index 17389670982a0625b5b4a1d53766454f51bfe574..e799bfd63a1c049bf7a7b5de06ea767266460708 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -48,11 +48,11 @@ typedef struct { int wDebugFlag = 135; static uint32_t walSignature = 0xFAFBFDFE; -static int walHandleExistingFiles(char *path); -static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int)); -static int walRemoveWalFiles(char *path); +static int walHandleExistingFiles(const char *path); +static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp); +static int walRemoveWalFiles(const char *path); -void *walOpen(char *path, SWalCfg *pCfg) { +void *walOpen(const char *path, const SWalCfg *pCfg) { SWal *pWal = calloc(sizeof(SWal), 1); if (pWal == NULL) return NULL; @@ -82,7 +82,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { void walClose(void *handle) { if (handle == NULL) return; - SWal *pWal = (SWal *)handle; + SWal *pWal = handle; close(pWal->fd); @@ -101,8 +101,8 @@ void walClose(void *handle) { free(pWal); } -int walRenew(twal_h handle) { - SWal *pWal = (SWal *)handle; +int walRenew(void *handle) { + SWal *pWal = handle; int code = 0; pthread_mutex_lock(&pWal->mutex); @@ -144,7 +144,7 @@ int walRenew(twal_h handle) { } int walWrite(void *handle, SWalHead *pHead) { - SWal *pWal = (SWal *)handle; + SWal *pWal = handle; int code = 0; // no wal @@ -164,14 +164,14 @@ int walWrite(void *handle, SWalHead *pHead) { void walFsync(void *handle) { - SWal *pWal = (SWal *)handle; + SWal *pWal = handle; if (pWal->level == TAOS_WAL_FSYNC) fsync(pWal->fd); } int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) { - SWal *pWal = (SWal *)handle; + SWal *pWal = handle; int code = 0; struct dirent *ent; int count = 0; @@ -223,7 +223,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) } int walGetWalFile(void *handle, char *name, uint32_t *index) { - SWal *pWal = (SWal *)handle; + SWal *pWal = handle; int code = 1; int32_t first = 0; @@ -247,7 +247,7 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { return code; } -static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int)) { +static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp) { int code = 0; char *buffer = malloc(1024000); // size for one record @@ -293,7 +293,7 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, vo return code; } -int walHandleExistingFiles(char *path) { +int walHandleExistingFiles(const char *path) { int code = 0; char oname[TSDB_FILENAME_LEN * 3]; char nname[TSDB_FILENAME_LEN * 3]; @@ -335,7 +335,7 @@ int walHandleExistingFiles(char *path) { return code; } -static int walRemoveWalFiles(char *path) { +static int walRemoveWalFiles(const char *path) { int plen = strlen(walPrefix); char name[TSDB_FILENAME_LEN * 3]; int code = 0;