diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index b48a8775ce7924976ab3863d82c1907a404b8354..68d4216bae23c5cbf766ee887d6ade0c2e24e1a1 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg; extern const SVnodeCfg vnodeCfgDefault; -int vnodeInit(int nthreads); +int32_t vnodeInit(int32_t nthreads); void vnodeCleanup(); -int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); +int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); -int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); -int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); -int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); +int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); +int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -int vnodeValidateTableHash(SVnode *pVnode, char *tableFName); +int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); @@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry; void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderClear(SMetaReader *pReader); -int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); -int metaReadNext(SMetaReader *pReader); +int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); +int32_t metaReadNext(SMetaReader *pReader); const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid); #if 1 // refact APIs below (TODO) @@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor; SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); -int metaTbCursorNext(SMTbCursor *pTbCur); +int32_t metaTbCursorNext(SMTbCursor *pTbCur); #endif // tsdb @@ -124,8 +124,8 @@ typedef struct STqReadHandle STqReadHandle; STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); -int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); -int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids); @@ -207,15 +207,15 @@ struct SMetaReader { SDecoder coder; SMetaEntry me; void *pBuf; - int szBuf; + int32_t szBuf; }; struct SMTbCursor { TBC *pDbc; void *pKey; void *pVal; - int kLen; - int vLen; + int32_t kLen; + int32_t vLen; SMetaReader mr; }; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index a034833a57a172d2aecc10d69a6e2e138a5422f0..eb3382ac4cd46a602a214b09b5a8debeaf15087f 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -24,7 +24,6 @@ extern "C" { #endif -// vnodeDebug ==================== // clang-format off #define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) @@ -34,17 +33,17 @@ extern "C" { #define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -// vnodeCfg ==================== +// vnodeCfg.c extern const SVnodeCfg vnodeCfgDefault; -int vnodeCheckCfg(const SVnodeCfg*); -int vnodeEncodeConfig(const void* pObj, SJson* pJson); -int vnodeDecodeConfig(const SJson* pJson, void* pObj); +int32_t vnodeCheckCfg(const SVnodeCfg*); +int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson); +int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj); -// vnodeModule ==================== -int vnodeScheduleTask(int (*execute)(void*), void* arg); +// vnodeModule.c +int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg); -// vnodeBufPool ==================== +// vnodeBufPool.c typedef struct SVBufPoolNode SVBufPoolNode; struct SVBufPoolNode { SVBufPoolNode* prev; @@ -62,37 +61,29 @@ struct SVBufPool { SVBufPoolNode node; }; -int vnodeOpenBufPool(SVnode* pVnode, int64_t size); -int vnodeCloseBufPool(SVnode* pVnode); -void vnodeBufPoolReset(SVBufPool* pPool); +int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size); +int32_t vnodeCloseBufPool(SVnode* pVnode); +void vnodeBufPoolReset(SVBufPool* pPool); -// vnodeQuery ==================== -int vnodeQueryOpen(SVnode* pVnode); -void vnodeQueryClose(SVnode* pVnode); -int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); +// vnodeQuery.c +int32_t vnodeQueryOpen(SVnode* pVnode); +void vnodeQueryClose(SVnode* pVnode); +int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); -// vnodeCommit ==================== -int vnodeBegin(SVnode* pVnode); -int vnodeShouldCommit(SVnode* pVnode); -int vnodeCommit(SVnode* pVnode); -int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); -int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); -int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); -int vnodeSyncCommit(SVnode* pVnode); -int vnodeAsyncCommit(SVnode* pVnode); +// vnodeCommit.c +int32_t vnodeBegin(SVnode* pVnode); +int32_t vnodeShouldCommit(SVnode* pVnode); +int32_t vnodeCommit(SVnode* pVnode); +int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); +int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); +int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); +int32_t vnodeSyncCommit(SVnode* pVnode); +int32_t vnodeAsyncCommit(SVnode* pVnode); -// vnodeCommit ==================== +// vnodeSync.c int32_t vnodeSyncOpen(SVnode* pVnode, char* path); -int32_t vnodeSyncStart(SVnode* pVnode); +void vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); -void vnodeSyncSetMsgCb(SVnode* pVnode); -int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg); -int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg); -void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); -void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); -void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); -int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); -SSyncFSM* syncVnodeMakeFsm(); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 56389de88a4137fe460e5e4c4e484d3f6ce7c304..d9ff60bbe22b573db34331e5aabbd04b06ff5616 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -674,10 +674,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); if (pSyncNode->FpSendMsg != NULL) { - pMsg->info.noResp = 1; // htonl syncUtilMsgHtoN(pMsg->pCont); + pMsg->info.noResp = 1; pSyncNode->FpSendMsg(&epSet, pMsg); } else { sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL"); @@ -689,10 +689,10 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S SEpSet epSet; syncUtilnodeInfo2EpSet(nodeInfo, &epSet); if (pSyncNode->FpSendMsg != NULL) { - pMsg->info.noResp = 1; // htonl syncUtilMsgHtoN(pMsg->pCont); + pMsg->info.noResp = 1; pSyncNode->FpSendMsg(&epSet, pMsg); } else { sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL"); diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index b8a9bec1087b41e60f94b11ea2ec1b0a651a98b9..630d96054bef043134b76233683551abe682bd6c 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -97,11 +97,12 @@ int main(int argc, char** argv) { for (int i = 0; i < 10; ++i) { SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest"); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); SEpSet epSet; syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); + rpcMsg.info.noResp = 1; pSyncNode->FpSendMsg(&epSet, &rpcMsg); taosMsleep(1000);