提交 acc5c53b 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

make changes on app

上级 5b735ad0
......@@ -330,6 +330,7 @@ typedef struct STscObj {
char writeAuth : 1;
char superAuth : 1;
uint32_t connId;
uint64_t rid; // ref ID returned by taosAddRef
struct SSqlObj * pHb;
struct SSqlObj * sqlList;
struct SSqlStream *streamList;
......@@ -348,7 +349,7 @@ typedef struct SSqlObj {
void *signature;
pthread_t owner; // owner of sql object, by which it is executed
STscObj *pTscObj;
void *pRpcCtx;
int64_t rpcRid;
void (*fp)();
void (*fetchFp)();
void *param;
......
......@@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
int32_t waitingDuring = tsShellActivityTimer * 500;
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer);
} else {
tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
}
}
void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = (STscObj *)handle;
int ret = taosAcquireRef(tscRefId, pObj);
if (ret < 0) {
tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret));
return;
}
int64_t rid = (int64_t) handle;
STscObj *pObj = taosAcquireRef(tscRefId, rid);
if (pObj == NULL) return;
SSqlObj* pHB = pObj->pHb;
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) {
tscWarn("%p HB object has been released already", pHB);
taosReleaseRef(tscRefId, pObj);
taosReleaseRef(tscRefId, pObj->rid);
return;
}
......@@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
}
taosReleaseRef(tscRefId, pObj);
taosReleaseRef(tscRefId, rid);
}
int tscSendMsgToServer(SSqlObj *pSql) {
......@@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.pCont = pMsg,
.contLen = pSql->cmd.payloadLen,
.ahandle = pSql,
.handle = &pSql->pRpcCtx,
.handle = NULL,
.code = 0
};
......@@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash.
rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pSql->cmd;
assert(*pSql->self == pSql);
pSql->pRpcCtx = NULL;
pSql->rpcRid = -1;
if (pObj->signature != pObj) {
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
......@@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
createHBObj(pObj);
//launch a timer to send heartbeat to maintain the connection and send status to mnode
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer);
return 0;
}
......
......@@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
taosAddRef(tscRefId, pObj);
pObj->rid = taosAddRef(tscRefId, pObj);
return pSql;
}
......@@ -279,9 +279,9 @@ void taos_close(TAOS *taos) {
SSqlObj* pHb = pObj->pHb;
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode
rpcCancelRequest(pHb->pRpcCtx);
pHb->pRpcCtx = NULL;
if (pHb->rpcRid > 0) { // wait for rsp from dnode
rpcCancelRequest(pHb->rpcRid);
pHb->rpcRid = -1;
}
tscDebug("%p HB is freed", pHb);
......@@ -298,7 +298,7 @@ void taos_close(TAOS *taos) {
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
taosRemoveRef(tscRefId, pObj);
taosRemoveRef(tscRefId, pObj->rid);
}
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
......@@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
assert(pSubObj->self == (SSqlObj**) p);
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSubObj->pRpcCtx != NULL) {
rpcCancelRequest(pSubObj->pRpcCtx);
pSubObj->pRpcCtx = NULL;
if (pSubObj->rpcRid > 0) {
rpcCancelRequest(pSubObj->rpcRid);
pSubObj->rpcRid = -1;
}
tscQueueAsyncRes(pSubObj);
......@@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->pRpcCtx == NULL);
assert(pSql->rpcRid <= 0);
tscKillSTableQuery(pSql);
} else {
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
......@@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) {
* reset and freed in the processMsgFromServer function, and causes the invalid
* write problem for rpcCancelRequest.
*/
if (pSql->pRpcCtx != NULL) {
rpcCancelRequest(pSql->pRpcCtx);
pSql->pRpcCtx = NULL;
if (pSql->rpcRid > 0) {
rpcCancelRequest(pSql->rpcRid);
pSql->rpcRid = -1;
}
tscQueueAsyncRes(pSql);
......
......@@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
taosRemoveRef(tscRefId, pTscObj);
taosRemoveRef(tscRefId, pTscObj->rid);
}
}
......
......@@ -83,13 +83,13 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg);
int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(void *pContext);
void rpcCancelRequest(int64_t rid);
#ifdef __cplusplus
}
......
......@@ -106,13 +106,13 @@ typedef void* tsync_h;
int32_t syncInit();
void syncCleanUp();
tsync_h syncStart(const SSyncInfo *);
void syncStop(tsync_h shandle);
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *);
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype);
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code);
void syncRecover(tsync_h shandle); // recover from other nodes:
int syncGetNodesRole(tsync_h shandle, SNodesRole *);
int64_t syncStart(const SSyncInfo *);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg *);
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype);
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code);
void syncRecover(int64_t rid); // recover from other nodes:
int syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[];
......
......@@ -72,7 +72,7 @@ typedef struct {
ESyncRole role;
ESdbStatus status;
int64_t version;
void * sync;
int64_t sync;
void * wal;
SSyncCfg cfg;
int32_t numOfTables;
......@@ -212,7 +212,7 @@ static void sdbRestoreTables() {
}
void sdbUpdateMnodeRoles() {
if (tsSdbObj.sync == NULL) return;
if (tsSdbObj.sync <= 0) return;
SNodesRole roles = {0};
syncGetNodesRole(tsSdbObj.sync, &roles);
......@@ -433,7 +433,7 @@ void sdbCleanUp() {
if (tsSdbObj.sync) {
syncStop(tsSdbObj.sync);
tsSdbObj.sync = NULL;
tsSdbObj.sync = -1;
}
if (tsSdbObj.wal) {
......
......@@ -560,7 +560,7 @@ void rpcCancelRequest(int64_t rid) {
rpcCloseConn(pContext->pConn);
taosReleaseRef(tsRpcRefId, pContext);
taosReleaseRef(tsRpcRefId, rid);
}
static void rpcFreeMsg(void *msg) {
......@@ -629,7 +629,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
// if there is an outgoing message, free it
if (pConn->outType && pConn->pReqMsg) {
if (pConn->pContext) pConn->pContext->pConn = NULL;
taosRemoveRef(tsRpcRefId, pConn->pContext);
taosRemoveRef(tsRpcRefId, pConn->pContext->rid);
}
}
......@@ -1110,7 +1110,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
}
// free the request message
taosRemoveRef(tsRpcRefId, pContext);
taosRemoveRef(tsRpcRefId, pContext->rid);
}
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
......
......@@ -141,6 +141,7 @@ typedef struct SSyncNode {
int8_t replica;
int8_t quorum;
uint32_t vgId;
int64_t rid;
void *ahandle;
int8_t selfIndex;
SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator
......
......@@ -142,14 +142,14 @@ void syncCleanUp() {
sInfo("sync module is cleaned up");
}
void *syncStart(const SSyncInfo *pInfo) {
int64_t syncStart(const SSyncInfo *pInfo) {
const SSyncCfg *pCfg = &pInfo->syncCfg;
SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1);
if (pNode == NULL) {
sError("no memory to allocate syncNode");
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
return -1;
}
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
......@@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) {
pNode->quorum = pCfg->quorum;
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
int ret = taosAddRef(tsSyncRefId, pNode);
if (ret < 0) {
pNode->rid = taosAddRef(tsSyncRefId, pNode);
if (pNode->rid < 0) {
syncFreeNode(pNode);
return NULL;
return -1;
}
for (int i = 0; i < pCfg->replica; ++i) {
......@@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) {
if (pNode->selfIndex < 0) {
sInfo("vgId:%d, this node is not configured", pNode->vgId);
terrno = TSDB_CODE_SYN_INVALID_CONFIG;
syncStop(pNode);
return NULL;
syncStop(pNode->rid);
return -1;
}
nodeVersion = pInfo->version; // set the initial version
......@@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) {
if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno);
syncStop(pNode);
return NULL;
syncStop(pNode->rid);
return -1;
}
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl);
if (pNode->pFwdTimer == NULL) {
sError("vgId:%d, failed to allocate timer", pNode->vgId);
syncStop(pNode);
return NULL;
syncStop(pNode->rid);
return -1;
}
syncAddArbitrator(pNode);
......@@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
}
return pNode;
return pNode->rid;
}
void syncStop(void *param) {
SSyncNode *pNode = param;
void syncStop(int64_t rid) {
SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return;
sInfo("vgId:%d, cleanup sync", pNode->vgId);
......@@ -245,16 +244,15 @@ void syncStop(void *param) {
pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode);
taosRemoveRef(tsSyncRefId, pNode);
taosReleaseRef(tsSyncRefId, rid);
taosRemoveRef(tsSyncRefId, rid);
}
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
SSyncNode *pNode = param;
int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
int i, j;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNode->replica);
......@@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
syncRole[nodeRole]);
syncBroadcastStatus(pNode);
taosReleaseRef(tsSyncRefId, pNode);
taosReleaseRef(tsSyncRefId, rid);
return 0;
}
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
SSyncNode *pNode = param;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return 0;
int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int qtype) {
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return 0;
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
taosReleaseRef(tsSyncRefId, pNode);
taosReleaseRef(tsSyncRefId, rid);
return code;
}
void syncConfirmForward(void *param, uint64_t version, int32_t code) {
SSyncNode *pNode = param;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return;
SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) {
......@@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) {
}
}
taosReleaseRef(tsSyncRefId, pNode);
taosReleaseRef(tsSyncRefId, rid);
}
void syncRecover(void *param) {
SSyncNode *pNode = param;
void syncRecover(int64_t rid) {
SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return;
// to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work
......@@ -393,14 +386,12 @@ void syncRecover(void *param) {
pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode);
taosReleaseRef(tsSyncRefId, rid);
}
int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
SSyncNode *pNode = param;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return -1;
int syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return -1;
pNodesRole->selfIndex = pNode->selfIndex;
for (int i = 0; i < pNode->replica; ++i) {
......@@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
pNodesRole->role[i] = pNode->peerInfo[i]->role;
}
taosReleaseRef(tsSyncRefId, pNode);
taosReleaseRef(tsSyncRefId, rid);
return 0;
}
......@@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1);
int syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode);
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);
sDebug("%s, resource is freed", pPeer->id);
taosTFree(pPeer->watchFd);
......@@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
}
taosAcquireRef(tsSyncRefId, pNode);
taosAcquireRef(tsSyncRefId, pNode->rid);
return pPeer;
}
......@@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
if (taosAcquireRef(tsSyncRefId, pNode) < 0) return;
if (taosAcquireRef(tsSyncRefId, pNode->rid) < 0) return;
pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno));
......@@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) {
}
pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode);
taosReleaseRef(tsSyncRefId, pNode->rid);
}
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
......@@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
}
static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncNode *pNode = param;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if ( ret < 0) return;
int64_t rid = (int64_t) param;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return;
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
......@@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pthread_mutex_unlock(&(pNode->mutex));
}
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl);
}
taosReleaseRef(tsSyncRefId, pNode);
taosReleaseRef(tsSyncRefId, rid);
}
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) {
......
......@@ -30,7 +30,7 @@ int dataFd = -1;
void * qhandle = NULL;
int walNum = 0;
uint64_t tversion = 0;
void * syncHandle;
int64_t syncHandle;
int role;
int nodeId;
char path[256];
......
......@@ -47,7 +47,7 @@ typedef struct {
void *rqueue;
void *wal;
void *tsdb;
void *sync;
int64_t sync;
void *events;
void *cq; // continuous query
int32_t cfgVersion;
......
......@@ -44,12 +44,12 @@ static void vnodeCtrlFlow(void *handle, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
#ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
void syncStop(tsync_h shandle) {}
int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
int64_t syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; }
void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; }
int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; }
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {}
#endif
char* vnodeStatus[] = {
......@@ -330,7 +330,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
#ifndef _SYNC
pVnode->role = TAOS_SYNC_ROLE_MASTER;
#else
if (pVnode->sync == NULL) {
if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeCleanUp(pVnode);
......@@ -589,9 +589,9 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
}
// stop replication module
if (pVnode->sync) {
void *sync = pVnode->sync;
pVnode->sync = NULL;
if (pVnode->sync > 0) {
int64_t sync = pVnode->sync;
pVnode->sync = -1;
syncStop(sync);
}
......
......@@ -43,6 +43,7 @@ extern int32_t wDebugFlag;
typedef struct {
uint64_t version;
int64_t fileId;
int64_t rid;
int32_t vgId;
int32_t fd;
int32_t keep;
......
......@@ -78,7 +78,8 @@ void *walOpen(char *path, SWalCfg *pCfg) {
return NULL;
}
if (taosAddRef(tsWal.refId, pWal) != TSDB_CODE_SUCCESS) {
pWal->rid = taosAddRef(tsWal.refId, pWal);
if (pWal->rid < 0) {
walFreeObj(pWal);
return NULL;
}
......@@ -143,7 +144,7 @@ void walClose(void *handle) {
}
pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refId, pWal);
taosRemoveRef(tsWal.refId, pWal->rid);
}
static int32_t walInitObj(SWal *pWal) {
......@@ -185,7 +186,7 @@ static void walUpdateSeq() {
}
static void walFsyncAll() {
SWal *pWal = taosIterateRef(tsWal.refId, NULL);
SWal *pWal = taosIterateRef(tsWal.refId, 0);
while (pWal) {
if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
......@@ -194,7 +195,7 @@ static void walFsyncAll() {
wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
}
}
pWal = taosIterateRef(tsWal.refId, pWal);
pWal = taosIterateRef(tsWal.refId, pWal->rid);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册