diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 0861c94a76e90dc4e74f0f75e31b3d1fc524664b..d57433eba9bacb622081032f4ed6a2a72406248d 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -51,9 +51,9 @@ typedef struct { } SSyncCfg; typedef struct { - int selfIndex; - uint32_t nodeId[TAOS_SYNC_MAX_REPLICA]; - int role[TAOS_SYNC_MAX_REPLICA]; + int32_t selfIndex; + uint32_t nodeId[TAOS_SYNC_MAX_REPLICA]; + int32_t role[TAOS_SYNC_MAX_REPLICA]; } SNodesRole; /* @@ -83,25 +83,24 @@ typedef void (*FNotifyRole)(void *ahandle, int8_t role); typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds); // when data file is synced successfully, notity app -typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); +typedef int32_t (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef struct { - int32_t vgId; // vgroup ID - uint64_t version; // initial version - SSyncCfg syncCfg; // configuration from mgmt - char path[128]; // path to the file - - void *ahandle; // handle provided by APP - FGetFileInfo getFileInfo; - FGetWalInfo getWalInfo; - FWriteToCache writeToCache; - FConfirmForward confirmForward; - FNotifyRole notifyRole; - FNotifyFlowCtrl notifyFlowCtrl; + int32_t vgId; // vgroup ID + uint64_t version; // initial version + SSyncCfg syncCfg; // configuration from mgmt + char path[128]; // path to the file + void * ahandle; // handle provided by APP + FGetFileInfo getFileInfo; + FGetWalInfo getWalInfo; + FWriteToCache writeToCache; + FConfirmForward confirmForward; + FNotifyRole notifyRole; + FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; } SSyncInfo; -typedef void* tsync_h; +typedef void *tsync_h; int32_t syncInit(); void syncCleanUp(); @@ -109,22 +108,22 @@ void syncCleanUp(); int64_t syncStart(const SSyncInfo *); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg *); -int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype); +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype); void syncConfirmForward(int64_t rid, uint64_t version, int32_t code); -void syncRecover(int64_t rid); // recover from other nodes: -int syncGetNodesRole(int64_t rid, SNodesRole *); +void syncRecover(int64_t rid); // recover from other nodes: +int32_t syncGetNodesRole(int64_t rid, SNodesRole *); -extern char *syncRole[]; +extern char *syncRole[]; //global configurable parameters -extern int tsMaxSyncNum; -extern int tsSyncTcpThreads; -extern int tsMaxWatchFiles; -extern int tsSyncTimer; -extern int tsMaxFwdInfo; -extern int sDebugFlag; -extern char tsArbitrator[]; -extern uint16_t tsSyncPort; +extern int32_t tsMaxSyncNum; +extern int32_t tsSyncTcpThreads; +extern int32_t tsMaxWatchFiles; +extern int32_t tsSyncTimer; +extern int32_t tsMaxFwdInfo; +extern int32_t sDebugFlag; +extern char tsArbitrator[]; +extern uint16_t tsSyncPort; #ifdef __cplusplus } diff --git a/src/plugins/http/inc/httpQueue.h b/src/plugins/http/inc/httpQueue.h index a4590719ff24d48eee875b2f2c4ff2f28a0a31f6..1ffbd5148164ab4d1f74bdac781c8fcf731ad772 100644 --- a/src/plugins/http/inc/httpQueue.h +++ b/src/plugins/http/inc/httpQueue.h @@ -22,9 +22,11 @@ extern "C" { #include +typedef void (*FHttpResultFp)(void *param, void *result, int32_t code, int32_t rows); + bool httpInitResultQueue(); void httpCleanupResultQueue(); -void httpDispatchToResultQueue(); +void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp); #ifdef __cplusplus } diff --git a/src/plugins/http/src/httpQueue.c b/src/plugins/http/src/httpQueue.c index 0fb055972a5d62de5cb93e81485c1fe3cc5b8364..1c039abb4d4cd546f6673648217a2410f57ae8e7 100644 --- a/src/plugins/http/src/httpQueue.c +++ b/src/plugins/http/src/httpQueue.c @@ -25,6 +25,7 @@ #include "httpResp.h" #include "httpAuth.h" #include "httpSession.h" +#include "httpQueue.h" typedef struct { pthread_t thread; @@ -37,42 +38,45 @@ typedef struct { } SHttpWorkerPool; typedef struct { - void *param; - void *result; - int32_t numOfRows; - void (*fp)(void *param, void *result, int32_t numOfRows); + void * param; + void * result; + int32_t code; + int32_t rows; + FHttpResultFp fp; } SHttpResult; static SHttpWorkerPool tsHttpPool; static taos_qset tsHttpQset; static taos_queue tsHttpQueue; -void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t numOfRows, void (*fp)(void *param, void *result, int32_t numOfRows)) { +void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp) { if (tsHttpQueue != NULL) { SHttpResult *pMsg = taosAllocateQitem(sizeof(SHttpResult)); pMsg->param = param; pMsg->result = result; - pMsg->numOfRows = numOfRows; + pMsg->code = code; + pMsg->rows = rows; pMsg->fp = fp; taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg); } else { - (*fp)(param, result, numOfRows); + (*fp)(param, result, code, rows); } } static void *httpProcessResultQueue(void *param) { SHttpResult *pMsg; - int32_t type; - void *unUsed; - + int32_t type; + void * unUsed; + while (1) { if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset); break; } - httpTrace("context:%p, res:%p will be processed in result queue", pMsg->param, pMsg->result); - (*pMsg->fp)(pMsg->param, pMsg->result, pMsg->numOfRows); + httpTrace("context:%p, res:%p will be processed in result queue, code:%d rows:%d", pMsg->param, pMsg->result, + pMsg->code, pMsg->rows); + (*pMsg->fp)(pMsg->param, pMsg->result, pMsg->code, pMsg->rows); taosFreeQitem(pMsg); } diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index 70d644146cf5ebe15a42c8f00a2e6e4bd603d081..564f555c403b6f5b3deb5832da6505722104eff8 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -29,9 +29,9 @@ void httpProcessMultiSql(HttpContext *pContext); -void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows); +void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int32_t numOfRows); -void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) { +void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int32_t code, int32_t numOfRows) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; @@ -43,7 +43,7 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n bool isContinue = false; - if (numOfRows > 0) { + if (code == TSDB_CODE_SUCCESS && numOfRows > 0) { if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->buildQueryJsonFp) { isContinue = (encode->buildQueryJsonFp)(pContext, singleCmd, result, numOfRows); } @@ -58,9 +58,9 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n httpDebug("context:%p, fd:%d, user:%s, process pos:%d, stop retrieve, numOfRows:%d, sql:%s", pContext, pContext->fd, pContext->user, multiCmds->pos, numOfRows, sql); - if (numOfRows < 0) { + if (code < 0) { httpError("context:%p, fd:%d, user:%s, process pos:%d, retrieve failed code:%s, sql:%s", pContext, pContext->fd, - pContext->user, multiCmds->pos, tstrerror(numOfRows), sql); + pContext->user, multiCmds->pos, tstrerror(code), sql); } taos_free_result(result); @@ -73,15 +73,15 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n } } -void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { - httpDispatchToResultQueue(param, result, numOfRows, httpProcessMultiSqlRetrieveCallBackImp); +void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int32_t numOfRows) { + int32_t code = taos_errno(result); + httpDispatchToResultQueue(param, result, code, numOfRows, httpProcessMultiSqlRetrieveCallBackImp); } -void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { +void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int32_t code, int32_t affectRowsInput) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; - code = taos_errno(result); HttpSqlCmds *multiCmds = pContext->multiCmds; HttpEncodeMethod *encode = pContext->encodeMethod; @@ -94,7 +94,7 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { return; } - if (code < 0) { + if (code != TSDB_CODE_SUCCESS) { if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, code)) { singleCmd->code = code; httpDebug("context:%p, fd:%d, user:%s, process pos jump to:%d, last code:%s, last sql:%s", pContext, pContext->fd, @@ -119,7 +119,7 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { bool isUpdate = tscIsUpdateQuery(result); if (isUpdate) { // not select or show commands - int affectRows = taos_affected_rows(result); + int32_t affectRows = taos_affected_rows(result); httpDebug("context:%p, fd:%d, user:%s, process pos:%d, affect rows:%d, sql:%s", pContext, pContext->fd, pContext->user, multiCmds->pos, affectRows, sql); @@ -156,8 +156,10 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { } } -void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { - httpDispatchToResultQueue(param, result, unUsedCode, httpProcessMultiSqlCallBackImp); +void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int32_t unUsedCode) { + int32_t code = taos_errno(result); + int32_t affectRows = taos_affected_rows(result); + httpDispatchToResultQueue(param, result, code, affectRows, httpProcessMultiSqlCallBackImp); } void httpProcessMultiSql(HttpContext *pContext) { @@ -202,9 +204,9 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) { httpProcessMultiSql(pContext); } -void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows); +void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int32_t numOfRows); -void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) { +void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int32_t code, int32_t numOfRows) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; @@ -212,7 +214,7 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int bool isContinue = false; - if (numOfRows > 0) { + if (code == TSDB_CODE_SUCCESS && numOfRows > 0) { if (encode->buildQueryJsonFp) { isContinue = (encode->buildQueryJsonFp)(pContext, &pContext->singleCmd, result, numOfRows); } @@ -227,9 +229,9 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int httpDebug("context:%p, fd:%d, user:%s, stop retrieve, numOfRows:%d", pContext, pContext->fd, pContext->user, numOfRows); - if (numOfRows < 0) { + if (code < 0) { httpError("context:%p, fd:%d, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->user, - tstrerror(numOfRows)); + tstrerror(code)); } taos_free_result(result); @@ -242,30 +244,30 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int } } -void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { - httpDispatchToResultQueue(param, result, numOfRows, httpProcessSingleSqlRetrieveCallBackImp); +void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int32_t numOfRows) { + int32_t code = taos_errno(result); + httpDispatchToResultQueue(param, result, code, numOfRows, httpProcessSingleSqlRetrieveCallBackImp); } -void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCode) { +void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int32_t code, int32_t affectRowsInput) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; - int32_t code = taos_errno(result); - HttpEncodeMethod *encode = pContext->encodeMethod; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { httpError("context:%p, fd:%d, user:%s, query error, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd, - pContext->user, tstrerror(code), (SSqlObj *)result); + pContext->user, tstrerror(code), result); return; } - if (code < 0) { + if (code != TSDB_CODE_SUCCESS) { SSqlObj *pObj = (SSqlObj *)result; if (code == TSDB_CODE_TSC_INVALID_SQL) { - httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, - pContext->fd, pContext->user, tstrerror(code), pObj, pObj->cmd.payload); - httpSendTaosdInvalidSqlErrorResp(pContext, pObj->cmd.payload); + terrno = code; + httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, pContext->fd, + pContext->user, tstrerror(code), pObj, taos_errstr(pObj)); + httpSendTaosdInvalidSqlErrorResp(pContext, taos_errstr(pObj)); } else { httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd, pContext->user, tstrerror(code), pObj); @@ -278,7 +280,8 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo bool isUpdate = tscIsUpdateQuery(result); if (isUpdate) { // not select or show commands - int affectRows = taos_affected_rows(result); + int32_t affectRows = taos_affected_rows(result); + assert(affectRows == affectRowsInput); httpDebug("context:%p, fd:%d, user:%s, affect rows:%d, stop query, sqlObj:%p", pContext, pContext->fd, pContext->user, affectRows, result); @@ -308,8 +311,10 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo } } -void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { - httpDispatchToResultQueue(param, result, unUsedCode, httpProcessSingleSqlCallBackImp); +void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int32_t unUsedCode) { + int32_t code = taos_errno(result); + int32_t affectRows = taos_affected_rows(result); + httpDispatchToResultQueue(param, result, code, affectRows, httpProcessSingleSqlCallBackImp); } void httpProcessSingleSqlCmd(HttpContext *pContext) { @@ -373,7 +378,7 @@ void httpExecCmd(HttpContext *pContext) { } } -void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { +void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) { HttpContext *pContext = param; taos_free_result(result); diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 8808a82c463cd9023395881871b99d531a93fc97..240b401bdaef252e36109490658b39ede749a8fe 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -89,11 +89,11 @@ typedef struct { #pragma pack(pop) typedef struct { - char *buffer; - int bufferSize; - char *offset; - int forwards; - int code; + char * buffer; + int32_t bufferSize; + char * offset; + int32_t forwards; + int32_t code; } SRecvBuffer; typedef struct { @@ -107,10 +107,10 @@ typedef struct { } SFwdInfo; typedef struct { - int first; - int last; - int fwds; // number of forwards - SFwdInfo fwdInfo[]; + int32_t first; + int32_t last; + int32_t fwds; // number of forwards + SFwdInfo fwdInfo[]; } SSyncFwds; typedef struct SsyncPeer { @@ -123,15 +123,15 @@ typedef struct SsyncPeer { int8_t sstatus; // sync status uint64_t version; uint64_t sversion; // track the peer version in retrieve process - int syncFd; - int peerFd; // forward FD - int numOfRetrieves; // number of retrieves tried - int fileChanged; // a flag to indicate file is changed during retrieving process + int32_t syncFd; + int32_t peerFd; // forward FD + int32_t numOfRetrieves; // number of retrieves tried + int32_t fileChanged; // a flag to indicate file is changed during retrieving process void * timer; void * pConn; - int notifyFd; - int watchNum; - int * watchFd; + int32_t notifyFd; + int32_t watchNum; + int32_t *watchFd; int8_t refCount; // reference count struct SSyncNode *pSyncNode; } SSyncPeer; @@ -161,16 +161,16 @@ typedef struct SSyncNode { } SSyncNode; // sync module global -extern int tsSyncNum; -extern char tsNodeFqdn[TSDB_FQDN_LEN]; +extern int32_t tsSyncNum; +extern char tsNodeFqdn[TSDB_FQDN_LEN]; void *syncRetrieveData(void *param); void *syncRestoreData(void *param); -int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead); -void syncRestartConnection(SSyncPeer *pPeer); -void syncBroadcastStatus(SSyncNode *pNode); -void syncAddPeerRef(SSyncPeer *pPeer); -int syncDecPeerRef(SSyncPeer *pPeer); +int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead); +void syncRestartConnection(SSyncPeer *pPeer); +void syncBroadcastStatus(SSyncNode *pNode); +void syncAddPeerRef(SSyncPeer *pPeer); +int32_t syncDecPeerRef(SSyncPeer *pPeer); #ifdef __cplusplus } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 1d0dab7f46b5d545a884fdb35d11ab70e6314072..9dcd0fd632396baf5f8bf68c3df24dae72d45a02 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -31,38 +31,38 @@ #include "syncInt.h" // global configurable -int tsMaxSyncNum = 2; -int tsSyncTcpThreads = 2; -int tsMaxWatchFiles = 500; -int tsMaxFwdInfo = 200; -int tsSyncTimer = 1; +int32_t tsMaxSyncNum = 2; +int32_t tsSyncTcpThreads = 2; +int32_t tsMaxWatchFiles = 500; +int32_t tsMaxFwdInfo = 200; +int32_t tsSyncTimer = 1; // module global, not configurable -int tsSyncNum; // number of sync in process in whole system -char tsNodeFqdn[TSDB_FQDN_LEN]; +int32_t tsSyncNum; // number of sync in process in whole system +char tsNodeFqdn[TSDB_FQDN_LEN]; static ttpool_h tsTcpPool; -static void * syncTmrCtrl = NULL; -static void * vgIdHash; -static int tsSyncRefId = -1; +static void * tsSyncTmrCtrl = NULL; +static void * tsVgIdHash; +static int32_t tsSyncRefId = -1; // local functions -static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); -static void syncRecoverFromMaster(SSyncPeer *pPeer); -static void syncCheckPeerConnection(void *param, void *tmrId); -static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack); -static void syncProcessBrokenLink(void *param); -static int syncProcessPeerMsg(void *param, void *buffer); -static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp); -static void syncRemovePeer(SSyncPeer *pPeer); -static void syncAddArbitrator(SSyncNode *pNode); -static void syncFreeNode(void *); -static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); -static void syncMonitorFwdInfos(void *param, void *tmrId); -static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); -static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); -static void syncRestartPeer(SSyncPeer *pPeer); -static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtyp); +static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); +static void syncRecoverFromMaster(SSyncPeer *pPeer); +static void syncCheckPeerConnection(void *param, void *tmrId); +static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack); +static void syncProcessBrokenLink(void *param); +static int32_t syncProcessPeerMsg(void *param, void *buffer); +static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); +static void syncRemovePeer(SSyncPeer *pPeer); +static void syncAddArbitrator(SSyncNode *pNode); +static void syncFreeNode(void *); +static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); +static void syncMonitorFwdInfos(void *param, void *tmrId); +static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); +static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); +static void syncRestartPeer(SSyncPeer *pPeer); +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); char* syncRole[] = { @@ -90,21 +90,21 @@ int32_t syncInit() { return -1; } - syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); - if (syncTmrCtrl == NULL) { + tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); + if (tsSyncTmrCtrl == NULL) { sError("failed to init tmrCtrl"); taosCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; return -1; } - vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); - if (vgIdHash == NULL) { - sError("failed to init vgIdHash"); - taosTmrCleanUp(syncTmrCtrl); + tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); + if (tsVgIdHash == NULL) { + sError("failed to init tsVgIdHash"); + taosTmrCleanUp(tsSyncTmrCtrl); taosCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; - syncTmrCtrl = NULL; + tsSyncTmrCtrl = NULL; return -1; } @@ -126,14 +126,14 @@ void syncCleanUp() { tsTcpPool = NULL; } - if (syncTmrCtrl) { - taosTmrCleanUp(syncTmrCtrl); - syncTmrCtrl = NULL; + if (tsSyncTmrCtrl) { + taosTmrCleanUp(tsSyncTmrCtrl); + tsSyncTmrCtrl = NULL; } - if (vgIdHash) { - taosHashCleanup(vgIdHash); - vgIdHash = NULL; + if (tsVgIdHash) { + taosHashCleanup(tsVgIdHash); + tsVgIdHash = NULL; } taosCloseRef(tsSyncRefId); @@ -176,7 +176,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { return -1; } - for (int i = 0; i < pCfg->replica; ++i) { + for (int32_t i = 0; i < pCfg->replica; ++i) { const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) { @@ -204,7 +204,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { return -1; } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, tsSyncTmrCtrl); if (pNode->pFwdTimer == NULL) { sError("vgId:%d, failed to allocate timer", pNode->vgId); syncStop(pNode->rid); @@ -212,7 +212,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { } syncAddArbitrator(pNode); - taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); + taosHashPut(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); if (pNode->notifyRole) { (*pNode->notifyRole)(pNode->ahandle, nodeRole); @@ -231,10 +231,10 @@ void syncStop(int64_t rid) { pthread_mutex_lock(&(pNode->mutex)); - if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); + if (tsVgIdHash) taosHashRemove(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; if (pPeer) syncRemovePeer(pPeer); } @@ -249,7 +249,7 @@ void syncStop(int64_t rid) { } int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { - int i, j; + int32_t i, j; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; @@ -321,7 +321,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { return 0; } -int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int qtype) { +int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) { SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); if (pNode == NULL) return 0; @@ -348,8 +348,8 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { pFwdRsp->version = version; pFwdRsp->code = code; - int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); - int retLen = write(pPeer->peerFd, msg, msgLen); + int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); + int32_t retLen = write(pPeer->peerFd, msg, msgLen); if (retLen == msgLen) { sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); @@ -377,7 +377,7 @@ void syncRecover(int64_t rid) { pthread_mutex_lock(&(pNode->mutex)); - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = (SSyncPeer *)pNode->peerInfo[i]; if (pPeer->peerFd >= 0) { syncRestartConnection(pPeer); @@ -389,12 +389,12 @@ void syncRecover(int64_t rid) { taosReleaseRef(tsSyncRefId, rid); } -int syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { +int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); if (pNode == NULL) return -1; pNodesRole->selfIndex = pNode->selfIndex; - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->role[i] = pNode->peerInfo[i]->role; } @@ -416,7 +416,7 @@ static void syncAddArbitrator(SSyncNode *pNode) { SNodeInfo nodeInfo; nodeInfo.nodeId = 0; - int ret = taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort); + int32_t ret = taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort); if (-1 == ret) { nodeInfo.nodePort = tsArbitratorPort; } @@ -444,7 +444,7 @@ static void syncFreeNode(void *param) { void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } -int syncDecPeerRef(SSyncPeer *pPeer) { +int32_t syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); @@ -495,12 +495,12 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { pPeer->refCount = 1; sInfo("%s, it is configured", pPeer->id); - int ret = strcmp(pPeer->fqdn, tsNodeFqdn); + int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { int32_t checkMs = 100 + (pNode->vgId * 10) % 100; if (pNode->vgId > 1) checkMs = tsStatusInterval * 2000 + checkMs; sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs); - taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer); } taosAcquireRef(tsSyncRefId, pNode->rid); @@ -510,7 +510,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { void syncBroadcastStatus(SSyncNode *pNode) { SSyncPeer *pPeer; - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { if (i == pNode->selfIndex) continue; pPeer = pNode->peerInfo[i]; syncSendPeersStatusMsgToPeer(pPeer, 1); @@ -518,7 +518,7 @@ void syncBroadcastStatus(SSyncNode *pNode) { } static void syncResetFlowCtrl(SSyncNode *pNode) { - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { pNode->peerInfo[i]->numOfRetrieves = 0; } @@ -529,13 +529,13 @@ static void syncResetFlowCtrl(SSyncNode *pNode) { static void syncChooseMaster(SSyncNode *pNode) { SSyncPeer *pPeer; - int onlineNum = 0; - int index = -1; - int replica = pNode->replica; + int32_t onlineNum = 0; + int32_t index = -1; + int32_t replica = pNode->replica; sDebug("vgId:%d, choose master", pNode->vgId); - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { onlineNum++; } @@ -544,7 +544,7 @@ static void syncChooseMaster(SSyncNode *pNode) { if (onlineNum == pNode->replica) { // if all peers are online, peer with highest version shall be master index = 0; - for (int i = 1; i < pNode->replica; ++i) { + for (int32_t i = 1; i < pNode->replica; ++i) { if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) { index = i; } @@ -560,7 +560,7 @@ static void syncChooseMaster(SSyncNode *pNode) { if (index < 0 && onlineNum > replica / 2.0) { // over half of nodes are online - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { // slave with highest version shall be master pPeer = pNode->peerInfo[i]; if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) { @@ -587,11 +587,11 @@ static void syncChooseMaster(SSyncNode *pNode) { } static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { - int onlineNum = 0; - int index = -1; - int replica = pNode->replica; + int32_t onlineNum = 0; + int32_t index = -1; + int32_t replica = pNode->replica; - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { onlineNum++; } @@ -612,7 +612,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); } } else { - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { SSyncPeer *pTemp = pNode->peerInfo[i]; if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue; if (index < 0) { @@ -631,9 +631,9 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { return pMaster; } -static int syncValidateMaster(SSyncPeer *pPeer) { +static int32_t syncValidateMaster(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; - int code = 0; + int32_t code = 0; if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); @@ -641,7 +641,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) { (*pNode->notifyRole)(pNode->ahandle, nodeRole); code = -1; - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { if (i == pNode->selfIndex) continue; syncRestartPeer(pNode->peerInfo[i]); } @@ -683,7 +683,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne } } else { // master not there, if all peer's state and version are consistent, choose the master - int consistent = 0; + int32_t consistent = 0; if (peersStatus) { for (i = 0; i < pNode->replica; ++i) { SSyncPeer *pTemp = pNode->peerInfo[i]; @@ -721,9 +721,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) { pPeer->sstatus = TAOS_SYNC_STATUS_INIT; - int ret = strcmp(pPeer->fqdn, tsNodeFqdn); + int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { - taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); } } @@ -757,7 +757,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { pthread_t thread; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); - int ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer); + int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer); pthread_attr_destroy(&thattr); if (ret != 0) { @@ -802,7 +802,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { // Ensure the sync of mnode not interrupted if (pNode->vgId != 1 && tsSyncNum >= tsMaxSyncNum) { sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum); - taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer); return; } @@ -815,7 +815,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; - taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { sError("%s, failed to send sync-req to peer", pPeer->id); @@ -836,7 +836,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { // find the forwardInfo from first - for (int i = 0; i < pSyncFwds->fwds; ++i) { + for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo; if (pFwdRsp->version == pFwdInfo->version) break; } @@ -879,10 +879,10 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { } } -static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { +static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { if (pPeer->peerFd < 0) return -1; - int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); + int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); if (hlen != sizeof(SSyncHead)) { sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen); return -1; @@ -894,7 +894,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { return -1; } - int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); + int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); if (bytes != pHead->len) { sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len); return -1; @@ -903,7 +903,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { return 0; } -static int syncProcessPeerMsg(void *param, void *buffer) { +static int32_t syncProcessPeerMsg(void *param, void *buffer) { SSyncPeer *pPeer = param; SSyncHead head; char * cont = buffer; @@ -911,7 +911,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) { SSyncNode *pNode = pPeer->pSyncNode; pthread_mutex_lock(&(pNode->mutex)); - int code = syncReadPeerMsg(pPeer, &head, cont); + int32_t code = syncReadPeerMsg(pPeer, &head, cont); if (code == 0) { if (head.type == TAOS_SMSG_FORWARD) { @@ -948,12 +948,12 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { pPeersStatus->role = nodeRole; pPeersStatus->ack = ack; - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role; pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; } - int retLen = write(pPeer->peerFd, msg, statusMsgLen); + int32_t retLen = write(pPeer->peerFd, msg, statusMsgLen); if (retLen == statusMsgLen) { sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack); @@ -975,10 +975,10 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { return; } - int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); + int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (connFd < 0) { sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno)); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); return; } @@ -999,7 +999,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { } else { sDebug("try later"); close(connFd); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); } } @@ -1024,7 +1024,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); syncAddPeerRef(pPeer); - int ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, pPeer); + int32_t ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, pPeer); pthread_attr_destroy(&thattr); if (ret < 0) { @@ -1036,9 +1036,9 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { } } -static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { - char ipstr[24]; - int i; +static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { + char ipstr[24]; + int32_t i; tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); @@ -1051,7 +1051,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { } int32_t vgId = firstPkt.syncHead.vgId; - SSyncNode **ppNode = (SSyncNode **)taosHashGet(vgIdHash, (const char *)&vgId, sizeof(int32_t)); + SSyncNode **ppNode = (SSyncNode **)taosHashGet(tsVgIdHash, (const char *)&vgId, sizeof(int32_t)); if (ppNode == NULL || *ppNode == NULL) { sError("vgId:%d, vgId could not be found", vgId); taosCloseSocket(connFd); @@ -1137,8 +1137,8 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; - int fwds = pSyncFwds->fwds; - for (int i = 0; i < fwds; ++i) { + int32_t fwds = pSyncFwds->fwds; + for (int32_t i = 0; i < fwds; ++i) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFwdInfo->confirmed == 0) break; @@ -1152,7 +1152,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { } static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) { - int confirm = 0; + int32_t confirm = 0; if (pFwdInfo->code == 0) pFwdInfo->code = code; if (code == 0) { @@ -1186,7 +1186,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { if (pSyncFwds->fwds > 0) { pthread_mutex_lock(&(pNode->mutex)); - for (int i = 0; i < pSyncFwds->fwds; ++i) { + for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; if (time - pFwdInfo->time < 2000) break; syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); @@ -1196,23 +1196,23 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { pthread_mutex_unlock(&(pNode->mutex)); } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, tsSyncTmrCtrl); } taosReleaseRef(tsSyncRefId, rid); } -static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) { SSyncPeer *pPeer; SSyncHead *pSyncHead; SWalHead * pWalHead = data; - int fwdLen; + int32_t fwdLen; int32_t code = 0; if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, pWalHead->version, nodeVersion); - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; syncRestartConnection(pPeer); } @@ -1238,7 +1238,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle pthread_mutex_lock(&(pNode->mutex)); - for (int i = 0; i < pNode->replica; ++i) { + for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; if (pPeer == NULL || pPeer->peerFd < 0) continue; if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; @@ -1248,7 +1248,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle code = 1; } - int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); + int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen); if (retLen == fwdLen) { sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); } else { diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 004dae1729177891babe9d97348b558463e30cc1..44aed220d7369f63c4e7cdb5b3ce86d75c91b73e 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -48,12 +48,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex } } -static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { +static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info SFileAck fileAck; - int code = -1; + int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; uint32_t pindex = 0; // index in last restore bool fileChanged = false; @@ -62,7 +62,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { sinfo.index = 0; while (1) { // read file info - int ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); + int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); if (ret < 0) break; // if no more file from master, break; @@ -104,7 +104,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { minfo.name[sizeof(minfo.name) - 1] = 0; snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name); - int dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); + int32_t dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); if (dfd < 0) { sError("%s, failed to open file:%s", pPeer->id, name); break; @@ -132,9 +132,9 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { return code; } -static int syncRestoreWal(SSyncPeer *pPeer) { +static int32_t syncRestoreWal(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; - int ret, code = -1; + int32_t ret, code = -1; void *buffer = calloc(1024000, 1); // size for one record if (buffer == NULL) return -1; @@ -175,10 +175,10 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) { return offset; } -static int syncProcessBufferedFwd(SSyncPeer *pPeer) { +static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { SSyncNode * pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; - int forwards = 0; + int32_t forwards = 0; sDebug("%s, number of buffered forwards:%d", pPeer->id, pRecv->forwards); @@ -203,12 +203,12 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) { return pRecv->code; } -int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { +int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { SSyncNode * pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; if (pRecv == NULL) return -1; - int len = pHead->len + sizeof(SWalHead); + int32_t len = pHead->len + sizeof(SWalHead); if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) { memcpy(pRecv->offset, pHead, len); @@ -231,7 +231,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode) { tfree(pNode->pRecv); } -static int syncOpenRecvBuffer(SSyncNode *pNode) { +static int32_t syncOpenRecvBuffer(SSyncNode *pNode) { syncCloseRecvBuffer(pNode); SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); @@ -252,13 +252,13 @@ static int syncOpenRecvBuffer(SSyncNode *pNode) { return 0; } -static int syncRestoreDataStepByStep(SSyncPeer *pPeer) { +static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; nodeSStatus = TAOS_SYNC_STATUS_FILE; uint64_t fversion = 0; sDebug("%s, start to restore file", pPeer->id); - int code = syncRestoreFile(pPeer, &fversion); + int32_t code = syncRestoreFile(pPeer, &fversion); if (code < 0) { sError("%s, failed to restore file", pPeer->id); return -1; diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index d77587f2274918c230f5993f7fa7736da07469f9..82a4627ea55a9f0caf041b9ee41b4b3e50c229e4 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -27,7 +27,7 @@ #include "tsync.h" #include "syncInt.h" -static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { +static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { sDebug("%s, start to monitor:%s", pPeer->id, name); if (pPeer->notifyFd <= 0) { @@ -38,16 +38,16 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { return -1; } - if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles); + if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles); if (pPeer->watchFd == NULL) { sError("%s, failed to allocate watchFd", pPeer->id); return -1; } - memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles); + memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles); } - int *wd = pPeer->watchFd + pPeer->watchNum; + int32_t *wd = pPeer->watchFd + pPeer->watchNum; if (*wd >= 0) { if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) { @@ -69,17 +69,17 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { return 0; } -static int syncAreFilesModified(SSyncPeer *pPeer) { +static int32_t syncAreFilesModified(SSyncPeer *pPeer) { if (pPeer->notifyFd <= 0) return 0; - char buf[2048]; - int len = read(pPeer->notifyFd, buf, sizeof(buf)); + char buf[2048]; + int32_t len = read(pPeer->notifyFd, buf, sizeof(buf)); if (len < 0 && errno != EAGAIN) { sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); return -1; } - int code = 0; + int32_t code = 0; if (len > 0) { const struct inotify_event *event; char *ptr; @@ -97,11 +97,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) { return code; } -static int syncRetrieveFile(SSyncPeer *pPeer) { +static int32_t syncRetrieveFile(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo fileInfo; SFileAck fileAck; - int code = -1; + int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; memset(&fileInfo, 0, sizeof(fileInfo)); @@ -146,7 +146,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) { } // send the file to peer - int sfd = open(name, O_RDONLY); + int32_t sfd = open(name, O_RDONLY); if (sfd < 0) break; ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); @@ -169,8 +169,8 @@ static int syncRetrieveFile(SSyncPeer *pPeer) { /* if only a partial record is read out, set the IN_MODIFY flag in event, so upper layer will reload the file to get a complete record */ -static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) { - int ret; +static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead, uint32_t *pEvent) { + int32_t ret; ret = read(sfd, pHead, sizeof(SWalHead)); if (ret < 0) return -1; @@ -194,7 +194,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) { return sizeof(SWalHead) + pHead->len; } -static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { +static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) { pPeer->watchNum = 0; taosClose(pPeer->notifyFd); pPeer->notifyFd = inotify_init1(IN_NONBLOCK); @@ -203,14 +203,14 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { return -1; } - if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles); + if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles); if (pPeer->watchFd == NULL) { sError("%s, failed to allocate watchFd", pPeer->id); return -1; } - memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles); - int *wd = pPeer->watchFd; + memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles); + int32_t *wd = pPeer->watchFd; *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE); if (*wd == -1) { @@ -222,8 +222,8 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { } static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { - char buf[2048]; - int len = read(pPeer->notifyFd, buf, sizeof(buf)); + char buf[2048]; + int32_t len = read(pPeer->notifyFd, buf, sizeof(buf)); if (len < 0 && errno != EAGAIN) { sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); return -1; @@ -243,11 +243,11 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { return 0; } -static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { +static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { SWalHead *pHead = malloc(640000); - int code = -1; + int32_t code = -1; int32_t bytes = 0; - int sfd; + int32_t sfd; sfd = open(name, O_RDONLY); if (sfd < 0) { @@ -259,7 +259,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, sDebug("%s, retrieve last wal, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, offset, fversion); while (1) { - int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); + int32_t wsize = syncReadOneWalRecord(sfd, pHead, pEvent); if (wsize < 0) break; if (wsize == 0) { code = 0; @@ -267,7 +267,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, } sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); - int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); + int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); if (ret != wsize) break; pPeer->sversion = pHead->version; @@ -287,9 +287,9 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, return -1; } -static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { +static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { SSyncNode *pNode = pPeer->pSyncNode; - int code = -1; + int32_t code = -1; char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file if (syncAreFilesModified(pPeer) != 0) return -1; @@ -370,13 +370,13 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { return code; } -static int syncRetrieveWal(SSyncPeer *pPeer) { +static int32_t syncRetrieveWal(SSyncPeer *pPeer) { SSyncNode * pNode = pPeer->pSyncNode; char fname[TSDB_FILENAME_LEN * 3]; char wname[TSDB_FILENAME_LEN * 2]; int32_t size; struct stat fstat; - int code = -1; + int32_t code = -1; int64_t index = 0; while (1) { @@ -403,7 +403,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) { size = fstat.st_size; sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); - int sfd = open(fname, O_RDONLY); + int32_t sfd = open(fname, O_RDONLY); if (sfd < 0) break; code = taosSendFile(pPeer->syncFd, sfd, NULL, size); @@ -428,7 +428,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) { return code; } -static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { +static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SFirstPkt firstPkt; diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c index 34e936c9f3b6e3c350f72ed1f1c33bd94a416ef1..496bf074357487ab1c87072948b2e26be056dc58 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/tarbitrator.c @@ -28,22 +28,22 @@ #include "syncInt.h" static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); -static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp); +static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void arbProcessBrokenLink(void *param); -static int arbProcessPeerMsg(void *param, void *buffer); +static int32_t arbProcessPeerMsg(void *param, void *buffer); static tsem_t tsArbSem; static ttpool_h tsArbTcpPool; typedef struct { - char id[TSDB_EP_LEN + 24]; - int nodeFd; - void *pConn; + char id[TSDB_EP_LEN + 24]; + int32_t nodeFd; + void * pConn; } SNodeConn; -int main(int argc, char *argv[]) { +int32_t main(int32_t argc, char *argv[]) { char arbLogPath[TSDB_FILENAME_LEN + 16] = {0}; - for (int i = 1; i < argc; ++i) { + for (int32_t i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { tsArbitratorPort = atoi(argv[++i]); } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) { @@ -108,7 +108,7 @@ int main(int argc, char *argv[]) { return 0; } -static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp) { +static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { char ipstr[24]; tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); @@ -150,13 +150,13 @@ static void arbProcessBrokenLink(void *param) { tfree(pNode); } -static int arbProcessPeerMsg(void *param, void *buffer) { +static int32_t arbProcessPeerMsg(void *param, void *buffer) { SNodeConn *pNode = param; SSyncHead head; - int bytes = 0; + int32_t bytes = 0; char * cont = (char *)buffer; - int hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head)); + int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head)); if (hlen != sizeof(head)) { sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen); return -1;