未验证 提交 3ff69920 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4128 from taosdata/feature/file

Feature/file
...@@ -330,6 +330,7 @@ typedef struct STscObj { ...@@ -330,6 +330,7 @@ typedef struct STscObj {
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;
uint32_t connId; uint32_t connId;
uint64_t rid; // ref ID returned by taosAddRef
struct SSqlObj * pHb; struct SSqlObj * pHb;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
...@@ -348,7 +349,7 @@ typedef struct SSqlObj { ...@@ -348,7 +349,7 @@ typedef struct SSqlObj {
void *signature; void *signature;
pthread_t owner; // owner of sql object, by which it is executed pthread_t owner; // owner of sql object, by which it is executed
STscObj *pTscObj; STscObj *pTscObj;
void *pRpcCtx; int64_t rpcRid;
void (*fp)(); void (*fp)();
void (*fetchFp)(); void (*fetchFp)();
void *param; void *param;
......
...@@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
int32_t waitingDuring = tsShellActivityTimer * 500; int32_t waitingDuring = tsShellActivityTimer * 500;
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer);
} else { } else {
tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj); tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
} }
} }
void tscProcessActivityTimer(void *handle, void *tmrId) { void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = (STscObj *)handle; int64_t rid = (int64_t) handle;
STscObj *pObj = taosAcquireRef(tscRefId, rid);
int ret = taosAcquireRef(tscRefId, pObj); if (pObj == NULL) return;
if (ret < 0) {
tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret));
return;
}
SSqlObj* pHB = pObj->pHb; SSqlObj* pHB = pObj->pHb;
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) { if (p == NULL) {
tscWarn("%p HB object has been released already", pHB); tscWarn("%p HB object has been released already", pHB);
taosReleaseRef(tscRefId, pObj); taosReleaseRef(tscRefId, pObj->rid);
return; return;
} }
...@@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
} }
taosReleaseRef(tscRefId, pObj); taosReleaseRef(tscRefId, rid);
} }
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
...@@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.pCont = pMsg, .pCont = pMsg,
.contLen = pSql->cmd.payloadLen, .contLen = pSql->cmd.payloadLen,
.ahandle = pSql, .ahandle = pSql,
.handle = &pSql->pRpcCtx, .handle = NULL,
.code = 0 .code = 0
}; };
...@@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// Otherwise, the pSql object may have been released already during the response function, which is // Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash. // cause crash.
rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
assert(*pSql->self == pSql); assert(*pSql->self == pSql);
pSql->pRpcCtx = NULL; pSql->rpcRid = -1;
if (pObj->signature != pObj) { if (pObj->signature != pObj) {
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
...@@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
createHBObj(pObj); createHBObj(pObj);
//launch a timer to send heartbeat to maintain the connection and send status to mnode //launch a timer to send heartbeat to maintain the connection and send status to mnode
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer);
return 0; return 0;
} }
......
...@@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
registerSqlObj(pSql); registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
taosAddRef(tscRefId, pObj); pObj->rid = taosAddRef(tscRefId, pObj);
return pSql; return pSql;
} }
...@@ -279,9 +279,9 @@ void taos_close(TAOS *taos) { ...@@ -279,9 +279,9 @@ void taos_close(TAOS *taos) {
SSqlObj* pHb = pObj->pHb; SSqlObj* pHb = pObj->pHb;
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode if (pHb->rpcRid > 0) { // wait for rsp from dnode
rpcCancelRequest(pHb->pRpcCtx); rpcCancelRequest(pHb->rpcRid);
pHb->pRpcCtx = NULL; pHb->rpcRid = -1;
} }
tscDebug("%p HB is freed", pHb); tscDebug("%p HB is freed", pHb);
...@@ -298,7 +298,7 @@ void taos_close(TAOS *taos) { ...@@ -298,7 +298,7 @@ void taos_close(TAOS *taos) {
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
taosRemoveRef(tscRefId, pObj); taosRemoveRef(tscRefId, pObj->rid);
} }
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
...@@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
assert(pSubObj->self == (SSqlObj**) p); assert(pSubObj->self == (SSqlObj**) p);
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSubObj->pRpcCtx != NULL) { if (pSubObj->rpcRid > 0) {
rpcCancelRequest(pSubObj->pRpcCtx); rpcCancelRequest(pSubObj->rpcRid);
pSubObj->pRpcCtx = NULL; pSubObj->rpcRid = -1;
} }
tscQueueAsyncRes(pSubObj); tscQueueAsyncRes(pSubObj);
...@@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->pRpcCtx == NULL); assert(pSql->rpcRid <= 0);
tscKillSTableQuery(pSql); tscKillSTableQuery(pSql);
} else { } else {
if (pSql->cmd.command < TSDB_SQL_LOCAL) { if (pSql->cmd.command < TSDB_SQL_LOCAL) {
...@@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) {
* reset and freed in the processMsgFromServer function, and causes the invalid * reset and freed in the processMsgFromServer function, and causes the invalid
* write problem for rpcCancelRequest. * write problem for rpcCancelRequest.
*/ */
if (pSql->pRpcCtx != NULL) { if (pSql->rpcRid > 0) {
rpcCancelRequest(pSql->pRpcCtx); rpcCancelRequest(pSql->rpcRid);
pSql->pRpcCtx = NULL; pSql->rpcRid = -1;
} }
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
......
...@@ -36,7 +36,7 @@ void * tscTmr; ...@@ -36,7 +36,7 @@ void * tscTmr;
void * tscQhandle; void * tscQhandle;
void * tscCheckDiskUsageTmr; void * tscCheckDiskUsageTmr;
int tsInsertHeadSize; int tsInsertHeadSize;
int tscRefId; int tscRefId = -1;
int tscNumOfThreads; int tscNumOfThreads;
......
...@@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) { ...@@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
if (ref == 0) { if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
taosRemoveRef(tscRefId, pTscObj); taosRemoveRef(tscRefId, pTscObj->rid);
} }
} }
......
...@@ -83,13 +83,13 @@ void rpcClose(void *); ...@@ -83,13 +83,13 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(void *pContext); void rpcCancelRequest(int64_t rid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -106,13 +106,13 @@ typedef void* tsync_h; ...@@ -106,13 +106,13 @@ typedef void* tsync_h;
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
tsync_h syncStart(const SSyncInfo *); int64_t syncStart(const SSyncInfo *);
void syncStop(tsync_h shandle); void syncStop(int64_t rid);
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); int32_t syncReconfig(int64_t rid, const SSyncCfg *);
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype);
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncConfirmForward(int64_t rid, uint64_t version, int32_t code);
void syncRecover(tsync_h shandle); // recover from other nodes: void syncRecover(int64_t rid); // recover from other nodes:
int syncGetNodesRole(tsync_h shandle, SNodesRole *); int syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[]; extern char *syncRole[];
......
...@@ -72,7 +72,7 @@ typedef struct { ...@@ -72,7 +72,7 @@ typedef struct {
ESyncRole role; ESyncRole role;
ESdbStatus status; ESdbStatus status;
int64_t version; int64_t version;
void * sync; int64_t sync;
void * wal; void * wal;
SSyncCfg cfg; SSyncCfg cfg;
int32_t numOfTables; int32_t numOfTables;
...@@ -212,7 +212,7 @@ static void sdbRestoreTables() { ...@@ -212,7 +212,7 @@ static void sdbRestoreTables() {
} }
void sdbUpdateMnodeRoles() { void sdbUpdateMnodeRoles() {
if (tsSdbObj.sync == NULL) return; if (tsSdbObj.sync <= 0) return;
SNodesRole roles = {0}; SNodesRole roles = {0};
syncGetNodesRole(tsSdbObj.sync, &roles); syncGetNodesRole(tsSdbObj.sync, &roles);
...@@ -433,7 +433,7 @@ void sdbCleanUp() { ...@@ -433,7 +433,7 @@ void sdbCleanUp() {
if (tsSdbObj.sync) { if (tsSdbObj.sync) {
syncStop(tsSdbObj.sync); syncStop(tsSdbObj.sync);
tsSdbObj.sync = NULL; tsSdbObj.sync = -1;
} }
if (tsSdbObj.wal) { if (tsSdbObj.wal) {
......
...@@ -82,6 +82,7 @@ typedef struct { ...@@ -82,6 +82,7 @@ typedef struct {
int8_t oldInUse; // server EP inUse passed by app int8_t oldInUse; // server EP inUse passed by app
int8_t redirect; // flag to indicate redirect int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg *pRsp; // for synchronous API SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API tsem_t *pSem; // for synchronous API
SRpcEpSet *pSet; // for synchronous API SRpcEpSet *pSet; // for synchronous API
...@@ -220,8 +221,7 @@ static void rpcFree(void *p) { ...@@ -220,8 +221,7 @@ static void rpcFree(void *p) {
free(p); free(p);
} }
static void rpcInit(void) { void rpcInit(void) {
tsProgressTimer = tsRpcTimer/2; tsProgressTimer = tsRpcTimer/2;
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcHeadSize = RPC_MSG_OVERHEAD;
...@@ -230,6 +230,11 @@ static void rpcInit(void) { ...@@ -230,6 +230,11 @@ static void rpcInit(void) {
tsRpcRefId = taosOpenRef(200, rpcFree); tsRpcRefId = taosOpenRef(200, rpcFree);
} }
void rpcCleanup(void) {
taosCloseRef(tsRpcRefId);
tsRpcRefId = -1;
}
void *rpcOpen(const SRpcInit *pInit) { void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc; SRpcInfo *pRpc;
...@@ -374,7 +379,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -374,7 +379,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -403,10 +408,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -403,10 +408,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
// set the handle to pContext, so app can cancel the request // set the handle to pContext, so app can cancel the request
if (pMsg->handle) *((void **)pMsg->handle) = pContext; if (pMsg->handle) *((void **)pMsg->handle) = pContext;
taosAddRef(tsRpcRefId, pContext); pContext->rid = taosAddRef(tsRpcRefId, pContext);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
return; return pContext->rid;
} }
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
...@@ -551,15 +557,14 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { ...@@ -551,15 +557,14 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
return code; return code;
} }
void rpcCancelRequest(void *handle) { void rpcCancelRequest(int64_t rid) {
SRpcReqContext *pContext = handle;
int code = taosAcquireRef(tsRpcRefId, pContext); SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
if (code < 0) return; if (pContext == NULL) return;
rpcCloseConn(pContext->pConn); rpcCloseConn(pContext->pConn);
taosReleaseRef(tsRpcRefId, pContext); taosReleaseRef(tsRpcRefId, rid);
} }
static void rpcFreeMsg(void *msg) { static void rpcFreeMsg(void *msg) {
...@@ -628,7 +633,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { ...@@ -628,7 +633,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
// if there is an outgoing message, free it // if there is an outgoing message, free it
if (pConn->outType && pConn->pReqMsg) { if (pConn->outType && pConn->pReqMsg) {
if (pConn->pContext) pConn->pContext->pConn = NULL; if (pConn->pContext) pConn->pContext->pConn = NULL;
taosRemoveRef(tsRpcRefId, pConn->pContext); taosRemoveRef(tsRpcRefId, pConn->pContext->rid);
} }
} }
...@@ -1109,7 +1114,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -1109,7 +1114,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
} }
// free the request message // free the request message
taosRemoveRef(tsRpcRefId, pContext); taosRemoveRef(tsRpcRefId, pContext->rid);
} }
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
...@@ -1620,11 +1625,7 @@ static void rpcDecRef(SRpcInfo *pRpc) ...@@ -1620,11 +1625,7 @@ static void rpcDecRef(SRpcInfo *pRpc)
tDebug("%s rpc resources are released", pRpc->label); tDebug("%s rpc resources are released", pRpc->label);
taosTFree(pRpc); taosTFree(pRpc);
int count = atomic_sub_fetch_32(&tsRpcNum, 1); atomic_sub_fetch_32(&tsRpcNum, 1);
if (count == 0) {
// taosCloseRef(tsRpcRefId);
// tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error
}
} }
} }
...@@ -141,6 +141,7 @@ typedef struct SSyncNode { ...@@ -141,6 +141,7 @@ typedef struct SSyncNode {
int8_t replica; int8_t replica;
int8_t quorum; int8_t quorum;
uint32_t vgId; uint32_t vgId;
int64_t rid;
void *ahandle; void *ahandle;
int8_t selfIndex; int8_t selfIndex;
SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator
......
...@@ -142,14 +142,14 @@ void syncCleanUp() { ...@@ -142,14 +142,14 @@ void syncCleanUp() {
sInfo("sync module is cleaned up"); sInfo("sync module is cleaned up");
} }
void *syncStart(const SSyncInfo *pInfo) { int64_t syncStart(const SSyncInfo *pInfo) {
const SSyncCfg *pCfg = &pInfo->syncCfg; const SSyncCfg *pCfg = &pInfo->syncCfg;
SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1); SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1);
if (pNode == NULL) { if (pNode == NULL) {
sError("no memory to allocate syncNode"); sError("no memory to allocate syncNode");
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return -1;
} }
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
...@@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) {
pNode->quorum = pCfg->quorum; pNode->quorum = pCfg->quorum;
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
int ret = taosAddRef(tsSyncRefId, pNode); pNode->rid = taosAddRef(tsSyncRefId, pNode);
if (ret < 0) { if (pNode->rid < 0) {
syncFreeNode(pNode); syncFreeNode(pNode);
return NULL; return -1;
} }
for (int i = 0; i < pCfg->replica; ++i) { for (int i = 0; i < pCfg->replica; ++i) {
...@@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) {
if (pNode->selfIndex < 0) { if (pNode->selfIndex < 0) {
sInfo("vgId:%d, this node is not configured", pNode->vgId); sInfo("vgId:%d, this node is not configured", pNode->vgId);
terrno = TSDB_CODE_SYN_INVALID_CONFIG; terrno = TSDB_CODE_SYN_INVALID_CONFIG;
syncStop(pNode); syncStop(pNode->rid);
return NULL; return -1;
} }
nodeVersion = pInfo->version; // set the initial version nodeVersion = pInfo->version; // set the initial version
...@@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) {
if (pNode->pSyncFwds == NULL) { if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
syncStop(pNode); syncStop(pNode->rid);
return NULL; return -1;
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl);
if (pNode->pFwdTimer == NULL) { if (pNode->pFwdTimer == NULL) {
sError("vgId:%d, failed to allocate timer", pNode->vgId); sError("vgId:%d, failed to allocate timer", pNode->vgId);
syncStop(pNode); syncStop(pNode->rid);
return NULL; return -1;
} }
syncAddArbitrator(pNode); syncAddArbitrator(pNode);
...@@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
} }
return pNode; return pNode->rid;
} }
void syncStop(void *param) { void syncStop(int64_t rid) {
SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (ret < 0) return; if (pNode == NULL) return;
sInfo("vgId:%d, cleanup sync", pNode->vgId); sInfo("vgId:%d, cleanup sync", pNode->vgId);
...@@ -245,16 +244,15 @@ void syncStop(void *param) { ...@@ -245,16 +244,15 @@ void syncStop(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
taosRemoveRef(tsSyncRefId, pNode); taosRemoveRef(tsSyncRefId, rid);
} }
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
SSyncNode *pNode = param;
int i, j; int i, j;
int ret = taosAcquireRef(tsSyncRefId, pNode); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNode->replica); pNode->replica);
...@@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { ...@@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
syncRole[nodeRole]); syncRole[nodeRole]);
syncBroadcastStatus(pNode); syncBroadcastStatus(pNode);
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
return 0; return 0;
} }
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int qtype) {
SSyncNode *pNode = param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return 0;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return 0;
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
return code; return code;
} }
void syncConfirmForward(void *param, uint64_t version, int32_t code) { void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncNode *pNode = param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
SSyncPeer *pPeer = pNode->pMaster; SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) { if (pPeer && pNode->quorum > 1) {
...@@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) { ...@@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) {
} }
} }
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
} }
void syncRecover(void *param) { void syncRecover(int64_t rid) {
SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (ret < 0) return; if (pNode == NULL) return;
// to do: add a few lines to check if recover is OK // to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work // if take this node to unsync state, the whole system may not work
...@@ -393,14 +386,12 @@ void syncRecover(void *param) { ...@@ -393,14 +386,12 @@ void syncRecover(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
} }
int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { int syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
SSyncNode *pNode = param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return -1;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return -1;
pNodesRole->selfIndex = pNode->selfIndex; pNodesRole->selfIndex = pNode->selfIndex;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
...@@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { ...@@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
pNodesRole->role[i] = pNode->peerInfo[i]->role; pNodesRole->role[i] = pNode->peerInfo[i]->role;
} }
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
return 0; return 0;
} }
...@@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); ...@@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1);
int syncDecPeerRef(SSyncPeer *pPeer) { int syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);
sDebug("%s, resource is freed", pPeer->id); sDebug("%s, resource is freed", pPeer->id);
taosTFree(pPeer->watchFd); taosTFree(pPeer->watchFd);
...@@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
} }
taosAcquireRef(tsSyncRefId, pNode); taosAcquireRef(tsSyncRefId, pNode->rid);
return pPeer; return pPeer;
} }
...@@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; if (taosAcquireRef(tsSyncRefId, pNode->rid) < 0) return;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno));
...@@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) {
} }
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, pNode->rid);
} }
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
...@@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
static void syncMonitorFwdInfos(void *param, void *tmrId) { static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncNode *pNode = param; int64_t rid = (int64_t) param;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
int ret = taosAcquireRef(tsSyncRefId, pNode); if (pNode == NULL) return;
if ( ret < 0) return;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
...@@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl);
} }
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
} }
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) {
......
...@@ -30,7 +30,7 @@ int dataFd = -1; ...@@ -30,7 +30,7 @@ int dataFd = -1;
void * qhandle = NULL; void * qhandle = NULL;
int walNum = 0; int walNum = 0;
uint64_t tversion = 0; uint64_t tversion = 0;
void * syncHandle; int64_t syncHandle;
int role; int role;
int nodeId; int nodeId;
char path[256]; char path[256];
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TFILE_H
#define TDENGINE_TFILE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <unistd.h>
// init taos file module
int32_t tfinit();
// clean up taos file module
void tfcleanup();
// the same syntax as UNIX standard open/close/read/write
// but FD is int64_t and will never be reused
int64_t tfopen(const char *pathname, int32_t flags);
int64_t tfclose(int64_t tfd);
int64_t tfwrite(int64_t tfd, void *buf, int64_t count);
int64_t tfread(int64_t tfd, void *buf, int64_t count);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TREF_H
...@@ -21,38 +21,48 @@ ...@@ -21,38 +21,48 @@
extern "C" { extern "C" {
#endif #endif
// open an instance, return refId which will be used by other APIs // open a reference set, max is the mod used by hash, fp is the pointer to free resource function
int taosOpenRef(int max, void (*fp)(void *)); // return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately
int taosOpenRef(int max, void (*fp)(void *));
// close the Ref instance // close the reference set, refId is the return value by taosOpenRef
void taosCloseRef(int refId); // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosCloseRef(int refId);
// add ref, p is the pointer to resource or pointer ID // add ref, p is the pointer to resource or pointer ID
int taosAddRef(int refId, void *p); // return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
#define taosRemoveRef taosReleaseRef int64_t taosAddRef(int refId, void *p);
// acquire ref, p is the pointer to resource or pointer ID // remove ref, rid is the reference ID returned by taosAddRef
int taosAcquireRef(int refId, void *p); // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosRemoveRef(int rsetId, int64_t rid);
// release ref, p is the pointer to resource or pinter ID // acquire ref, rid is the reference ID returned by taosAddRef
void taosReleaseRef(int refId, void *p); // return the resource p. On error, NULL is returned, and terrno is set appropriately
void *taosAcquireRef(int rsetId, int64_t rid);
// return the first if p is null, otherwise return the next after p // release ref, rid is the reference ID returned by taosAddRef
void *taosIterateRef(int refId, void *p); // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosReleaseRef(int rsetId, int64_t rid);
// return the first reference if rid is 0, otherwise return the next after current reference.
// if return value is NULL, it means list is over(if terrno is set, it means error happens)
void *taosIterateRef(int rsetId, int64_t rid);
// return the number of references in system // return the number of references in system
int taosListRef(); int taosListRef();
/* sample code to iterate the refs /* sample code to iterate the refs
void demoIterateRefs(int refId) { void demoIterateRefs(int rsetId) {
void *p = taosIterateRef(refId, NULL); void *p = taosIterateRef(refId, 0);
while (p) { while (p) {
// process P // process P
// get the rid from p
p = taosIterateRef(refId, p); p = taosIterateRef(rsetId, rid);
} }
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taoserror.h"
#include "tulog.h"
#include "tutil.h"
#include "tref.h"
static int32_t tsFileRsetId = -1;
static void taosCloseFile(void *p) {
close((int32_t)(uintptr_t)p);
}
int32_t tfinit() {
tsFileRsetId = taosOpenRef(2000, taosCloseFile);
return tsFileRsetId;
}
void tfcleanup() {
if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId);
tsFileRsetId = -1;
}
int64_t tfopen(const char *pathname, int32_t flags) {
int32_t fd = open(pathname, flags);
if (fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
void *p = (void *)(int64_t)fd;
int64_t rid = taosAddRef(tsFileRsetId, p);
if (rid < 0) close(fd);
return rid;
}
int64_t tfclose(int64_t tfd) {
return taosRemoveRef(tsFileRsetId, tfd);
}
int64_t tfwrite(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
int64_t ret = taosWrite(fd, buf, count);
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
taosReleaseRef(tsFileRsetId, tfd);
return ret;
}
int64_t tfread(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
int64_t ret = taosRead(fd, buf, count);
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
taosReleaseRef(tsFileRsetId, tfd);
return ret;
}
...@@ -24,19 +24,22 @@ ...@@ -24,19 +24,22 @@
#define TSDB_REF_STATE_DELETED 2 #define TSDB_REF_STATE_DELETED 2
typedef struct SRefNode { typedef struct SRefNode {
struct SRefNode *prev; struct SRefNode *prev; // previous node
struct SRefNode *next; struct SRefNode *next; // next node
void *p; void *p; // pointer to resource protected,
int32_t count; int64_t rid; // reference ID
int32_t count; // number of references
int removed; // 1: removed
} SRefNode; } SRefNode;
typedef struct { typedef struct {
SRefNode **nodeList; SRefNode **nodeList; // array of SRefNode linked list
int state; // 0: empty, 1: active; 2: deleted int state; // 0: empty, 1: active; 2: deleted
int refId; int rsetId; // refSet ID, global unique
int max; int64_t rid; // increase by one for each new reference
int32_t count; // total number of SRefNodes in this set int max; // mod
int64_t *lockedBy; int32_t count; // total number of SRefNodes in this set
int64_t *lockedBy;
void (*fp)(void *); void (*fp)(void *);
} SRefSet; } SRefSet;
...@@ -47,54 +50,58 @@ static int tsRefSetNum = 0; ...@@ -47,54 +50,58 @@ static int tsRefSetNum = 0;
static int tsNextId = 0; static int tsNextId = 0;
static void taosInitRefModule(void); static void taosInitRefModule(void);
static int taosHashRef(SRefSet *pSet, void *p);
static void taosLockList(int64_t *lockedBy); static void taosLockList(int64_t *lockedBy);
static void taosUnlockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy);
static void taosIncRefCount(SRefSet *pSet); static void taosIncRsetCount(SRefSet *pSet);
static void taosDecRefCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet);
static int taosDecRefCount(int rsetId, int64_t rid, int remove);
int taosOpenRef(int max, void (*fp)(void *)) int taosOpenRef(int max, void (*fp)(void *))
{ {
SRefNode **nodeList; SRefNode **nodeList;
SRefSet *pSet; SRefSet *pSet;
int64_t *lockedBy; int64_t *lockedBy;
int i, refId; int i, rsetId;
pthread_once(&tsRefModuleInit, taosInitRefModule); pthread_once(&tsRefModuleInit, taosInitRefModule);
nodeList = calloc(sizeof(SRefNode *), (size_t)max); nodeList = calloc(sizeof(SRefNode *), (size_t)max);
if (nodeList == NULL) { if (nodeList == NULL) {
return TSDB_CODE_REF_NO_MEMORY; terrno = TSDB_CODE_REF_NO_MEMORY;
return -1;
} }
lockedBy = calloc(sizeof(int64_t), (size_t)max); lockedBy = calloc(sizeof(int64_t), (size_t)max);
if (lockedBy == NULL) { if (lockedBy == NULL) {
free(nodeList); free(nodeList);
return TSDB_CODE_REF_NO_MEMORY; terrno = TSDB_CODE_REF_NO_MEMORY;
return -1;
} }
pthread_mutex_lock(&tsRefMutex); pthread_mutex_lock(&tsRefMutex);
for (i = 0; i < TSDB_REF_OBJECTS; ++i) { for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS; tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId
if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break; if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
} }
if (i < TSDB_REF_OBJECTS) { if (i < TSDB_REF_OBJECTS) {
refId = tsNextId; rsetId = tsNextId;
pSet = tsRefSetList + refId; pSet = tsRefSetList + rsetId;
taosIncRefCount(pSet);
pSet->max = max; pSet->max = max;
pSet->nodeList = nodeList; pSet->nodeList = nodeList;
pSet->lockedBy = lockedBy; pSet->lockedBy = lockedBy;
pSet->fp = fp; pSet->fp = fp;
pSet->rid = 1;
pSet->rsetId = rsetId;
pSet->state = TSDB_REF_STATE_ACTIVE; pSet->state = TSDB_REF_STATE_ACTIVE;
pSet->refId = refId; taosIncRsetCount(pSet);
tsRefSetNum++; tsRefSetNum++;
uTrace("refId:%d is opened, max:%d, fp:%p refSetNum:%d", refId, max, fp, tsRefSetNum); uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum);
} else { } else {
refId = TSDB_CODE_REF_FULL; rsetId = TSDB_CODE_REF_FULL;
free (nodeList); free (nodeList);
free (lockedBy); free (lockedBy);
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
...@@ -102,121 +109,128 @@ int taosOpenRef(int max, void (*fp)(void *)) ...@@ -102,121 +109,128 @@ int taosOpenRef(int max, void (*fp)(void *))
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
return refId; return rsetId;
} }
void taosCloseRef(int refId) int taosCloseRef(int rsetId)
{ {
SRefSet *pSet; SRefSet *pSet;
int deleted = 0; int deleted = 0;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d is invalid, out of range", refId); uTrace("rsetId:%d is invalid, out of range", rsetId);
return; terrno = TSDB_CODE_REF_INVALID_ID;
return -1;
} }
pSet = tsRefSetList + refId; pSet = tsRefSetList + rsetId;
pthread_mutex_lock(&tsRefMutex); pthread_mutex_lock(&tsRefMutex);
if (pSet->state == TSDB_REF_STATE_ACTIVE) { if (pSet->state == TSDB_REF_STATE_ACTIVE) {
pSet->state = TSDB_REF_STATE_DELETED; pSet->state = TSDB_REF_STATE_DELETED;
deleted = 1; deleted = 1;
uTrace("refId:%d is closed, count:%d", refId, pSet->count); uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count);
} else { } else {
uTrace("refId:%d is already closed, count:%d", refId, pSet->count); uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count);
} }
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
if (deleted) taosDecRefCount(pSet); if (deleted) taosDecRsetCount(pSet);
return 0;
} }
int taosAddRef(int refId, void *p) int64_t taosAddRef(int rsetId, void *p)
{ {
int hash; int hash;
SRefNode *pNode; SRefNode *pNode;
SRefSet *pSet; SRefSet *pSet;
int64_t rid = 0;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to add, refId not valid", refId, p); uTrace("rsetId:%d p:%p failed to add, rsetId not valid", rsetId, p);
return TSDB_CODE_REF_INVALID_ID; terrno = TSDB_CODE_REF_INVALID_ID;
return -1;
} }
pSet = tsRefSetList + refId; pSet = tsRefSetList + rsetId;
taosIncRefCount(pSet); taosIncRsetCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) { if (pSet->state != TSDB_REF_STATE_ACTIVE) {
taosDecRefCount(pSet); taosDecRsetCount(pSet);
uTrace("refId:%d p:%p failed to add, not active", refId, p); uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p);
return TSDB_CODE_REF_ID_REMOVED; terrno = TSDB_CODE_REF_ID_REMOVED;
return -1;
} }
int code = 0; pNode = calloc(sizeof(SRefNode), 1);
hash = taosHashRef(pSet, p); if (pNode == NULL) {
terrno = TSDB_CODE_REF_NO_MEMORY;
return -1;
}
rid = atomic_add_fetch_64(&pSet->rid, 1);
hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash]; pNode->p = p;
while (pNode) { pNode->rid = rid;
if (pNode->p == p) pNode->count = 1;
break;
pNode = pNode->next; pNode->prev = NULL;
} pNode->next = pSet->nodeList[hash];
if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
if (pNode) { pSet->nodeList[hash] = pNode;
code = TSDB_CODE_REF_ALREADY_EXIST;
uTrace("refId:%d p:%p is already there, faild to add", refId, p);
} else {
pNode = calloc(sizeof(SRefNode), 1);
if (pNode) {
pNode->p = p;
pNode->count = 1;
pNode->prev = 0;
pNode->next = pSet->nodeList[hash];
if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
pSet->nodeList[hash] = pNode;
uTrace("refId:%d p:%p is added, count:%d malloc mem: %p", refId, p, pSet->count, pNode);
} else {
code = TSDB_CODE_REF_NO_MEMORY;
uTrace("refId:%d p:%p is not added, since no memory", refId, p);
}
}
if (code < 0) taosDecRefCount(pSet); uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count);
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
return code; return rid;
}
int taosRemoveRef(int rsetId, int64_t rid)
{
return taosDecRefCount(rsetId, rid, 1);
} }
int taosAcquireRef(int refId, void *p) // if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosAcquireRef(int rsetId, int64_t rid)
{ {
int hash, code = 0; int hash;
SRefNode *pNode; SRefNode *pNode;
SRefSet *pSet; SRefSet *pSet;
void *p = NULL;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to acquire, refId not valid", refId, p); uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid);
return TSDB_CODE_REF_INVALID_ID; terrno = TSDB_CODE_REF_INVALID_ID;
return NULL;
} }
pSet = tsRefSetList + refId; if (rid <= 0) {
taosIncRefCount(pSet); uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rid not valid", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
return NULL;
}
pSet = tsRefSetList + rsetId;
taosIncRsetCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) { if (pSet->state != TSDB_REF_STATE_ACTIVE) {
uTrace("refId:%d p:%p failed to acquire, not active", refId, p); uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid);
taosDecRefCount(pSet); taosDecRsetCount(pSet);
return TSDB_CODE_REF_ID_REMOVED; terrno = TSDB_CODE_REF_ID_REMOVED;
return NULL;
} }
hash = taosHashRef(pSet, p); hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash]; pNode = pSet->nodeList[hash];
while (pNode) { while (pNode) {
if (pNode->p == p) { if (pNode->rid == rid) {
break; break;
} }
...@@ -224,117 +238,76 @@ int taosAcquireRef(int refId, void *p) ...@@ -224,117 +238,76 @@ int taosAcquireRef(int refId, void *p)
} }
if (pNode) { if (pNode) {
pNode->count++; if (pNode->removed == 0) {
uTrace("refId:%d p:%p is acquired", refId, p); pNode->count++;
p = pNode->p;
uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid);
} else {
terrno = TSDB_CODE_REF_NOT_EXIST;
uTrace("rsetId:%d p:%p rid:%" PRId64 " is already removed, failed to acquire", rsetId, pNode->p, rid);
}
} else { } else {
code = TSDB_CODE_REF_NOT_EXIST; terrno = TSDB_CODE_REF_NOT_EXIST;
uTrace("refId:%d p:%p is not there, failed to acquire", refId, p); uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid);
} }
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
taosDecRefCount(pSet); taosDecRsetCount(pSet);
return code; return p;
} }
void taosReleaseRef(int refId, void *p) int taosReleaseRef(int rsetId, int64_t rid)
{ {
int hash; return taosDecRefCount(rsetId, rid, 0);
SRefNode *pNode;
SRefSet *pSet;
int released = 0;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to release, refId not valid", refId, p);
return;
}
pSet = tsRefSetList + refId;
if (pSet->state == TSDB_REF_STATE_EMPTY) {
uTrace("refId:%d p:%p failed to release, cleaned", refId, p);
return;
}
hash = taosHashRef(pSet, p);
taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash];
while (pNode) {
if (pNode->p == p)
break;
pNode = pNode->next;
}
if (pNode) {
pNode->count--;
if (pNode->count == 0) {
if ( pNode->prev ) {
pNode->prev->next = pNode->next;
} else {
pSet->nodeList[hash] = pNode->next;
}
if ( pNode->next ) {
pNode->next->prev = pNode->prev;
}
(*pSet->fp)(pNode->p);
free(pNode);
released = 1;
uTrace("refId:%d p:%p is removed, count:%d, free mem: %p", refId, p, pSet->count, pNode);
} else {
uTrace("refId:%d p:%p is released", refId, p);
}
} else {
uTrace("refId:%d p:%p is not there, failed to release", refId, p);
}
taosUnlockList(pSet->lockedBy+hash);
if (released) taosDecRefCount(pSet);
} }
// if p is NULL, return the first p in hash list, otherwise, return the next after p // if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosIterateRef(int refId, void *p) { void *taosIterateRef(int rsetId, int64_t rid) {
SRefNode *pNode = NULL; SRefNode *pNode = NULL;
SRefSet *pSet; SRefSet *pSet;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p); uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rsetId not valid", rsetId, rid);
terrno = TSDB_CODE_REF_INVALID_ID;
return NULL;
}
if (rid <= 0) {
uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rid not valid", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
return NULL; return NULL;
} }
pSet = tsRefSetList + refId; pSet = tsRefSetList + rsetId;
taosIncRefCount(pSet); taosIncRsetCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) { if (pSet->state != TSDB_REF_STATE_ACTIVE) {
uTrace("refId:%d p:%p failed to iterate, not active", refId, p); uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid);
taosDecRefCount(pSet); terrno = TSDB_CODE_REF_ID_REMOVED;
taosDecRsetCount(pSet);
return NULL; return NULL;
} }
int hash = 0; int hash = 0;
if (p) { if (rid > 0) {
hash = taosHashRef(pSet, p); hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash]; pNode = pSet->nodeList[hash];
while (pNode) { while (pNode) {
if (pNode->p == p) break; if (pNode->rid == rid) break;
pNode = pNode->next; pNode = pNode->next;
} }
if (pNode == NULL) { if (pNode == NULL) {
uError("refId:%d p:%p not there, quit", refId, p); uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
return NULL; return NULL;
} }
// p is there // rid is there
pNode = pNode->next; pNode = pNode->next;
if (pNode == NULL) { if (pNode == NULL) {
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
...@@ -356,14 +329,14 @@ void *taosIterateRef(int refId, void *p) { ...@@ -356,14 +329,14 @@ void *taosIterateRef(int refId, void *p) {
pNode->count++; // acquire it pNode->count++; // acquire it
newP = pNode->p; newP = pNode->p;
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
uTrace("refId:%d p:%p is returned", refId, p); uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid);
} else { } else {
uTrace("refId:%d p:%p the list is over", refId, p); uTrace("rsetId:%d the list is over", rsetId);
} }
if (p) taosReleaseRef(refId, p); // release the current one if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one
taosDecRefCount(pSet); taosDecRsetCount(pSet);
return newP; return newP;
} }
...@@ -381,13 +354,13 @@ int taosListRef() { ...@@ -381,13 +354,13 @@ int taosListRef() {
if (pSet->state == TSDB_REF_STATE_EMPTY) if (pSet->state == TSDB_REF_STATE_EMPTY)
continue; continue;
uInfo("refId:%d state:%d count::%d", i, pSet->state, pSet->count); uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count);
for (int j=0; j < pSet->max; ++j) { for (int j=0; j < pSet->max; ++j) {
pNode = pSet->nodeList[j]; pNode = pSet->nodeList[j];
while (pNode) { while (pNode) {
uInfo("refId:%d p:%p count:%d", i, pNode->p, pNode->count); uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count);
pNode = pNode->next; pNode = pNode->next;
num++; num++;
} }
...@@ -399,21 +372,78 @@ int taosListRef() { ...@@ -399,21 +372,78 @@ int taosListRef() {
return num; return num;
} }
static int taosHashRef(SRefSet *pSet, void *p) static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
{ int hash;
int hash = 0; SRefSet *pSet;
int64_t v = (int64_t)p; SRefNode *pNode;
int released = 0;
int code = 0;
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid);
terrno = TSDB_CODE_REF_INVALID_ID;
return -1;
}
if (rid <= 0) {
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rid not valid", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
return -1;
}
pSet = tsRefSetList + rsetId;
if (pSet->state == TSDB_REF_STATE_EMPTY) {
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, cleaned", rsetId, rid);
terrno = TSDB_CODE_REF_ID_REMOVED;
return -1;
}
for (int i = 0; i < sizeof(v); ++i) { hash = rid % pSet->max;
hash += (int)(v & 0xFFFF); taosLockList(pSet->lockedBy+hash);
v = v >> 16;
i = i + 2; pNode = pSet->nodeList[hash];
while (pNode) {
if (pNode->rid == rid)
break;
pNode = pNode->next;
} }
hash = hash % pSet->max; if (pNode) {
pNode->count--;
if (remove) pNode->removed = 1;
return hash; if (pNode->count <= 0) {
} if (pNode->prev) {
pNode->prev->next = pNode->next;
} else {
pSet->nodeList[hash] = pNode->next;
}
if (pNode->next) {
pNode->next->prev = pNode->prev;
}
(*pSet->fp)(pNode->p);
uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
free(pNode);
released = 1;
} else {
uTrace("rsetId:%d p:%p rid:%" PRId64 "is released, count:%d", rsetId, pNode->p, rid, pNode->count);
}
} else {
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
code = -1;
}
taosUnlockList(pSet->lockedBy+hash);
if (released) taosDecRsetCount(pSet);
return code;
}
static void taosLockList(int64_t *lockedBy) { static void taosLockList(int64_t *lockedBy) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetPthreadId();
...@@ -436,14 +466,14 @@ static void taosInitRefModule(void) { ...@@ -436,14 +466,14 @@ static void taosInitRefModule(void) {
pthread_mutex_init(&tsRefMutex, NULL); pthread_mutex_init(&tsRefMutex, NULL);
} }
static void taosIncRefCount(SRefSet *pSet) { static void taosIncRsetCount(SRefSet *pSet) {
atomic_add_fetch_32(&pSet->count, 1); atomic_add_fetch_32(&pSet->count, 1);
uTrace("refId:%d inc count:%d", pSet->refId, pSet->count); // uTrace("rsetId:%d inc count:%d", pSet->rsetId, count);
} }
static void taosDecRefCount(SRefSet *pSet) { static void taosDecRsetCount(SRefSet *pSet) {
int32_t count = atomic_sub_fetch_32(&pSet->count, 1); int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
uTrace("refId:%d dec count:%d", pSet->refId, pSet->count); // uTrace("rsetId:%d dec count:%d", pSet->rsetId, count);
if (count > 0) return; if (count > 0) return;
...@@ -458,7 +488,7 @@ static void taosDecRefCount(SRefSet *pSet) { ...@@ -458,7 +488,7 @@ static void taosDecRefCount(SRefSet *pSet) {
taosTFree(pSet->lockedBy); taosTFree(pSet->lockedBy);
tsRefSetNum--; tsRefSetNum--;
uTrace("refId:%d is cleaned, refSetNum:%d count:%d", pSet->refId, tsRefSetNum, pSet->count); uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count);
} }
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
......
...@@ -11,106 +11,119 @@ ...@@ -11,106 +11,119 @@
#include "tulog.h" #include "tulog.h"
typedef struct { typedef struct {
int refNum; int refNum;
int steps; int steps;
int refId; int rsetId;
void **p; int64_t rid;
void **p;
} SRefSpace; } SRefSpace;
void iterateRefs(int refId) { void iterateRefs(int rsetId) {
int count = 0; int count = 0;
void *p = taosIterateRef(refId, NULL); void *p = taosIterateRef(rsetId, NULL);
while (p) { while (p) {
// process P // process P
count++; count++;
p = taosIterateRef(refId, p); p = taosIterateRef(rsetId, p);
} }
printf(" %d ", count); printf(" %d ", count);
} }
void *takeRefActions(void *param) { void *addRef(void *param) {
SRefSpace *pSpace = (SRefSpace *)param; SRefSpace *pSpace = (SRefSpace *)param;
int code, id; int id;
int64_t rid;
for (int i=0; i < pSpace->steps; ++i) { for (int i=0; i < pSpace->steps; ++i) {
printf("s"); printf("a");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
code = taosAddRef(pSpace->refId, pSpace->p[id]); if (pSpace->rid[id] <= 0) {
usleep(1); pSpace->p[id] = malloc(128);
pSpace->rid[id] = taosAddRef(pSpace->rsetId, pSpace->p[id]);
id = random() % pSpace->refNum;
code = taosAcquireRef(pSpace->refId, pSpace->p[id]);
if (code >= 0) {
usleep(id % 5 + 1);
taosReleaseRef(pSpace->refId, pSpace->p[id]);
} }
usleep(100);
}
return NULL;
}
void *removeRef(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
int id;
int64_t rid;
for (int i=0; i < pSpace->steps; ++i) {
printf("d");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
taosRemoveRef(pSpace->refId, pSpace->p[id]); if (pSpace->rid[id] > 0) {
usleep(id %5 + 1); code = taosRemoveRef(pSpace->rsetId, pSpace->rid[id]);
if (code == 0) pSpace->rid[id] = 0;
}
usleep(100);
}
return NULL;
}
void *acquireRelease(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
int id;
int64_t rid;
for (int i=0; i < pSpace->steps; ++i) {
printf("a");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
code = taosAcquireRef(pSpace->refId, pSpace->p[id]); code = taosAcquireRef(pSpace->rsetId, pSpace->p[id]);
if (code >= 0) { if (code >= 0) {
usleep(id % 5 + 1); usleep(id % 5 + 1);
taosReleaseRef(pSpace->refId, pSpace->p[id]); taosReleaseRef(pSpace->rsetId, pSpace->p[id]);
} }
id = random() % pSpace->refNum;
iterateRefs(id);
} }
for (int i=0; i < pSpace->refNum; ++i) {
taosRemoveRef(pSpace->refId, pSpace->p[i]);
}
//uInfo("refId:%d thread exits", pSpace->refId);
return NULL; return NULL;
} }
void myfree(void *p) { void myfree(void *p) {
return; free(p);
} }
void *openRefSpace(void *param) { void *openRefSpace(void *param) {
SRefSpace *pSpace = (SRefSpace *)param; SRefSpace *pSpace = (SRefSpace *)param;
printf("c"); printf("c");
pSpace->refId = taosOpenRef(50, myfree); pSpace->rsetId = taosOpenRef(50, myfree);
if (pSpace->refId < 0) { if (pSpace->rsetId < 0) {
printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId)); printf("failed to open ref, reson:%s\n", tstrerror(pSpace->rsetId));
return NULL; return NULL;
} }
pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum); pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum);
for (int i=0; i<pSpace->refNum; ++i) {
pSpace->p[i] = (void *) malloc(128);
}
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_t thread1, thread2, thread3; pthread_t thread1, thread2, thread3;
pthread_create(&(thread1), &thattr, takeRefActions, (void *)(pSpace)); pthread_create(&(thread1), &thattr, addRef, (void *)(pSpace));
pthread_create(&(thread2), &thattr, takeRefActions, (void *)(pSpace)); pthread_create(&(thread2), &thattr, removeRef, (void *)(pSpace));
pthread_create(&(thread3), &thattr, takeRefActions, (void *)(pSpace)); pthread_create(&(thread3), &thattr, acquireRelease, (void *)(pSpace));
pthread_join(thread1, NULL); pthread_join(thread1, NULL);
pthread_join(thread2, NULL); pthread_join(thread2, NULL);
pthread_join(thread3, NULL); pthread_join(thread3, NULL);
taosCloseRef(pSpace->refId);
for (int i=0; i<pSpace->refNum; ++i) { for (int i=0; i<pSpace->refNum; ++i) {
free(pSpace->p[i]); taosRemoveRef(pSpace->rsetId, pSpace->rid[i]);
} }
uInfo("refId:%d main thread exit", pSpace->refId); taosCloseRef(pSpace->rsetId);
uInfo("rsetId:%d main thread exit", pSpace->rsetId);
free(pSpace->p); free(pSpace->p);
pSpace->p = NULL; pSpace->p = NULL;
...@@ -140,7 +153,7 @@ int main(int argc, char *argv[]) { ...@@ -140,7 +153,7 @@ int main(int argc, char *argv[]) {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-n]: number of references, default: %d\n", refNum); printf(" [-n]: number of references, default: %d\n", refNum);
printf(" [-s]: steps to run for each reference, default: %d\n", steps); printf(" [-s]: steps to run for each reference, default: %d\n", steps);
printf(" [-t]: number of refIds running in parallel, default: %d\n", threads); printf(" [-t]: number of rsetIds running in parallel, default: %d\n", threads);
printf(" [-l]: number of loops, default: %d\n", loops); printf(" [-l]: number of loops, default: %d\n", loops);
printf(" [-d]: debugFlag, default: %d\n", uDebugFlag); printf(" [-d]: debugFlag, default: %d\n", uDebugFlag);
exit(0); exit(0);
......
...@@ -47,7 +47,7 @@ typedef struct { ...@@ -47,7 +47,7 @@ typedef struct {
void *rqueue; void *rqueue;
void *wal; void *wal;
void *tsdb; void *tsdb;
void *sync; int64_t sync;
void *events; void *events;
void *cq; // continuous query void *cq; // continuous query
int32_t cfgVersion; int32_t cfgVersion;
......
...@@ -44,12 +44,12 @@ static void vnodeCtrlFlow(void *handle, int32_t mseconds); ...@@ -44,12 +44,12 @@ static void vnodeCtrlFlow(void *handle, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
#ifndef _SYNC #ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; } int64_t syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; }
void syncStop(tsync_h shandle) {} void syncStop(int64_t rid) {}
int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; }
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; }
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {}
#endif #endif
char* vnodeStatus[] = { char* vnodeStatus[] = {
...@@ -331,7 +331,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -331,7 +331,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
#ifndef _SYNC #ifndef _SYNC
pVnode->role = TAOS_SYNC_ROLE_MASTER; pVnode->role = TAOS_SYNC_ROLE_MASTER;
#else #else
if (pVnode->sync == NULL) { if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno)); tstrerror(terrno));
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
...@@ -561,9 +561,9 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -561,9 +561,9 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
} }
// stop replication module // stop replication module
if (pVnode->sync) { if (pVnode->sync > 0) {
void *sync = pVnode->sync; int64_t sync = pVnode->sync;
pVnode->sync = NULL; pVnode->sync = -1;
syncStop(sync); syncStop(sync);
} }
......
...@@ -43,6 +43,7 @@ extern int32_t wDebugFlag; ...@@ -43,6 +43,7 @@ extern int32_t wDebugFlag;
typedef struct { typedef struct {
uint64_t version; uint64_t version;
int64_t fileId; int64_t fileId;
int64_t rid;
int32_t vgId; int32_t vgId;
int32_t fd; int32_t fd;
int32_t keep; int32_t keep;
......
...@@ -78,7 +78,8 @@ void *walOpen(char *path, SWalCfg *pCfg) { ...@@ -78,7 +78,8 @@ void *walOpen(char *path, SWalCfg *pCfg) {
return NULL; return NULL;
} }
if (taosAddRef(tsWal.refId, pWal) != TSDB_CODE_SUCCESS) { pWal->rid = taosAddRef(tsWal.refId, pWal);
if (pWal->rid < 0) {
walFreeObj(pWal); walFreeObj(pWal);
return NULL; return NULL;
} }
...@@ -143,7 +144,7 @@ void walClose(void *handle) { ...@@ -143,7 +144,7 @@ void walClose(void *handle) {
} }
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refId, pWal); taosRemoveRef(tsWal.refId, pWal->rid);
} }
static int32_t walInitObj(SWal *pWal) { static int32_t walInitObj(SWal *pWal) {
...@@ -185,7 +186,7 @@ static void walUpdateSeq() { ...@@ -185,7 +186,7 @@ static void walUpdateSeq() {
} }
static void walFsyncAll() { static void walFsyncAll() {
SWal *pWal = taosIterateRef(tsWal.refId, NULL); SWal *pWal = taosIterateRef(tsWal.refId, 0);
while (pWal) { while (pWal) {
if (walNeedFsync(pWal)) { if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
...@@ -194,7 +195,7 @@ static void walFsyncAll() { ...@@ -194,7 +195,7 @@ static void walFsyncAll() {
wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
} }
} }
pWal = taosIterateRef(tsWal.refId, pWal); pWal = taosIterateRef(tsWal.refId, pWal->rid);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册