提交 4078e7b3 编写于 作者: S Steven Li

Merge remote-tracking branch 'origin/develop' into feature/crash_gen

...@@ -90,7 +90,6 @@ pipeline { ...@@ -90,7 +90,6 @@ pipeline {
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh ''' sh '''
cd ${WKC}/tests/pytest cd ${WKC}/tests/pytest
./crash_gen.sh --valgrind -p -t 10 -s 100 -b 4
./handle_crash_gen_val_log.sh ./handle_crash_gen_val_log.sh
''' '''
} }
......
...@@ -330,6 +330,7 @@ typedef struct STscObj { ...@@ -330,6 +330,7 @@ typedef struct STscObj {
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;
uint32_t connId; uint32_t connId;
uint64_t rid; // ref ID returned by taosAddRef
struct SSqlObj * pHb; struct SSqlObj * pHb;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
...@@ -348,7 +349,7 @@ typedef struct SSqlObj { ...@@ -348,7 +349,7 @@ typedef struct SSqlObj {
void *signature; void *signature;
pthread_t owner; // owner of sql object, by which it is executed pthread_t owner; // owner of sql object, by which it is executed
STscObj *pTscObj; STscObj *pTscObj;
void *pRpcCtx; int64_t rpcRid;
void (*fp)(); void (*fp)();
void (*fetchFp)(); void (*fetchFp)();
void *param; void *param;
......
...@@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
int32_t waitingDuring = tsShellActivityTimer * 500; int32_t waitingDuring = tsShellActivityTimer * 500;
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer);
} else { } else {
tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj); tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
} }
} }
void tscProcessActivityTimer(void *handle, void *tmrId) { void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = (STscObj *)handle; int64_t rid = (int64_t) handle;
STscObj *pObj = taosAcquireRef(tscRefId, rid);
int ret = taosAcquireRef(tscRefId, pObj); if (pObj == NULL) return;
if (ret < 0) {
tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret));
return;
}
SSqlObj* pHB = pObj->pHb; SSqlObj* pHB = pObj->pHb;
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) { if (p == NULL) {
tscWarn("%p HB object has been released already", pHB); tscWarn("%p HB object has been released already", pHB);
taosReleaseRef(tscRefId, pObj); taosReleaseRef(tscRefId, pObj->rid);
return; return;
} }
...@@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
} }
taosReleaseRef(tscRefId, pObj); taosReleaseRef(tscRefId, rid);
} }
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
...@@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.pCont = pMsg, .pCont = pMsg,
.contLen = pSql->cmd.payloadLen, .contLen = pSql->cmd.payloadLen,
.ahandle = pSql, .ahandle = pSql,
.handle = &pSql->pRpcCtx, .handle = NULL,
.code = 0 .code = 0
}; };
...@@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// Otherwise, the pSql object may have been released already during the response function, which is // Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash. // cause crash.
rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
assert(*pSql->self == pSql); assert(*pSql->self == pSql);
pSql->pRpcCtx = NULL; pSql->rpcRid = -1;
if (pObj->signature != pObj) { if (pObj->signature != pObj) {
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
...@@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
createHBObj(pObj); createHBObj(pObj);
//launch a timer to send heartbeat to maintain the connection and send status to mnode //launch a timer to send heartbeat to maintain the connection and send status to mnode
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer);
return 0; return 0;
} }
......
...@@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
registerSqlObj(pSql); registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
taosAddRef(tscRefId, pObj); pObj->rid = taosAddRef(tscRefId, pObj);
return pSql; return pSql;
} }
...@@ -279,9 +279,9 @@ void taos_close(TAOS *taos) { ...@@ -279,9 +279,9 @@ void taos_close(TAOS *taos) {
SSqlObj* pHb = pObj->pHb; SSqlObj* pHb = pObj->pHb;
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode if (pHb->rpcRid > 0) { // wait for rsp from dnode
rpcCancelRequest(pHb->pRpcCtx); rpcCancelRequest(pHb->rpcRid);
pHb->pRpcCtx = NULL; pHb->rpcRid = -1;
} }
tscDebug("%p HB is freed", pHb); tscDebug("%p HB is freed", pHb);
...@@ -298,7 +298,7 @@ void taos_close(TAOS *taos) { ...@@ -298,7 +298,7 @@ void taos_close(TAOS *taos) {
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
taosRemoveRef(tscRefId, pObj); taosRemoveRef(tscRefId, pObj->rid);
} }
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
...@@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
assert(pSubObj->self == (SSqlObj**) p); assert(pSubObj->self == (SSqlObj**) p);
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSubObj->pRpcCtx != NULL) { if (pSubObj->rpcRid > 0) {
rpcCancelRequest(pSubObj->pRpcCtx); rpcCancelRequest(pSubObj->rpcRid);
pSubObj->pRpcCtx = NULL; pSubObj->rpcRid = -1;
} }
tscQueueAsyncRes(pSubObj); tscQueueAsyncRes(pSubObj);
...@@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->pRpcCtx == NULL); assert(pSql->rpcRid <= 0);
tscKillSTableQuery(pSql); tscKillSTableQuery(pSql);
} else { } else {
if (pSql->cmd.command < TSDB_SQL_LOCAL) { if (pSql->cmd.command < TSDB_SQL_LOCAL) {
...@@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) {
* reset and freed in the processMsgFromServer function, and causes the invalid * reset and freed in the processMsgFromServer function, and causes the invalid
* write problem for rpcCancelRequest. * write problem for rpcCancelRequest.
*/ */
if (pSql->pRpcCtx != NULL) { if (pSql->rpcRid > 0) {
rpcCancelRequest(pSql->pRpcCtx); rpcCancelRequest(pSql->rpcRid);
pSql->pRpcCtx = NULL; pSql->rpcRid = -1;
} }
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
......
...@@ -36,7 +36,7 @@ void * tscTmr; ...@@ -36,7 +36,7 @@ void * tscTmr;
void * tscQhandle; void * tscQhandle;
void * tscCheckDiskUsageTmr; void * tscCheckDiskUsageTmr;
int tsInsertHeadSize; int tsInsertHeadSize;
int tscRefId; int tscRefId = -1;
int tscNumOfThreads; int tscNumOfThreads;
......
...@@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) { ...@@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
if (ref == 0) { if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
taosRemoveRef(tscRefId, pTscObj); taosRemoveRef(tscRefId, pTscObj->rid);
} }
} }
......
...@@ -83,13 +83,13 @@ void rpcClose(void *); ...@@ -83,13 +83,13 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(void *pContext); void rpcCancelRequest(int64_t rid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -106,13 +106,13 @@ typedef void* tsync_h; ...@@ -106,13 +106,13 @@ typedef void* tsync_h;
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
tsync_h syncStart(const SSyncInfo *); int64_t syncStart(const SSyncInfo *);
void syncStop(tsync_h shandle); void syncStop(int64_t rid);
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); int32_t syncReconfig(int64_t rid, const SSyncCfg *);
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype);
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncConfirmForward(int64_t rid, uint64_t version, int32_t code);
void syncRecover(tsync_h shandle); // recover from other nodes: void syncRecover(int64_t rid); // recover from other nodes:
int syncGetNodesRole(tsync_h shandle, SNodesRole *); int syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[]; extern char *syncRole[];
......
...@@ -72,7 +72,7 @@ typedef struct { ...@@ -72,7 +72,7 @@ typedef struct {
ESyncRole role; ESyncRole role;
ESdbStatus status; ESdbStatus status;
int64_t version; int64_t version;
void * sync; int64_t sync;
void * wal; void * wal;
SSyncCfg cfg; SSyncCfg cfg;
int32_t numOfTables; int32_t numOfTables;
...@@ -212,7 +212,7 @@ static void sdbRestoreTables() { ...@@ -212,7 +212,7 @@ static void sdbRestoreTables() {
} }
void sdbUpdateMnodeRoles() { void sdbUpdateMnodeRoles() {
if (tsSdbObj.sync == NULL) return; if (tsSdbObj.sync <= 0) return;
SNodesRole roles = {0}; SNodesRole roles = {0};
syncGetNodesRole(tsSdbObj.sync, &roles); syncGetNodesRole(tsSdbObj.sync, &roles);
...@@ -433,7 +433,7 @@ void sdbCleanUp() { ...@@ -433,7 +433,7 @@ void sdbCleanUp() {
if (tsSdbObj.sync) { if (tsSdbObj.sync) {
syncStop(tsSdbObj.sync); syncStop(tsSdbObj.sync);
tsSdbObj.sync = NULL; tsSdbObj.sync = -1;
} }
if (tsSdbObj.wal) { if (tsSdbObj.wal) {
......
...@@ -82,6 +82,7 @@ typedef struct { ...@@ -82,6 +82,7 @@ typedef struct {
int8_t oldInUse; // server EP inUse passed by app int8_t oldInUse; // server EP inUse passed by app
int8_t redirect; // flag to indicate redirect int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg *pRsp; // for synchronous API SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API tsem_t *pSem; // for synchronous API
SRpcEpSet *pSet; // for synchronous API SRpcEpSet *pSet; // for synchronous API
...@@ -220,8 +221,7 @@ static void rpcFree(void *p) { ...@@ -220,8 +221,7 @@ static void rpcFree(void *p) {
free(p); free(p);
} }
static void rpcInit(void) { void rpcInit(void) {
tsProgressTimer = tsRpcTimer/2; tsProgressTimer = tsRpcTimer/2;
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcHeadSize = RPC_MSG_OVERHEAD;
...@@ -230,6 +230,11 @@ static void rpcInit(void) { ...@@ -230,6 +230,11 @@ static void rpcInit(void) {
tsRpcRefId = taosOpenRef(200, rpcFree); tsRpcRefId = taosOpenRef(200, rpcFree);
} }
void rpcCleanup(void) {
taosCloseRef(tsRpcRefId);
tsRpcRefId = -1;
}
void *rpcOpen(const SRpcInit *pInit) { void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc; SRpcInfo *pRpc;
...@@ -374,7 +379,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -374,7 +379,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -403,10 +408,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -403,10 +408,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
// set the handle to pContext, so app can cancel the request // set the handle to pContext, so app can cancel the request
if (pMsg->handle) *((void **)pMsg->handle) = pContext; if (pMsg->handle) *((void **)pMsg->handle) = pContext;
taosAddRef(tsRpcRefId, pContext); pContext->rid = taosAddRef(tsRpcRefId, pContext);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
return; return pContext->rid;
} }
void rpcSendResponse(const SRpcMsg *pRsp) { void rpcSendResponse(const SRpcMsg *pRsp) {
...@@ -551,15 +557,14 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { ...@@ -551,15 +557,14 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
return code; return code;
} }
void rpcCancelRequest(void *handle) { void rpcCancelRequest(int64_t rid) {
SRpcReqContext *pContext = handle;
int code = taosAcquireRef(tsRpcRefId, pContext); SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
if (code < 0) return; if (pContext == NULL) return;
rpcCloseConn(pContext->pConn); rpcCloseConn(pContext->pConn);
taosReleaseRef(tsRpcRefId, pContext); taosReleaseRef(tsRpcRefId, rid);
} }
static void rpcFreeMsg(void *msg) { static void rpcFreeMsg(void *msg) {
...@@ -628,7 +633,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { ...@@ -628,7 +633,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
// if there is an outgoing message, free it // if there is an outgoing message, free it
if (pConn->outType && pConn->pReqMsg) { if (pConn->outType && pConn->pReqMsg) {
if (pConn->pContext) pConn->pContext->pConn = NULL; if (pConn->pContext) pConn->pContext->pConn = NULL;
taosRemoveRef(tsRpcRefId, pConn->pContext); taosRemoveRef(tsRpcRefId, pConn->pContext->rid);
} }
} }
...@@ -1109,7 +1114,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -1109,7 +1114,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
} }
// free the request message // free the request message
taosRemoveRef(tsRpcRefId, pContext); taosRemoveRef(tsRpcRefId, pContext->rid);
} }
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
...@@ -1620,11 +1625,7 @@ static void rpcDecRef(SRpcInfo *pRpc) ...@@ -1620,11 +1625,7 @@ static void rpcDecRef(SRpcInfo *pRpc)
tDebug("%s rpc resources are released", pRpc->label); tDebug("%s rpc resources are released", pRpc->label);
taosTFree(pRpc); taosTFree(pRpc);
int count = atomic_sub_fetch_32(&tsRpcNum, 1); atomic_sub_fetch_32(&tsRpcNum, 1);
if (count == 0) {
// taosCloseRef(tsRpcRefId);
// tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error
}
} }
} }
...@@ -141,6 +141,7 @@ typedef struct SSyncNode { ...@@ -141,6 +141,7 @@ typedef struct SSyncNode {
int8_t replica; int8_t replica;
int8_t quorum; int8_t quorum;
uint32_t vgId; uint32_t vgId;
int64_t rid;
void *ahandle; void *ahandle;
int8_t selfIndex; int8_t selfIndex;
SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator
......
...@@ -142,14 +142,14 @@ void syncCleanUp() { ...@@ -142,14 +142,14 @@ void syncCleanUp() {
sInfo("sync module is cleaned up"); sInfo("sync module is cleaned up");
} }
void *syncStart(const SSyncInfo *pInfo) { int64_t syncStart(const SSyncInfo *pInfo) {
const SSyncCfg *pCfg = &pInfo->syncCfg; const SSyncCfg *pCfg = &pInfo->syncCfg;
SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1); SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1);
if (pNode == NULL) { if (pNode == NULL) {
sError("no memory to allocate syncNode"); sError("no memory to allocate syncNode");
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return -1;
} }
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
...@@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) {
pNode->quorum = pCfg->quorum; pNode->quorum = pCfg->quorum;
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
int ret = taosAddRef(tsSyncRefId, pNode); pNode->rid = taosAddRef(tsSyncRefId, pNode);
if (ret < 0) { if (pNode->rid < 0) {
syncFreeNode(pNode); syncFreeNode(pNode);
return NULL; return -1;
} }
for (int i = 0; i < pCfg->replica; ++i) { for (int i = 0; i < pCfg->replica; ++i) {
...@@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) {
if (pNode->selfIndex < 0) { if (pNode->selfIndex < 0) {
sInfo("vgId:%d, this node is not configured", pNode->vgId); sInfo("vgId:%d, this node is not configured", pNode->vgId);
terrno = TSDB_CODE_SYN_INVALID_CONFIG; terrno = TSDB_CODE_SYN_INVALID_CONFIG;
syncStop(pNode); syncStop(pNode->rid);
return NULL; return -1;
} }
nodeVersion = pInfo->version; // set the initial version nodeVersion = pInfo->version; // set the initial version
...@@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) {
if (pNode->pSyncFwds == NULL) { if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
syncStop(pNode); syncStop(pNode->rid);
return NULL; return -1;
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl);
if (pNode->pFwdTimer == NULL) { if (pNode->pFwdTimer == NULL) {
sError("vgId:%d, failed to allocate timer", pNode->vgId); sError("vgId:%d, failed to allocate timer", pNode->vgId);
syncStop(pNode); syncStop(pNode->rid);
return NULL; return -1;
} }
syncAddArbitrator(pNode); syncAddArbitrator(pNode);
...@@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
} }
return pNode; return pNode->rid;
} }
void syncStop(void *param) { void syncStop(int64_t rid) {
SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (ret < 0) return; if (pNode == NULL) return;
sInfo("vgId:%d, cleanup sync", pNode->vgId); sInfo("vgId:%d, cleanup sync", pNode->vgId);
...@@ -245,16 +244,15 @@ void syncStop(void *param) { ...@@ -245,16 +244,15 @@ void syncStop(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
taosRemoveRef(tsSyncRefId, pNode); taosRemoveRef(tsSyncRefId, rid);
} }
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
SSyncNode *pNode = param;
int i, j; int i, j;
int ret = taosAcquireRef(tsSyncRefId, pNode); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNode->replica); pNode->replica);
...@@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { ...@@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
syncRole[nodeRole]); syncRole[nodeRole]);
syncBroadcastStatus(pNode); syncBroadcastStatus(pNode);
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
return 0; return 0;
} }
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int qtype) {
SSyncNode *pNode = param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return 0;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return 0;
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
return code; return code;
} }
void syncConfirmForward(void *param, uint64_t version, int32_t code) { void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncNode *pNode = param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
SSyncPeer *pPeer = pNode->pMaster; SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) { if (pPeer && pNode->quorum > 1) {
...@@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) { ...@@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) {
} }
} }
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
} }
void syncRecover(void *param) { void syncRecover(int64_t rid) {
SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (ret < 0) return; if (pNode == NULL) return;
// to do: add a few lines to check if recover is OK // to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work // if take this node to unsync state, the whole system may not work
...@@ -393,14 +386,12 @@ void syncRecover(void *param) { ...@@ -393,14 +386,12 @@ void syncRecover(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
} }
int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { int syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
SSyncNode *pNode = param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return -1;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return -1;
pNodesRole->selfIndex = pNode->selfIndex; pNodesRole->selfIndex = pNode->selfIndex;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
...@@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { ...@@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
pNodesRole->role[i] = pNode->peerInfo[i]->role; pNodesRole->role[i] = pNode->peerInfo[i]->role;
} }
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
return 0; return 0;
} }
...@@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); ...@@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1);
int syncDecPeerRef(SSyncPeer *pPeer) { int syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);
sDebug("%s, resource is freed", pPeer->id); sDebug("%s, resource is freed", pPeer->id);
taosTFree(pPeer->watchFd); taosTFree(pPeer->watchFd);
...@@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
} }
taosAcquireRef(tsSyncRefId, pNode); taosAcquireRef(tsSyncRefId, pNode->rid);
return pPeer; return pPeer;
} }
...@@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; if (taosAcquireRef(tsSyncRefId, pNode->rid) < 0) return;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno));
...@@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) {
} }
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, pNode->rid);
} }
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
...@@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
static void syncMonitorFwdInfos(void *param, void *tmrId) { static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncNode *pNode = param; int64_t rid = (int64_t) param;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
int ret = taosAcquireRef(tsSyncRefId, pNode); if (pNode == NULL) return;
if ( ret < 0) return;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
...@@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl);
} }
taosReleaseRef(tsSyncRefId, pNode); taosReleaseRef(tsSyncRefId, rid);
} }
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) {
......
...@@ -30,7 +30,7 @@ int dataFd = -1; ...@@ -30,7 +30,7 @@ int dataFd = -1;
void * qhandle = NULL; void * qhandle = NULL;
int walNum = 0; int walNum = 0;
uint64_t tversion = 0; uint64_t tversion = 0;
void * syncHandle; int64_t syncHandle;
int role; int role;
int nodeId; int nodeId;
char path[256]; char path[256];
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TFILE_H
#define TDENGINE_TFILE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <unistd.h>
// init taos file module
int32_t tfinit();
// clean up taos file module
void tfcleanup();
// the same syntax as UNIX standard open/close/read/write
// but FD is int64_t and will never be reused
int64_t tfopen(const char *pathname, int32_t flags);
int64_t tfclose(int64_t tfd);
int64_t tfwrite(int64_t tfd, void *buf, int64_t count);
int64_t tfread(int64_t tfd, void *buf, int64_t count);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TREF_H
...@@ -21,38 +21,48 @@ ...@@ -21,38 +21,48 @@
extern "C" { extern "C" {
#endif #endif
// open an instance, return refId which will be used by other APIs // open a reference set, max is the mod used by hash, fp is the pointer to free resource function
int taosOpenRef(int max, void (*fp)(void *)); // return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately
int taosOpenRef(int max, void (*fp)(void *));
// close the Ref instance // close the reference set, refId is the return value by taosOpenRef
void taosCloseRef(int refId); // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosCloseRef(int refId);
// add ref, p is the pointer to resource or pointer ID // add ref, p is the pointer to resource or pointer ID
int taosAddRef(int refId, void *p); // return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
#define taosRemoveRef taosReleaseRef int64_t taosAddRef(int refId, void *p);
// acquire ref, p is the pointer to resource or pointer ID // remove ref, rid is the reference ID returned by taosAddRef
int taosAcquireRef(int refId, void *p); // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosRemoveRef(int rsetId, int64_t rid);
// release ref, p is the pointer to resource or pinter ID // acquire ref, rid is the reference ID returned by taosAddRef
void taosReleaseRef(int refId, void *p); // return the resource p. On error, NULL is returned, and terrno is set appropriately
void *taosAcquireRef(int rsetId, int64_t rid);
// return the first if p is null, otherwise return the next after p // release ref, rid is the reference ID returned by taosAddRef
void *taosIterateRef(int refId, void *p); // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosReleaseRef(int rsetId, int64_t rid);
// return the first reference if rid is 0, otherwise return the next after current reference.
// if return value is NULL, it means list is over(if terrno is set, it means error happens)
void *taosIterateRef(int rsetId, int64_t rid);
// return the number of references in system // return the number of references in system
int taosListRef(); int taosListRef();
/* sample code to iterate the refs /* sample code to iterate the refs
void demoIterateRefs(int refId) { void demoIterateRefs(int rsetId) {
void *p = taosIterateRef(refId, NULL); void *p = taosIterateRef(refId, 0);
while (p) { while (p) {
// process P // process P
// get the rid from p
p = taosIterateRef(refId, p); p = taosIterateRef(rsetId, rid);
} }
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taoserror.h"
#include "tulog.h"
#include "tutil.h"
#include "tref.h"
static int32_t tsFileRsetId = -1;
static void taosCloseFile(void *p) {
close((int32_t)(uintptr_t)p);
}
int32_t tfinit() {
tsFileRsetId = taosOpenRef(2000, taosCloseFile);
return tsFileRsetId;
}
void tfcleanup() {
if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId);
tsFileRsetId = -1;
}
int64_t tfopen(const char *pathname, int32_t flags) {
int32_t fd = open(pathname, flags);
if (fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
void *p = (void *)(int64_t)fd;
int64_t rid = taosAddRef(tsFileRsetId, p);
if (rid < 0) close(fd);
return rid;
}
int64_t tfclose(int64_t tfd) {
return taosRemoveRef(tsFileRsetId, tfd);
}
int64_t tfwrite(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
int64_t ret = taosWrite(fd, buf, count);
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
taosReleaseRef(tsFileRsetId, tfd);
return ret;
}
int64_t tfread(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
int64_t ret = taosRead(fd, buf, count);
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
taosReleaseRef(tsFileRsetId, tfd);
return ret;
}
...@@ -24,19 +24,22 @@ ...@@ -24,19 +24,22 @@
#define TSDB_REF_STATE_DELETED 2 #define TSDB_REF_STATE_DELETED 2
typedef struct SRefNode { typedef struct SRefNode {
struct SRefNode *prev; struct SRefNode *prev; // previous node
struct SRefNode *next; struct SRefNode *next; // next node
void *p; void *p; // pointer to resource protected,
int32_t count; int64_t rid; // reference ID
int32_t count; // number of references
int removed; // 1: removed
} SRefNode; } SRefNode;
typedef struct { typedef struct {
SRefNode **nodeList; SRefNode **nodeList; // array of SRefNode linked list
int state; // 0: empty, 1: active; 2: deleted int state; // 0: empty, 1: active; 2: deleted
int refId; int rsetId; // refSet ID, global unique
int max; int64_t rid; // increase by one for each new reference
int32_t count; // total number of SRefNodes in this set int max; // mod
int64_t *lockedBy; int32_t count; // total number of SRefNodes in this set
int64_t *lockedBy;
void (*fp)(void *); void (*fp)(void *);
} SRefSet; } SRefSet;
...@@ -47,54 +50,58 @@ static int tsRefSetNum = 0; ...@@ -47,54 +50,58 @@ static int tsRefSetNum = 0;
static int tsNextId = 0; static int tsNextId = 0;
static void taosInitRefModule(void); static void taosInitRefModule(void);
static int taosHashRef(SRefSet *pSet, void *p);
static void taosLockList(int64_t *lockedBy); static void taosLockList(int64_t *lockedBy);
static void taosUnlockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy);
static void taosIncRefCount(SRefSet *pSet); static void taosIncRsetCount(SRefSet *pSet);
static void taosDecRefCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet);
static int taosDecRefCount(int rsetId, int64_t rid, int remove);
int taosOpenRef(int max, void (*fp)(void *)) int taosOpenRef(int max, void (*fp)(void *))
{ {
SRefNode **nodeList; SRefNode **nodeList;
SRefSet *pSet; SRefSet *pSet;
int64_t *lockedBy; int64_t *lockedBy;
int i, refId; int i, rsetId;
pthread_once(&tsRefModuleInit, taosInitRefModule); pthread_once(&tsRefModuleInit, taosInitRefModule);
nodeList = calloc(sizeof(SRefNode *), (size_t)max); nodeList = calloc(sizeof(SRefNode *), (size_t)max);
if (nodeList == NULL) { if (nodeList == NULL) {
return TSDB_CODE_REF_NO_MEMORY; terrno = TSDB_CODE_REF_NO_MEMORY;
return -1;
} }
lockedBy = calloc(sizeof(int64_t), (size_t)max); lockedBy = calloc(sizeof(int64_t), (size_t)max);
if (lockedBy == NULL) { if (lockedBy == NULL) {
free(nodeList); free(nodeList);
return TSDB_CODE_REF_NO_MEMORY; terrno = TSDB_CODE_REF_NO_MEMORY;
return -1;
} }
pthread_mutex_lock(&tsRefMutex); pthread_mutex_lock(&tsRefMutex);
for (i = 0; i < TSDB_REF_OBJECTS; ++i) { for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS; tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId
if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break; if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
} }
if (i < TSDB_REF_OBJECTS) { if (i < TSDB_REF_OBJECTS) {
refId = tsNextId; rsetId = tsNextId;
pSet = tsRefSetList + refId; pSet = tsRefSetList + rsetId;
taosIncRefCount(pSet);
pSet->max = max; pSet->max = max;
pSet->nodeList = nodeList; pSet->nodeList = nodeList;
pSet->lockedBy = lockedBy; pSet->lockedBy = lockedBy;
pSet->fp = fp; pSet->fp = fp;
pSet->rid = 1;
pSet->rsetId = rsetId;
pSet->state = TSDB_REF_STATE_ACTIVE; pSet->state = TSDB_REF_STATE_ACTIVE;
pSet->refId = refId; taosIncRsetCount(pSet);
tsRefSetNum++; tsRefSetNum++;
uTrace("refId:%d is opened, max:%d, fp:%p refSetNum:%d", refId, max, fp, tsRefSetNum); uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum);
} else { } else {
refId = TSDB_CODE_REF_FULL; rsetId = TSDB_CODE_REF_FULL;
free (nodeList); free (nodeList);
free (lockedBy); free (lockedBy);
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
...@@ -102,121 +109,128 @@ int taosOpenRef(int max, void (*fp)(void *)) ...@@ -102,121 +109,128 @@ int taosOpenRef(int max, void (*fp)(void *))
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
return refId; return rsetId;
} }
void taosCloseRef(int refId) int taosCloseRef(int rsetId)
{ {
SRefSet *pSet; SRefSet *pSet;
int deleted = 0; int deleted = 0;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d is invalid, out of range", refId); uTrace("rsetId:%d is invalid, out of range", rsetId);
return; terrno = TSDB_CODE_REF_INVALID_ID;
return -1;
} }
pSet = tsRefSetList + refId; pSet = tsRefSetList + rsetId;
pthread_mutex_lock(&tsRefMutex); pthread_mutex_lock(&tsRefMutex);
if (pSet->state == TSDB_REF_STATE_ACTIVE) { if (pSet->state == TSDB_REF_STATE_ACTIVE) {
pSet->state = TSDB_REF_STATE_DELETED; pSet->state = TSDB_REF_STATE_DELETED;
deleted = 1; deleted = 1;
uTrace("refId:%d is closed, count:%d", refId, pSet->count); uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count);
} else { } else {
uTrace("refId:%d is already closed, count:%d", refId, pSet->count); uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count);
} }
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
if (deleted) taosDecRefCount(pSet); if (deleted) taosDecRsetCount(pSet);
return 0;
} }
int taosAddRef(int refId, void *p) int64_t taosAddRef(int rsetId, void *p)
{ {
int hash; int hash;
SRefNode *pNode; SRefNode *pNode;
SRefSet *pSet; SRefSet *pSet;
int64_t rid = 0;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to add, refId not valid", refId, p); uTrace("rsetId:%d p:%p failed to add, rsetId not valid", rsetId, p);
return TSDB_CODE_REF_INVALID_ID; terrno = TSDB_CODE_REF_INVALID_ID;
return -1;
} }
pSet = tsRefSetList + refId; pSet = tsRefSetList + rsetId;
taosIncRefCount(pSet); taosIncRsetCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) { if (pSet->state != TSDB_REF_STATE_ACTIVE) {
taosDecRefCount(pSet); taosDecRsetCount(pSet);
uTrace("refId:%d p:%p failed to add, not active", refId, p); uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p);
return TSDB_CODE_REF_ID_REMOVED; terrno = TSDB_CODE_REF_ID_REMOVED;
return -1;
} }
int code = 0; pNode = calloc(sizeof(SRefNode), 1);
hash = taosHashRef(pSet, p); if (pNode == NULL) {
terrno = TSDB_CODE_REF_NO_MEMORY;
return -1;
}
rid = atomic_add_fetch_64(&pSet->rid, 1);
hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash]; pNode->p = p;
while (pNode) { pNode->rid = rid;
if (pNode->p == p) pNode->count = 1;
break;
pNode = pNode->next; pNode->prev = NULL;
} pNode->next = pSet->nodeList[hash];
if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
if (pNode) { pSet->nodeList[hash] = pNode;
code = TSDB_CODE_REF_ALREADY_EXIST;
uTrace("refId:%d p:%p is already there, faild to add", refId, p);
} else {
pNode = calloc(sizeof(SRefNode), 1);
if (pNode) {
pNode->p = p;
pNode->count = 1;
pNode->prev = 0;
pNode->next = pSet->nodeList[hash];
if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
pSet->nodeList[hash] = pNode;
uTrace("refId:%d p:%p is added, count:%d malloc mem: %p", refId, p, pSet->count, pNode);
} else {
code = TSDB_CODE_REF_NO_MEMORY;
uTrace("refId:%d p:%p is not added, since no memory", refId, p);
}
}
if (code < 0) taosDecRefCount(pSet); uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count);
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
return code; return rid;
}
int taosRemoveRef(int rsetId, int64_t rid)
{
return taosDecRefCount(rsetId, rid, 1);
} }
int taosAcquireRef(int refId, void *p) // if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosAcquireRef(int rsetId, int64_t rid)
{ {
int hash, code = 0; int hash;
SRefNode *pNode; SRefNode *pNode;
SRefSet *pSet; SRefSet *pSet;
void *p = NULL;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to acquire, refId not valid", refId, p); uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid);
return TSDB_CODE_REF_INVALID_ID; terrno = TSDB_CODE_REF_INVALID_ID;
return NULL;
} }
pSet = tsRefSetList + refId; if (rid <= 0) {
taosIncRefCount(pSet); uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rid not valid", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
return NULL;
}
pSet = tsRefSetList + rsetId;
taosIncRsetCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) { if (pSet->state != TSDB_REF_STATE_ACTIVE) {
uTrace("refId:%d p:%p failed to acquire, not active", refId, p); uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid);
taosDecRefCount(pSet); taosDecRsetCount(pSet);
return TSDB_CODE_REF_ID_REMOVED; terrno = TSDB_CODE_REF_ID_REMOVED;
return NULL;
} }
hash = taosHashRef(pSet, p); hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash]; pNode = pSet->nodeList[hash];
while (pNode) { while (pNode) {
if (pNode->p == p) { if (pNode->rid == rid) {
break; break;
} }
...@@ -224,117 +238,76 @@ int taosAcquireRef(int refId, void *p) ...@@ -224,117 +238,76 @@ int taosAcquireRef(int refId, void *p)
} }
if (pNode) { if (pNode) {
pNode->count++; if (pNode->removed == 0) {
uTrace("refId:%d p:%p is acquired", refId, p); pNode->count++;
p = pNode->p;
uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid);
} else {
terrno = TSDB_CODE_REF_NOT_EXIST;
uTrace("rsetId:%d p:%p rid:%" PRId64 " is already removed, failed to acquire", rsetId, pNode->p, rid);
}
} else { } else {
code = TSDB_CODE_REF_NOT_EXIST; terrno = TSDB_CODE_REF_NOT_EXIST;
uTrace("refId:%d p:%p is not there, failed to acquire", refId, p); uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid);
} }
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
taosDecRefCount(pSet); taosDecRsetCount(pSet);
return code; return p;
} }
void taosReleaseRef(int refId, void *p) int taosReleaseRef(int rsetId, int64_t rid)
{ {
int hash; return taosDecRefCount(rsetId, rid, 0);
SRefNode *pNode;
SRefSet *pSet;
int released = 0;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to release, refId not valid", refId, p);
return;
}
pSet = tsRefSetList + refId;
if (pSet->state == TSDB_REF_STATE_EMPTY) {
uTrace("refId:%d p:%p failed to release, cleaned", refId, p);
return;
}
hash = taosHashRef(pSet, p);
taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash];
while (pNode) {
if (pNode->p == p)
break;
pNode = pNode->next;
}
if (pNode) {
pNode->count--;
if (pNode->count == 0) {
if ( pNode->prev ) {
pNode->prev->next = pNode->next;
} else {
pSet->nodeList[hash] = pNode->next;
}
if ( pNode->next ) {
pNode->next->prev = pNode->prev;
}
(*pSet->fp)(pNode->p);
free(pNode);
released = 1;
uTrace("refId:%d p:%p is removed, count:%d, free mem: %p", refId, p, pSet->count, pNode);
} else {
uTrace("refId:%d p:%p is released", refId, p);
}
} else {
uTrace("refId:%d p:%p is not there, failed to release", refId, p);
}
taosUnlockList(pSet->lockedBy+hash);
if (released) taosDecRefCount(pSet);
} }
// if p is NULL, return the first p in hash list, otherwise, return the next after p // if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosIterateRef(int refId, void *p) { void *taosIterateRef(int rsetId, int64_t rid) {
SRefNode *pNode = NULL; SRefNode *pNode = NULL;
SRefSet *pSet; SRefSet *pSet;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p); uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rsetId not valid", rsetId, rid);
terrno = TSDB_CODE_REF_INVALID_ID;
return NULL;
}
if (rid <= 0) {
uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rid not valid", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
return NULL; return NULL;
} }
pSet = tsRefSetList + refId; pSet = tsRefSetList + rsetId;
taosIncRefCount(pSet); taosIncRsetCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) { if (pSet->state != TSDB_REF_STATE_ACTIVE) {
uTrace("refId:%d p:%p failed to iterate, not active", refId, p); uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid);
taosDecRefCount(pSet); terrno = TSDB_CODE_REF_ID_REMOVED;
taosDecRsetCount(pSet);
return NULL; return NULL;
} }
int hash = 0; int hash = 0;
if (p) { if (rid > 0) {
hash = taosHashRef(pSet, p); hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash]; pNode = pSet->nodeList[hash];
while (pNode) { while (pNode) {
if (pNode->p == p) break; if (pNode->rid == rid) break;
pNode = pNode->next; pNode = pNode->next;
} }
if (pNode == NULL) { if (pNode == NULL) {
uError("refId:%d p:%p not there, quit", refId, p); uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
return NULL; return NULL;
} }
// p is there // rid is there
pNode = pNode->next; pNode = pNode->next;
if (pNode == NULL) { if (pNode == NULL) {
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
...@@ -356,14 +329,14 @@ void *taosIterateRef(int refId, void *p) { ...@@ -356,14 +329,14 @@ void *taosIterateRef(int refId, void *p) {
pNode->count++; // acquire it pNode->count++; // acquire it
newP = pNode->p; newP = pNode->p;
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
uTrace("refId:%d p:%p is returned", refId, p); uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid);
} else { } else {
uTrace("refId:%d p:%p the list is over", refId, p); uTrace("rsetId:%d the list is over", rsetId);
} }
if (p) taosReleaseRef(refId, p); // release the current one if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one
taosDecRefCount(pSet); taosDecRsetCount(pSet);
return newP; return newP;
} }
...@@ -381,13 +354,13 @@ int taosListRef() { ...@@ -381,13 +354,13 @@ int taosListRef() {
if (pSet->state == TSDB_REF_STATE_EMPTY) if (pSet->state == TSDB_REF_STATE_EMPTY)
continue; continue;
uInfo("refId:%d state:%d count::%d", i, pSet->state, pSet->count); uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count);
for (int j=0; j < pSet->max; ++j) { for (int j=0; j < pSet->max; ++j) {
pNode = pSet->nodeList[j]; pNode = pSet->nodeList[j];
while (pNode) { while (pNode) {
uInfo("refId:%d p:%p count:%d", i, pNode->p, pNode->count); uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count);
pNode = pNode->next; pNode = pNode->next;
num++; num++;
} }
...@@ -399,21 +372,78 @@ int taosListRef() { ...@@ -399,21 +372,78 @@ int taosListRef() {
return num; return num;
} }
static int taosHashRef(SRefSet *pSet, void *p) static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
{ int hash;
int hash = 0; SRefSet *pSet;
int64_t v = (int64_t)p; SRefNode *pNode;
int released = 0;
int code = 0;
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid);
terrno = TSDB_CODE_REF_INVALID_ID;
return -1;
}
if (rid <= 0) {
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rid not valid", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
return -1;
}
pSet = tsRefSetList + rsetId;
if (pSet->state == TSDB_REF_STATE_EMPTY) {
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, cleaned", rsetId, rid);
terrno = TSDB_CODE_REF_ID_REMOVED;
return -1;
}
for (int i = 0; i < sizeof(v); ++i) { hash = rid % pSet->max;
hash += (int)(v & 0xFFFF); taosLockList(pSet->lockedBy+hash);
v = v >> 16;
i = i + 2; pNode = pSet->nodeList[hash];
while (pNode) {
if (pNode->rid == rid)
break;
pNode = pNode->next;
} }
hash = hash % pSet->max; if (pNode) {
pNode->count--;
if (remove) pNode->removed = 1;
return hash; if (pNode->count <= 0) {
} if (pNode->prev) {
pNode->prev->next = pNode->next;
} else {
pSet->nodeList[hash] = pNode->next;
}
if (pNode->next) {
pNode->next->prev = pNode->prev;
}
(*pSet->fp)(pNode->p);
uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
free(pNode);
released = 1;
} else {
uTrace("rsetId:%d p:%p rid:%" PRId64 "is released, count:%d", rsetId, pNode->p, rid, pNode->count);
}
} else {
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
code = -1;
}
taosUnlockList(pSet->lockedBy+hash);
if (released) taosDecRsetCount(pSet);
return code;
}
static void taosLockList(int64_t *lockedBy) { static void taosLockList(int64_t *lockedBy) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetPthreadId();
...@@ -436,14 +466,14 @@ static void taosInitRefModule(void) { ...@@ -436,14 +466,14 @@ static void taosInitRefModule(void) {
pthread_mutex_init(&tsRefMutex, NULL); pthread_mutex_init(&tsRefMutex, NULL);
} }
static void taosIncRefCount(SRefSet *pSet) { static void taosIncRsetCount(SRefSet *pSet) {
atomic_add_fetch_32(&pSet->count, 1); atomic_add_fetch_32(&pSet->count, 1);
uTrace("refId:%d inc count:%d", pSet->refId, pSet->count); // uTrace("rsetId:%d inc count:%d", pSet->rsetId, count);
} }
static void taosDecRefCount(SRefSet *pSet) { static void taosDecRsetCount(SRefSet *pSet) {
int32_t count = atomic_sub_fetch_32(&pSet->count, 1); int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
uTrace("refId:%d dec count:%d", pSet->refId, pSet->count); // uTrace("rsetId:%d dec count:%d", pSet->rsetId, count);
if (count > 0) return; if (count > 0) return;
...@@ -458,7 +488,7 @@ static void taosDecRefCount(SRefSet *pSet) { ...@@ -458,7 +488,7 @@ static void taosDecRefCount(SRefSet *pSet) {
taosTFree(pSet->lockedBy); taosTFree(pSet->lockedBy);
tsRefSetNum--; tsRefSetNum--;
uTrace("refId:%d is cleaned, refSetNum:%d count:%d", pSet->refId, tsRefSetNum, pSet->count); uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count);
} }
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
......
...@@ -11,106 +11,119 @@ ...@@ -11,106 +11,119 @@
#include "tulog.h" #include "tulog.h"
typedef struct { typedef struct {
int refNum; int refNum;
int steps; int steps;
int refId; int rsetId;
void **p; int64_t rid;
void **p;
} SRefSpace; } SRefSpace;
void iterateRefs(int refId) { void iterateRefs(int rsetId) {
int count = 0; int count = 0;
void *p = taosIterateRef(refId, NULL); void *p = taosIterateRef(rsetId, NULL);
while (p) { while (p) {
// process P // process P
count++; count++;
p = taosIterateRef(refId, p); p = taosIterateRef(rsetId, p);
} }
printf(" %d ", count); printf(" %d ", count);
} }
void *takeRefActions(void *param) { void *addRef(void *param) {
SRefSpace *pSpace = (SRefSpace *)param; SRefSpace *pSpace = (SRefSpace *)param;
int code, id; int id;
int64_t rid;
for (int i=0; i < pSpace->steps; ++i) { for (int i=0; i < pSpace->steps; ++i) {
printf("s"); printf("a");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
code = taosAddRef(pSpace->refId, pSpace->p[id]); if (pSpace->rid[id] <= 0) {
usleep(1); pSpace->p[id] = malloc(128);
pSpace->rid[id] = taosAddRef(pSpace->rsetId, pSpace->p[id]);
id = random() % pSpace->refNum;
code = taosAcquireRef(pSpace->refId, pSpace->p[id]);
if (code >= 0) {
usleep(id % 5 + 1);
taosReleaseRef(pSpace->refId, pSpace->p[id]);
} }
usleep(100);
}
return NULL;
}
void *removeRef(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
int id;
int64_t rid;
for (int i=0; i < pSpace->steps; ++i) {
printf("d");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
taosRemoveRef(pSpace->refId, pSpace->p[id]); if (pSpace->rid[id] > 0) {
usleep(id %5 + 1); code = taosRemoveRef(pSpace->rsetId, pSpace->rid[id]);
if (code == 0) pSpace->rid[id] = 0;
}
usleep(100);
}
return NULL;
}
void *acquireRelease(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
int id;
int64_t rid;
for (int i=0; i < pSpace->steps; ++i) {
printf("a");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
code = taosAcquireRef(pSpace->refId, pSpace->p[id]); code = taosAcquireRef(pSpace->rsetId, pSpace->p[id]);
if (code >= 0) { if (code >= 0) {
usleep(id % 5 + 1); usleep(id % 5 + 1);
taosReleaseRef(pSpace->refId, pSpace->p[id]); taosReleaseRef(pSpace->rsetId, pSpace->p[id]);
} }
id = random() % pSpace->refNum;
iterateRefs(id);
} }
for (int i=0; i < pSpace->refNum; ++i) {
taosRemoveRef(pSpace->refId, pSpace->p[i]);
}
//uInfo("refId:%d thread exits", pSpace->refId);
return NULL; return NULL;
} }
void myfree(void *p) { void myfree(void *p) {
return; free(p);
} }
void *openRefSpace(void *param) { void *openRefSpace(void *param) {
SRefSpace *pSpace = (SRefSpace *)param; SRefSpace *pSpace = (SRefSpace *)param;
printf("c"); printf("c");
pSpace->refId = taosOpenRef(50, myfree); pSpace->rsetId = taosOpenRef(50, myfree);
if (pSpace->refId < 0) { if (pSpace->rsetId < 0) {
printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId)); printf("failed to open ref, reson:%s\n", tstrerror(pSpace->rsetId));
return NULL; return NULL;
} }
pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum); pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum);
for (int i=0; i<pSpace->refNum; ++i) {
pSpace->p[i] = (void *) malloc(128);
}
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_t thread1, thread2, thread3; pthread_t thread1, thread2, thread3;
pthread_create(&(thread1), &thattr, takeRefActions, (void *)(pSpace)); pthread_create(&(thread1), &thattr, addRef, (void *)(pSpace));
pthread_create(&(thread2), &thattr, takeRefActions, (void *)(pSpace)); pthread_create(&(thread2), &thattr, removeRef, (void *)(pSpace));
pthread_create(&(thread3), &thattr, takeRefActions, (void *)(pSpace)); pthread_create(&(thread3), &thattr, acquireRelease, (void *)(pSpace));
pthread_join(thread1, NULL); pthread_join(thread1, NULL);
pthread_join(thread2, NULL); pthread_join(thread2, NULL);
pthread_join(thread3, NULL); pthread_join(thread3, NULL);
taosCloseRef(pSpace->refId);
for (int i=0; i<pSpace->refNum; ++i) { for (int i=0; i<pSpace->refNum; ++i) {
free(pSpace->p[i]); taosRemoveRef(pSpace->rsetId, pSpace->rid[i]);
} }
uInfo("refId:%d main thread exit", pSpace->refId); taosCloseRef(pSpace->rsetId);
uInfo("rsetId:%d main thread exit", pSpace->rsetId);
free(pSpace->p); free(pSpace->p);
pSpace->p = NULL; pSpace->p = NULL;
...@@ -140,7 +153,7 @@ int main(int argc, char *argv[]) { ...@@ -140,7 +153,7 @@ int main(int argc, char *argv[]) {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-n]: number of references, default: %d\n", refNum); printf(" [-n]: number of references, default: %d\n", refNum);
printf(" [-s]: steps to run for each reference, default: %d\n", steps); printf(" [-s]: steps to run for each reference, default: %d\n", steps);
printf(" [-t]: number of refIds running in parallel, default: %d\n", threads); printf(" [-t]: number of rsetIds running in parallel, default: %d\n", threads);
printf(" [-l]: number of loops, default: %d\n", loops); printf(" [-l]: number of loops, default: %d\n", loops);
printf(" [-d]: debugFlag, default: %d\n", uDebugFlag); printf(" [-d]: debugFlag, default: %d\n", uDebugFlag);
exit(0); exit(0);
......
...@@ -47,7 +47,7 @@ typedef struct { ...@@ -47,7 +47,7 @@ typedef struct {
void *rqueue; void *rqueue;
void *wal; void *wal;
void *tsdb; void *tsdb;
void *sync; int64_t sync;
void *events; void *events;
void *cq; // continuous query void *cq; // continuous query
int32_t cfgVersion; int32_t cfgVersion;
......
...@@ -44,12 +44,12 @@ static void vnodeCtrlFlow(void *handle, int32_t mseconds); ...@@ -44,12 +44,12 @@ static void vnodeCtrlFlow(void *handle, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
#ifndef _SYNC #ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; } int64_t syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; }
void syncStop(tsync_h shandle) {} void syncStop(int64_t rid) {}
int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; }
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; }
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {}
#endif #endif
char* vnodeStatus[] = { char* vnodeStatus[] = {
...@@ -331,7 +331,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -331,7 +331,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
#ifndef _SYNC #ifndef _SYNC
pVnode->role = TAOS_SYNC_ROLE_MASTER; pVnode->role = TAOS_SYNC_ROLE_MASTER;
#else #else
if (pVnode->sync == NULL) { if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno)); tstrerror(terrno));
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
...@@ -561,9 +561,9 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -561,9 +561,9 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
} }
// stop replication module // stop replication module
if (pVnode->sync) { if (pVnode->sync > 0) {
void *sync = pVnode->sync; int64_t sync = pVnode->sync;
pVnode->sync = NULL; pVnode->sync = -1;
syncStop(sync); syncStop(sync);
} }
......
...@@ -43,6 +43,7 @@ extern int32_t wDebugFlag; ...@@ -43,6 +43,7 @@ extern int32_t wDebugFlag;
typedef struct { typedef struct {
uint64_t version; uint64_t version;
int64_t fileId; int64_t fileId;
int64_t rid;
int32_t vgId; int32_t vgId;
int32_t fd; int32_t fd;
int32_t keep; int32_t keep;
......
...@@ -78,7 +78,8 @@ void *walOpen(char *path, SWalCfg *pCfg) { ...@@ -78,7 +78,8 @@ void *walOpen(char *path, SWalCfg *pCfg) {
return NULL; return NULL;
} }
if (taosAddRef(tsWal.refId, pWal) != TSDB_CODE_SUCCESS) { pWal->rid = taosAddRef(tsWal.refId, pWal);
if (pWal->rid < 0) {
walFreeObj(pWal); walFreeObj(pWal);
return NULL; return NULL;
} }
...@@ -143,7 +144,7 @@ void walClose(void *handle) { ...@@ -143,7 +144,7 @@ void walClose(void *handle) {
} }
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refId, pWal); taosRemoveRef(tsWal.refId, pWal->rid);
} }
static int32_t walInitObj(SWal *pWal) { static int32_t walInitObj(SWal *pWal) {
...@@ -185,7 +186,7 @@ static void walUpdateSeq() { ...@@ -185,7 +186,7 @@ static void walUpdateSeq() {
} }
static void walFsyncAll() { static void walFsyncAll() {
SWal *pWal = taosIterateRef(tsWal.refId, NULL); SWal *pWal = taosIterateRef(tsWal.refId, 0);
while (pWal) { while (pWal) {
if (walNeedFsync(pWal)) { if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
...@@ -194,7 +195,7 @@ static void walFsyncAll() { ...@@ -194,7 +195,7 @@ static void walFsyncAll() {
wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
} }
} }
pWal = taosIterateRef(tsWal.refId, pWal); pWal = taosIterateRef(tsWal.refId, pWal->rid);
} }
} }
......
...@@ -161,7 +161,7 @@ python3 ./test.py -f stream/metric_1.py ...@@ -161,7 +161,7 @@ python3 ./test.py -f stream/metric_1.py
python3 ./test.py -f stream/new.py python3 ./test.py -f stream/new.py
python3 ./test.py -f stream/stream1.py python3 ./test.py -f stream/stream1.py
python3 ./test.py -f stream/stream2.py python3 ./test.py -f stream/stream2.py
python3 ./test.py -f stream/parser.py #python3 ./test.py -f stream/parser.py
python3 ./test.py -f stream/history.py python3 ./test.py -f stream/history.py
#alter table #alter table
...@@ -207,3 +207,17 @@ python3 test.py -f tools/taosdemo.py ...@@ -207,3 +207,17 @@ python3 test.py -f tools/taosdemo.py
python3 test.py -f subscribe/singlemeter.py python3 test.py -f subscribe/singlemeter.py
#python3 test.py -f subscribe/stability.py #python3 test.py -f subscribe/stability.py
python3 test.py -f subscribe/supertable.py python3 test.py -f subscribe/supertable.py
# update
python3 ./test.py -f update/allow_update.py
python3 ./test.py -f update/allow_update-0.py
python3 ./test.py -f update/append_commit_data.py
python3 ./test.py -f update/append_commit_last-0.py
python3 ./test.py -f update/append_commit_last.py
python3 ./test.py -f update/merge_commit_data.py
python3 ./test.py -f update/merge_commit_data-0.py
python3 ./test.py -f update/merge_commit_data2.py
python3 ./test.py -f update/merge_commit_data2_update0.py
python3 ./test.py -f update/merge_commit_last-0.py
python3 ./test.py -f update/merge_commit_last.py
...@@ -130,8 +130,19 @@ class TDTestCase: ...@@ -130,8 +130,19 @@ class TDTestCase:
tdSql.query("select percentile(col6, 100) from test") tdSql.query("select percentile(col6, 100) from test")
tdSql.checkData(0, 0, np.percentile(floatData, 100)) tdSql.checkData(0, 0, np.percentile(floatData, 100))
tdSql.query("select apercentile(col6, 100) from test") tdSql.query("select apercentile(col6, 100) from test")
print("apercentile result: %s" % tdSql.getData(0, 0)) print("apercentile result: %s" % tdSql.getData(0, 0))
tdSql.execute("create table meters (ts timestamp, voltage int) tags(loc nchar(20))")
tdSql.execute("create table t0 using meters tags('beijing')")
tdSql.execute("create table t1 using meters tags('shanghai')")
for i in range(self.rowNum):
tdSql.execute("insert into t0 values(%d, %d)" % (self.ts + i, i + 1))
tdSql.execute("insert into t1 values(%d, %d)" % (self.ts + i, i + 1))
tdSql.error("select percentile(voltage, 20) from meters")
tdSql.query("select apercentile(voltage, 20) from meters")
print("apercentile result: %s" % tdSql.getData(0, 0))
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
...@@ -5,7 +5,9 @@ GREEN='\033[1;32m' ...@@ -5,7 +5,9 @@ GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m' GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m' GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m' NC='\033[0m'
nohup /root/TDinternal/debug/build/bin/taosd -c /root/TDinternal/community/sim/dnode1/cfg >/dev/null &
./crash_gen.sh --valgrind -p -t 10 -s 100 -b 4
pidof taosd|xargs kill
grep 'start to execute\|ERROR SUMMARY' valgrind.err|grep -v 'grep'|uniq|tee crash_gen_mem_err.log grep 'start to execute\|ERROR SUMMARY' valgrind.err|grep -v 'grep'|uniq|tee crash_gen_mem_err.log
for memError in `grep 'ERROR SUMMARY' crash_gen_mem_err.log | awk '{print $4}'` for memError in `grep 'ERROR SUMMARY' crash_gen_mem_err.log | awk '{print $4}'`
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
from util.dnodes import tdDnodes
import os
import threading
import time
class TDTestCase:
"""
kill query
"""
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def query(self):
conn = taos.connect(host='127.0.0.1', user='root', password='taosdata', config='/etc/config')
cursor = conn.cursor()
while True:
cursor.execute('show queries;')
print('show queries!')
temp = cursor.fetchall()
if temp:
print(temp[0][0])
cursor.execute('kill query %s ;' % temp[0][0])
print('kill query success')
break
time.sleep(0.5)
cursor.close()
conn.close()
def run(self):
tdSql.prepare()
print("==============step1")
os.system('yes | sudo taosdemo -n 100')
print('insert into test.meters 10000000 rows')
t1 = threading.Thread(target=self.query)
t1.setDaemon(True)
t1.start()
print("==============step2")
tdSql.execute('use test;')
try:
print('============begin select * from 10000000 rows')
tdSql.query('select * from test.meters;')
# print(tdSql.queryResult)
except Exception as e:
if not "ProgrammingError('Query terminated'" in str(e):
raise Exception('fail')
print('success')
print('kill query success')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
# update
python3 ./test.py -f update/allow_update.py
python3 ./test.py -f update/allow_update-0.py
python3 ./test.py -f update/append_commit_data.py
python3 ./test.py -f update/append_commit_last-0.py
python3 ./test.py -f update/append_commit_last.py
python3 ./test.py -f update/merge_commit_data.py
python3 ./test.py -f update/merge_commit_data2.py
python3 ./test.py -f update/merge_commit_last-0.py
python3 ./test.py -f update/merge_commit_last.py
\ No newline at end of file
...@@ -134,66 +134,8 @@ sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna ...@@ -134,66 +134,8 @@ sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna
#1970-01-01 08:01:40.800 | 10 | 45.000000000 | 0 | true | false | 0 | #1970-01-01 08:01:40.800 | 10 | 45.000000000 | 0 | true | false | 0 |
#1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 | #1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 |
sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19; sql_error select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19;
if $rows != 20 then
return -1
endi
if $data00 != @70-01-01 08:01:40.800@ then
return -1
endi
if $data01 != 10 then
return -1
endi
if $data02 != 45.000000000 then
return -1
endi
if $data03 != 0 then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data05 != 0 then
return -1
endi
if $data06 != 0 then
return -1
endi
if $data10 != @70-01-01 08:01:40.790@ then
return -1
endi
if $data11 != 10 then
return -1
endi
if $data12 != 945.000000000 then
return -1
endi
if $data13 != 90 then
return -1
endi
if $data14 != 1 then
return -1
endi
if $data15 != 1 then
return -1
endi
if $data16 != 0 then
return -1
endi
sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc;
if $rows != 100 then if $rows != 100 then
...@@ -261,59 +203,9 @@ if $data16 != 2 then ...@@ -261,59 +203,9 @@ if $data16 != 2 then
endi endi
# this function will cause shell crash # this function will cause shell crash
sql select count(join_mt0.c1), first(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; sql_error select count(join_mt0.c1), first(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc;
if $rows != 100 then
return -1
endi
if $data00 != @70-01-01 08:01:40.990@ then sql_error select last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10m) group by join_mt0.t1 order by join_mt0.ts asc;
return -1
endi
if $data01 != 10 then
return -1
endi
if $data02 != 90 then
return -1
endi
if $data03 != 0 then
return -1
endi
if $data11 != 10 then
return -1
endi
if $data12 != 80 then
return -1
endi
if $data13 != 0 then
return -1
endi
sql select last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10m) group by join_mt0.t1 order by join_mt0.ts asc;
if $rows != 1 then
return -1
endi
if $data00 != @70-01-01 08:00:00.000@ then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0 then
return -1
endi
sql_error select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt1.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); sql_error select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt1.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL);
sql select count(join_mt0.c1), first(join_mt0.c1)/count(*), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); sql select count(join_mt0.c1), first(join_mt0.c1)/count(*), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL);
......
...@@ -22,12 +22,12 @@ $i = 0 ...@@ -22,12 +22,12 @@ $i = 0
sql use $db sql use $db
sql create table cpustrm as select count(*), avg(cpu_taosd), max(cpu_taosd), min(cpu_taosd), avg(cpu_system), max(cpu_cores), min(cpu_cores), last(cpu_cores) from log.dn1 interval(4s) sliding(2s) sql create table cpustrm as select count(*), avg(cpu_taosd), max(cpu_taosd), min(cpu_taosd), avg(cpu_system), max(cpu_cores), min(cpu_cores), last(cpu_cores) from log.dn1 interval(4s)
sql create table memstrm as select count(*), avg(mem_taosd), max(mem_taosd), min(mem_taosd), avg(mem_system), first(mem_total), last(mem_total) from log.dn1 interval(4s) sliding(2s) sql create table memstrm as select count(*), avg(mem_taosd), max(mem_taosd), min(mem_taosd), avg(mem_system), first(mem_total), last(mem_total) from log.dn1 interval(4s)
sql create table diskstrm as select count(*), avg(disk_used), last(disk_used), avg(disk_total), first(disk_total) from log.dn1 interval(4s) sliding(2s) sql create table diskstrm as select count(*), avg(disk_used), last(disk_used), avg(disk_total), first(disk_total) from log.dn1 interval(4s)
sql create table bandstrm as select count(*), avg(band_speed), last(band_speed) from log.dn1 interval(4s) sliding(2s) sql create table bandstrm as select count(*), avg(band_speed), last(band_speed) from log.dn1 interval(4s)
sql create table reqstrm as select count(*), avg(req_http), last(req_http), avg(req_select), last(req_select), avg(req_insert), last(req_insert) from log.dn1 interval(4s) sliding(2s) sql create table reqstrm as select count(*), avg(req_http), last(req_http), avg(req_select), last(req_select), avg(req_insert), last(req_insert) from log.dn1 interval(4s)
sql create table iostrm as select count(*), avg(io_read), last(io_read), avg(io_write), last(io_write) from log.dn1 interval(4s) sliding(2s) sql create table iostrm as select count(*), avg(io_read), last(io_read), avg(io_write), last(io_write) from log.dn1 interval(4s)
sleep 120000 sleep 120000
sql select * from cpustrm sql select * from cpustrm
if $rows <= 0 then if $rows <= 0 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册