diff --git a/Jenkinsfile b/Jenkinsfile index 834321240fa631d9ed2ecc5e68b5b492a29d41d0..dc7836c3daacaa457f721f9278687b99770fc394 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -90,7 +90,6 @@ pipeline { catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { sh ''' cd ${WKC}/tests/pytest - ./crash_gen.sh --valgrind -p -t 10 -s 100 -b 4 ./handle_crash_gen_val_log.sh ''' } diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 1c2821638bd24d67c51985a9e67da26e83e768b8..4c85af4919c74de5f41272272bd1bdbf4f6c6da3 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -330,6 +330,7 @@ typedef struct STscObj { char writeAuth : 1; char superAuth : 1; uint32_t connId; + uint64_t rid; // ref ID returned by taosAddRef struct SSqlObj * pHb; struct SSqlObj * sqlList; struct SSqlStream *streamList; @@ -348,7 +349,7 @@ typedef struct SSqlObj { void *signature; pthread_t owner; // owner of sql object, by which it is executed STscObj *pTscObj; - void *pRpcCtx; + int64_t rpcRid; void (*fp)(); void (*fetchFp)(); void *param; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b7d3a22596154e847924d2ee7be683bee1031cfe..253563448042cd0f9153389e9b3f65c715ddcaae 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { int32_t waitingDuring = tsShellActivityTimer * 500; tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); - taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); + taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer); } else { tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj); } } void tscProcessActivityTimer(void *handle, void *tmrId) { - STscObj *pObj = (STscObj *)handle; - - int ret = taosAcquireRef(tscRefId, pObj); - if (ret < 0) { - tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret)); - return; - } + int64_t rid = (int64_t) handle; + STscObj *pObj = taosAcquireRef(tscRefId, rid); + if (pObj == NULL) return; SSqlObj* pHB = pObj->pHb; void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); if (p == NULL) { tscWarn("%p HB object has been released already", pHB); - taosReleaseRef(tscRefId, pObj); + taosReleaseRef(tscRefId, pObj->rid); return; } @@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); } - taosReleaseRef(tscRefId, pObj); + taosReleaseRef(tscRefId, rid); } int tscSendMsgToServer(SSqlObj *pSql) { @@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .pCont = pMsg, .contLen = pSql->cmd.payloadLen, .ahandle = pSql, - .handle = &pSql->pRpcCtx, + .handle = NULL, .code = 0 }; @@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // Otherwise, the pSql object may have been released already during the response function, which is // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // cause crash. - rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); + pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SSqlCmd *pCmd = &pSql->cmd; assert(*pSql->self == pSql); - pSql->pRpcCtx = NULL; + pSql->rpcRid = -1; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); @@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { createHBObj(pObj); //launch a timer to send heartbeat to maintain the connection and send status to mnode - taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); + taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer); return 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 89dfa24e8fe2cef32673fb2016fa538f218d1bcf..bdc46c544670e790cc15b308b24e795f69f24e04 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa registerSqlObj(pSql); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); - taosAddRef(tscRefId, pObj); + pObj->rid = taosAddRef(tscRefId, pObj); return pSql; } @@ -279,9 +279,9 @@ void taos_close(TAOS *taos) { SSqlObj* pHb = pObj->pHb; if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { - if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode - rpcCancelRequest(pHb->pRpcCtx); - pHb->pRpcCtx = NULL; + if (pHb->rpcRid > 0) { // wait for rsp from dnode + rpcCancelRequest(pHb->rpcRid); + pHb->rpcRid = -1; } tscDebug("%p HB is freed", pHb); @@ -298,7 +298,7 @@ void taos_close(TAOS *taos) { tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); - taosRemoveRef(tscRefId, pObj); + taosRemoveRef(tscRefId, pObj->rid); } void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { @@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { assert(pSubObj->self == (SSqlObj**) p); pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - if (pSubObj->pRpcCtx != NULL) { - rpcCancelRequest(pSubObj->pRpcCtx); - pSubObj->pRpcCtx = NULL; + if (pSubObj->rpcRid > 0) { + rpcCancelRequest(pSubObj->rpcRid); + pSubObj->rpcRid = -1; } tscQueueAsyncRes(pSubObj); @@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { - assert(pSql->pRpcCtx == NULL); + assert(pSql->rpcRid <= 0); tscKillSTableQuery(pSql); } else { if (pSql->cmd.command < TSDB_SQL_LOCAL) { @@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) { * reset and freed in the processMsgFromServer function, and causes the invalid * write problem for rpcCancelRequest. */ - if (pSql->pRpcCtx != NULL) { - rpcCancelRequest(pSql->pRpcCtx); - pSql->pRpcCtx = NULL; + if (pSql->rpcRid > 0) { + rpcCancelRequest(pSql->rpcRid); + pSql->rpcRid = -1; } tscQueueAsyncRes(pSql); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 839d5889f3beb5401ba91790fed1c4a925c600bc..01046a2840aab2e8cdfb27682778f572d3179dfb 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -36,7 +36,7 @@ void * tscTmr; void * tscQhandle; void * tscCheckDiskUsageTmr; int tsInsertHeadSize; -int tscRefId; +int tscRefId = -1; int tscNumOfThreads; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 879eeeadedf03afc2a6dbd240c5500c801efc483..2aee90653dfb790e3aae23c3804975af97a65311 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) { tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); if (ref == 0) { tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); - taosRemoveRef(tscRefId, pTscObj); + taosRemoveRef(tscRefId, pTscObj->rid); } } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index bdee917b5e8203743a79ca27d8fbc987569c35ab..e430a43807bc6fb0dd5c48d8a912223c1ba015af 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -83,13 +83,13 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); +int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(void *pContext); +void rpcCancelRequest(int64_t rid); #ifdef __cplusplus } diff --git a/src/inc/tsync.h b/src/inc/tsync.h index ca11e0ae0bbfe56c2aad7fb91e280ba056f4209a..0861c94a76e90dc4e74f0f75e31b3d1fc524664b 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -106,13 +106,13 @@ typedef void* tsync_h; int32_t syncInit(); void syncCleanUp(); -tsync_h syncStart(const SSyncInfo *); -void syncStop(tsync_h shandle); -int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); -int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); -void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); -void syncRecover(tsync_h shandle); // recover from other nodes: -int syncGetNodesRole(tsync_h shandle, SNodesRole *); +int64_t syncStart(const SSyncInfo *); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg *); +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype); +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code); +void syncRecover(int64_t rid); // recover from other nodes: +int syncGetNodesRole(int64_t rid, SNodesRole *); extern char *syncRole[]; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 4e370ca0280d6aeb17023794a8b23fa2cc20742d..ae359d456d567c223590f9f5d51c6c3312b01da7 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -72,7 +72,7 @@ typedef struct { ESyncRole role; ESdbStatus status; int64_t version; - void * sync; + int64_t sync; void * wal; SSyncCfg cfg; int32_t numOfTables; @@ -212,7 +212,7 @@ static void sdbRestoreTables() { } void sdbUpdateMnodeRoles() { - if (tsSdbObj.sync == NULL) return; + if (tsSdbObj.sync <= 0) return; SNodesRole roles = {0}; syncGetNodesRole(tsSdbObj.sync, &roles); @@ -433,7 +433,7 @@ void sdbCleanUp() { if (tsSdbObj.sync) { syncStop(tsSdbObj.sync); - tsSdbObj.sync = NULL; + tsSdbObj.sync = -1; } if (tsSdbObj.wal) { diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index d89d3e275d6a2fc34bcfe39655b0552376a0b952..05275c28b0f7c217b630d3c80e13c3707f8b5926 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -82,6 +82,7 @@ typedef struct { int8_t oldInUse; // server EP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef SRpcMsg *pRsp; // for synchronous API tsem_t *pSem; // for synchronous API SRpcEpSet *pSet; // for synchronous API @@ -220,8 +221,7 @@ static void rpcFree(void *p) { free(p); } -static void rpcInit(void) { - +void rpcInit(void) { tsProgressTimer = tsRpcTimer/2; tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcHeadSize = RPC_MSG_OVERHEAD; @@ -230,6 +230,11 @@ static void rpcInit(void) { tsRpcRefId = taosOpenRef(200, rpcFree); } +void rpcCleanup(void) { + taosCloseRef(tsRpcRefId); + tsRpcRefId = -1; +} + void *rpcOpen(const SRpcInit *pInit) { SRpcInfo *pRpc; @@ -374,7 +379,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { +int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -403,10 +408,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { // set the handle to pContext, so app can cancel the request if (pMsg->handle) *((void **)pMsg->handle) = pContext; - taosAddRef(tsRpcRefId, pContext); + pContext->rid = taosAddRef(tsRpcRefId, pContext); + rpcSendReqToServer(pRpc, pContext); - return; + return pContext->rid; } void rpcSendResponse(const SRpcMsg *pRsp) { @@ -551,15 +557,14 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { return code; } -void rpcCancelRequest(void *handle) { - SRpcReqContext *pContext = handle; +void rpcCancelRequest(int64_t rid) { - int code = taosAcquireRef(tsRpcRefId, pContext); - if (code < 0) return; + SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid); + if (pContext == NULL) return; rpcCloseConn(pContext->pConn); - taosReleaseRef(tsRpcRefId, pContext); + taosReleaseRef(tsRpcRefId, rid); } static void rpcFreeMsg(void *msg) { @@ -628,7 +633,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { // if there is an outgoing message, free it if (pConn->outType && pConn->pReqMsg) { if (pConn->pContext) pConn->pContext->pConn = NULL; - taosRemoveRef(tsRpcRefId, pConn->pContext); + taosRemoveRef(tsRpcRefId, pConn->pContext->rid); } } @@ -1109,7 +1114,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { } // free the request message - taosRemoveRef(tsRpcRefId, pContext); + taosRemoveRef(tsRpcRefId, pContext->rid); } static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { @@ -1620,11 +1625,7 @@ static void rpcDecRef(SRpcInfo *pRpc) tDebug("%s rpc resources are released", pRpc->label); taosTFree(pRpc); - int count = atomic_sub_fetch_32(&tsRpcNum, 1); - if (count == 0) { - // taosCloseRef(tsRpcRefId); - // tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error - } + atomic_sub_fetch_32(&tsRpcNum, 1); } } diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index f6818106462cb9c9d694d59c7cb9012cd1a54c8b..8808a82c463cd9023395881871b99d531a93fc97 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -141,6 +141,7 @@ typedef struct SSyncNode { int8_t replica; int8_t quorum; uint32_t vgId; + int64_t rid; void *ahandle; int8_t selfIndex; SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index a504ded657e23210d0dc066158d8801f07a8567e..eee8af7bcd1cedc601ab823de5b542b7edcdb051 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -142,14 +142,14 @@ void syncCleanUp() { sInfo("sync module is cleaned up"); } -void *syncStart(const SSyncInfo *pInfo) { +int64_t syncStart(const SSyncInfo *pInfo) { const SSyncCfg *pCfg = &pInfo->syncCfg; SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1); if (pNode == NULL) { sError("no memory to allocate syncNode"); terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; + return -1; } tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); @@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) { pNode->quorum = pCfg->quorum; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; - int ret = taosAddRef(tsSyncRefId, pNode); - if (ret < 0) { + pNode->rid = taosAddRef(tsSyncRefId, pNode); + if (pNode->rid < 0) { syncFreeNode(pNode); - return NULL; + return -1; } for (int i = 0; i < pCfg->replica; ++i) { @@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) { if (pNode->selfIndex < 0) { sInfo("vgId:%d, this node is not configured", pNode->vgId); terrno = TSDB_CODE_SYN_INVALID_CONFIG; - syncStop(pNode); - return NULL; + syncStop(pNode->rid); + return -1; } nodeVersion = pInfo->version; // set the initial version @@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) { if (pNode->pSyncFwds == NULL) { sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); terrno = TAOS_SYSTEM_ERROR(errno); - syncStop(pNode); - return NULL; + syncStop(pNode->rid); + return -1; } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); if (pNode->pFwdTimer == NULL) { sError("vgId:%d, failed to allocate timer", pNode->vgId); - syncStop(pNode); - return NULL; + syncStop(pNode->rid); + return -1; } syncAddArbitrator(pNode); @@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) { (*pNode->notifyRole)(pNode->ahandle, nodeRole); } - return pNode; + return pNode->rid; } -void syncStop(void *param) { - SSyncNode *pNode = param; +void syncStop(int64_t rid) { SSyncPeer *pPeer; - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; sInfo("vgId:%d, cleanup sync", pNode->vgId); @@ -245,16 +244,15 @@ void syncStop(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - taosReleaseRef(tsSyncRefId, pNode); - taosRemoveRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); + taosRemoveRef(tsSyncRefId, rid); } -int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { - SSyncNode *pNode = param; +int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { int i, j; - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); @@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { syncRole[nodeRole]); syncBroadcastStatus(pNode); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); return 0; } -int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return 0; +int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int qtype) { + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return 0; int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); return code; } -void syncConfirmForward(void *param, uint64_t version, int32_t code) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return; +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; SSyncPeer *pPeer = pNode->pMaster; if (pPeer && pNode->quorum > 1) { @@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) { } } - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); } -void syncRecover(void *param) { - SSyncNode *pNode = param; +void syncRecover(int64_t rid) { SSyncPeer *pPeer; - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; // to do: add a few lines to check if recover is OK // if take this node to unsync state, the whole system may not work @@ -393,14 +386,12 @@ void syncRecover(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); } -int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return -1; +int syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return -1; pNodesRole->selfIndex = pNode->selfIndex; for (int i = 0; i < pNode->replica; ++i) { @@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { pNodesRole->role[i] = pNode->peerInfo[i]->role; } - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); return 0; } @@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { - taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); + taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); sDebug("%s, resource is freed", pPeer->id); taosTFree(pPeer->watchFd); @@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } - taosAcquireRef(tsSyncRefId, pNode); + taosAcquireRef(tsSyncRefId, pNode->rid); return pPeer; } @@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; + if (taosAcquireRef(tsSyncRefId, pNode->rid) < 0) return; pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); @@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, pNode->rid); } static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { @@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } static void syncMonitorFwdInfos(void *param, void *tmrId) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if ( ret < 0) return; + int64_t rid = (int64_t) param; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; SSyncFwds *pSyncFwds = pNode->pSyncFwds; @@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { pthread_mutex_unlock(&(pNode->mutex)); } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); } - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); } static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 0cf752da97848056a7e5e3892025e87c3aef4c5d..9dd3feb4614105c13ba6a2d1069931345167b472 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -30,7 +30,7 @@ int dataFd = -1; void * qhandle = NULL; int walNum = 0; uint64_t tversion = 0; -void * syncHandle; +int64_t syncHandle; int role; int nodeId; char path[256]; diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h new file mode 100644 index 0000000000000000000000000000000000000000..10b7c1df35caaa675c9334106785b91af3fdeb05 --- /dev/null +++ b/src/util/inc/tfile.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TFILE_H +#define TDENGINE_TFILE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +// init taos file module +int32_t tfinit(); + +// clean up taos file module +void tfcleanup(); + +// the same syntax as UNIX standard open/close/read/write +// but FD is int64_t and will never be reused +int64_t tfopen(const char *pathname, int32_t flags); +int64_t tfclose(int64_t tfd); +int64_t tfwrite(int64_t tfd, void *buf, int64_t count); +int64_t tfread(int64_t tfd, void *buf, int64_t count); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TREF_H diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h index ead8e2eb90fa0e6e2447a8c1d7da35b6e9747877..cd5092f30a290c51de49e38b0226bbed637dc0e6 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tref.h @@ -21,38 +21,48 @@ extern "C" { #endif -// open an instance, return refId which will be used by other APIs -int taosOpenRef(int max, void (*fp)(void *)); +// open a reference set, max is the mod used by hash, fp is the pointer to free resource function +// return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately +int taosOpenRef(int max, void (*fp)(void *)); -// close the Ref instance -void taosCloseRef(int refId); +// close the reference set, refId is the return value by taosOpenRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosCloseRef(int refId); // add ref, p is the pointer to resource or pointer ID -int taosAddRef(int refId, void *p); -#define taosRemoveRef taosReleaseRef +// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately +int64_t taosAddRef(int refId, void *p); -// acquire ref, p is the pointer to resource or pointer ID -int taosAcquireRef(int refId, void *p); +// remove ref, rid is the reference ID returned by taosAddRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosRemoveRef(int rsetId, int64_t rid); -// release ref, p is the pointer to resource or pinter ID -void taosReleaseRef(int refId, void *p); +// acquire ref, rid is the reference ID returned by taosAddRef +// return the resource p. On error, NULL is returned, and terrno is set appropriately +void *taosAcquireRef(int rsetId, int64_t rid); -// return the first if p is null, otherwise return the next after p -void *taosIterateRef(int refId, void *p); +// release ref, rid is the reference ID returned by taosAddRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosReleaseRef(int rsetId, int64_t rid); + +// return the first reference if rid is 0, otherwise return the next after current reference. +// if return value is NULL, it means list is over(if terrno is set, it means error happens) +void *taosIterateRef(int rsetId, int64_t rid); // return the number of references in system int taosListRef(); /* sample code to iterate the refs -void demoIterateRefs(int refId) { +void demoIterateRefs(int rsetId) { - void *p = taosIterateRef(refId, NULL); + void *p = taosIterateRef(refId, 0); while (p) { - // process P + + // get the rid from p - p = taosIterateRef(refId, p); + p = taosIterateRef(rsetId, rid); } } diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c new file mode 100644 index 0000000000000000000000000000000000000000..27ba30fe8179ecd5e213d4cf78862fd4982b3ba4 --- /dev/null +++ b/src/util/src/tfile.c @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "taoserror.h" +#include "tulog.h" +#include "tutil.h" +#include "tref.h" + +static int32_t tsFileRsetId = -1; + +static void taosCloseFile(void *p) { + close((int32_t)(uintptr_t)p); +} + +int32_t tfinit() { + tsFileRsetId = taosOpenRef(2000, taosCloseFile); + return tsFileRsetId; +} + +void tfcleanup() { + if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); + tsFileRsetId = -1; +} + +int64_t tfopen(const char *pathname, int32_t flags) { + int32_t fd = open(pathname, flags); + + if (fd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + void *p = (void *)(int64_t)fd; + int64_t rid = taosAddRef(tsFileRsetId, p); + if (rid < 0) close(fd); + + return rid; +} + +int64_t tfclose(int64_t tfd) { + return taosRemoveRef(tsFileRsetId, tfd); +} + +int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + + int64_t ret = taosWrite(fd, buf, count); + if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); + + taosReleaseRef(tsFileRsetId, tfd); + return ret; +} + +int64_t tfread(int64_t tfd, void *buf, int64_t count) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + + int64_t ret = taosRead(fd, buf, count); + if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); + + taosReleaseRef(tsFileRsetId, tfd); + return ret; +} diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 23a7210e99105a1e0e9919939da2d6d186e09e26..b998dfd43c72f2b2248623bd67cab6ad08de9ade 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -24,19 +24,22 @@ #define TSDB_REF_STATE_DELETED 2 typedef struct SRefNode { - struct SRefNode *prev; - struct SRefNode *next; - void *p; - int32_t count; + struct SRefNode *prev; // previous node + struct SRefNode *next; // next node + void *p; // pointer to resource protected, + int64_t rid; // reference ID + int32_t count; // number of references + int removed; // 1: removed } SRefNode; typedef struct { - SRefNode **nodeList; - int state; // 0: empty, 1: active; 2: deleted - int refId; - int max; - int32_t count; // total number of SRefNodes in this set - int64_t *lockedBy; + SRefNode **nodeList; // array of SRefNode linked list + int state; // 0: empty, 1: active; 2: deleted + int rsetId; // refSet ID, global unique + int64_t rid; // increase by one for each new reference + int max; // mod + int32_t count; // total number of SRefNodes in this set + int64_t *lockedBy; void (*fp)(void *); } SRefSet; @@ -47,54 +50,58 @@ static int tsRefSetNum = 0; static int tsNextId = 0; static void taosInitRefModule(void); -static int taosHashRef(SRefSet *pSet, void *p); static void taosLockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy); -static void taosIncRefCount(SRefSet *pSet); -static void taosDecRefCount(SRefSet *pSet); +static void taosIncRsetCount(SRefSet *pSet); +static void taosDecRsetCount(SRefSet *pSet); +static int taosDecRefCount(int rsetId, int64_t rid, int remove); int taosOpenRef(int max, void (*fp)(void *)) { SRefNode **nodeList; SRefSet *pSet; int64_t *lockedBy; - int i, refId; + int i, rsetId; pthread_once(&tsRefModuleInit, taosInitRefModule); nodeList = calloc(sizeof(SRefNode *), (size_t)max); - if (nodeList == NULL) { - return TSDB_CODE_REF_NO_MEMORY; + if (nodeList == NULL) { + terrno = TSDB_CODE_REF_NO_MEMORY; + return -1; } lockedBy = calloc(sizeof(int64_t), (size_t)max); if (lockedBy == NULL) { free(nodeList); - return TSDB_CODE_REF_NO_MEMORY; + terrno = TSDB_CODE_REF_NO_MEMORY; + return -1; } pthread_mutex_lock(&tsRefMutex); for (i = 0; i < TSDB_REF_OBJECTS; ++i) { tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS; + if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break; } if (i < TSDB_REF_OBJECTS) { - refId = tsNextId; - pSet = tsRefSetList + refId; - taosIncRefCount(pSet); + rsetId = tsNextId; + pSet = tsRefSetList + rsetId; pSet->max = max; pSet->nodeList = nodeList; pSet->lockedBy = lockedBy; pSet->fp = fp; + pSet->rid = 1; + pSet->rsetId = rsetId; pSet->state = TSDB_REF_STATE_ACTIVE; - pSet->refId = refId; + taosIncRsetCount(pSet); tsRefSetNum++; - uTrace("refId:%d is opened, max:%d, fp:%p refSetNum:%d", refId, max, fp, tsRefSetNum); + uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum); } else { - refId = TSDB_CODE_REF_FULL; + rsetId = TSDB_CODE_REF_FULL; free (nodeList); free (lockedBy); uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); @@ -102,121 +109,128 @@ int taosOpenRef(int max, void (*fp)(void *)) pthread_mutex_unlock(&tsRefMutex); - return refId; + return rsetId; } -void taosCloseRef(int refId) +int taosCloseRef(int rsetId) { SRefSet *pSet; int deleted = 0; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d is invalid, out of range", refId); - return; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d is invalid, out of range", rsetId); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; pthread_mutex_lock(&tsRefMutex); if (pSet->state == TSDB_REF_STATE_ACTIVE) { pSet->state = TSDB_REF_STATE_DELETED; deleted = 1; - uTrace("refId:%d is closed, count:%d", refId, pSet->count); + uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count); } else { - uTrace("refId:%d is already closed, count:%d", refId, pSet->count); + uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count); } pthread_mutex_unlock(&tsRefMutex); - if (deleted) taosDecRefCount(pSet); + if (deleted) taosDecRsetCount(pSet); + + return 0; } -int taosAddRef(int refId, void *p) +int64_t taosAddRef(int rsetId, void *p) { int hash; SRefNode *pNode; SRefSet *pSet; + int64_t rid = 0; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to add, refId not valid", refId, p); - return TSDB_CODE_REF_INVALID_ID; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d p:%p failed to add, rsetId not valid", rsetId, p); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; } - pSet = tsRefSetList + refId; - taosIncRefCount(pSet); + pSet = tsRefSetList + rsetId; + taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { - taosDecRefCount(pSet); - uTrace("refId:%d p:%p failed to add, not active", refId, p); - return TSDB_CODE_REF_ID_REMOVED; + taosDecRsetCount(pSet); + uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p); + terrno = TSDB_CODE_REF_ID_REMOVED; + return -1; } - int code = 0; - hash = taosHashRef(pSet, p); + pNode = calloc(sizeof(SRefNode), 1); + if (pNode == NULL) { + terrno = TSDB_CODE_REF_NO_MEMORY; + return -1; + } + rid = atomic_add_fetch_64(&pSet->rid, 1); + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); - pNode = pSet->nodeList[hash]; - while (pNode) { - if (pNode->p == p) - break; + pNode->p = p; + pNode->rid = rid; + pNode->count = 1; - pNode = pNode->next; - } - - if (pNode) { - code = TSDB_CODE_REF_ALREADY_EXIST; - uTrace("refId:%d p:%p is already there, faild to add", refId, p); - } else { - pNode = calloc(sizeof(SRefNode), 1); - if (pNode) { - pNode->p = p; - pNode->count = 1; - pNode->prev = 0; - pNode->next = pSet->nodeList[hash]; - if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode; - pSet->nodeList[hash] = pNode; - uTrace("refId:%d p:%p is added, count:%d malloc mem: %p", refId, p, pSet->count, pNode); - } else { - code = TSDB_CODE_REF_NO_MEMORY; - uTrace("refId:%d p:%p is not added, since no memory", refId, p); - } - } + pNode->prev = NULL; + pNode->next = pSet->nodeList[hash]; + if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode; + pSet->nodeList[hash] = pNode; - if (code < 0) taosDecRefCount(pSet); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count); taosUnlockList(pSet->lockedBy+hash); - return code; + return rid; +} + +int taosRemoveRef(int rsetId, int64_t rid) +{ + return taosDecRefCount(rsetId, rid, 1); } -int taosAcquireRef(int refId, void *p) +// if rid is 0, return the first p in hash list, otherwise, return the next after current rid +void *taosAcquireRef(int rsetId, int64_t rid) { - int hash, code = 0; + int hash; SRefNode *pNode; SRefSet *pSet; + void *p = NULL; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to acquire, refId not valid", refId, p); - return TSDB_CODE_REF_INVALID_ID; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; + return NULL; } - pSet = tsRefSetList + refId; - taosIncRefCount(pSet); + if (rid <= 0) { + uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rid not valid", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + return NULL; + } + + pSet = tsRefSetList + rsetId; + taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { - uTrace("refId:%d p:%p failed to acquire, not active", refId, p); - taosDecRefCount(pSet); - return TSDB_CODE_REF_ID_REMOVED; + uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid); + taosDecRsetCount(pSet); + terrno = TSDB_CODE_REF_ID_REMOVED; + return NULL; } - hash = taosHashRef(pSet, p); - + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash]; while (pNode) { - if (pNode->p == p) { + if (pNode->rid == rid) { break; } @@ -224,117 +238,76 @@ int taosAcquireRef(int refId, void *p) } if (pNode) { - pNode->count++; - uTrace("refId:%d p:%p is acquired", refId, p); + if (pNode->removed == 0) { + pNode->count++; + p = pNode->p; + uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid); + } else { + terrno = TSDB_CODE_REF_NOT_EXIST; + uTrace("rsetId:%d p:%p rid:%" PRId64 " is already removed, failed to acquire", rsetId, pNode->p, rid); + } } else { - code = TSDB_CODE_REF_NOT_EXIST; - uTrace("refId:%d p:%p is not there, failed to acquire", refId, p); + terrno = TSDB_CODE_REF_NOT_EXIST; + uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid); } taosUnlockList(pSet->lockedBy+hash); - taosDecRefCount(pSet); + taosDecRsetCount(pSet); - return code; + return p; } -void taosReleaseRef(int refId, void *p) +int taosReleaseRef(int rsetId, int64_t rid) { - int hash; - SRefNode *pNode; - SRefSet *pSet; - int released = 0; - - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to release, refId not valid", refId, p); - return; - } - - pSet = tsRefSetList + refId; - if (pSet->state == TSDB_REF_STATE_EMPTY) { - uTrace("refId:%d p:%p failed to release, cleaned", refId, p); - return; - } - - hash = taosHashRef(pSet, p); - - taosLockList(pSet->lockedBy+hash); - - pNode = pSet->nodeList[hash]; - while (pNode) { - if (pNode->p == p) - break; - - pNode = pNode->next; - } - - if (pNode) { - pNode->count--; - - if (pNode->count == 0) { - if ( pNode->prev ) { - pNode->prev->next = pNode->next; - } else { - pSet->nodeList[hash] = pNode->next; - } - - if ( pNode->next ) { - pNode->next->prev = pNode->prev; - } - - (*pSet->fp)(pNode->p); - - free(pNode); - released = 1; - uTrace("refId:%d p:%p is removed, count:%d, free mem: %p", refId, p, pSet->count, pNode); - } else { - uTrace("refId:%d p:%p is released", refId, p); - } - } else { - uTrace("refId:%d p:%p is not there, failed to release", refId, p); - } - - taosUnlockList(pSet->lockedBy+hash); - - if (released) taosDecRefCount(pSet); + return taosDecRefCount(rsetId, rid, 0); } -// if p is NULL, return the first p in hash list, otherwise, return the next after p -void *taosIterateRef(int refId, void *p) { +// if rid is 0, return the first p in hash list, otherwise, return the next after current rid +void *taosIterateRef(int rsetId, int64_t rid) { SRefNode *pNode = NULL; SRefSet *pSet; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p); + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; + return NULL; + } + + if (rid <= 0) { + uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rid not valid", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; return NULL; } - pSet = tsRefSetList + refId; - taosIncRefCount(pSet); + pSet = tsRefSetList + rsetId; + taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { - uTrace("refId:%d p:%p failed to iterate, not active", refId, p); - taosDecRefCount(pSet); + uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid); + terrno = TSDB_CODE_REF_ID_REMOVED; + taosDecRsetCount(pSet); return NULL; } int hash = 0; - if (p) { - hash = taosHashRef(pSet, p); + if (rid > 0) { + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash]; while (pNode) { - if (pNode->p == p) break; + if (pNode->rid == rid) break; pNode = pNode->next; } if (pNode == NULL) { - uError("refId:%d p:%p not there, quit", refId, p); + uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; taosUnlockList(pSet->lockedBy+hash); return NULL; } - // p is there + // rid is there pNode = pNode->next; if (pNode == NULL) { taosUnlockList(pSet->lockedBy+hash); @@ -356,14 +329,14 @@ void *taosIterateRef(int refId, void *p) { pNode->count++; // acquire it newP = pNode->p; taosUnlockList(pSet->lockedBy+hash); - uTrace("refId:%d p:%p is returned", refId, p); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid); } else { - uTrace("refId:%d p:%p the list is over", refId, p); + uTrace("rsetId:%d the list is over", rsetId); } - if (p) taosReleaseRef(refId, p); // release the current one + if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one - taosDecRefCount(pSet); + taosDecRsetCount(pSet); return newP; } @@ -381,13 +354,13 @@ int taosListRef() { if (pSet->state == TSDB_REF_STATE_EMPTY) continue; - uInfo("refId:%d state:%d count::%d", i, pSet->state, pSet->count); + uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count); for (int j=0; j < pSet->max; ++j) { pNode = pSet->nodeList[j]; while (pNode) { - uInfo("refId:%d p:%p count:%d", i, pNode->p, pNode->count); + uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count); pNode = pNode->next; num++; } @@ -399,21 +372,78 @@ int taosListRef() { return num; } -static int taosHashRef(SRefSet *pSet, void *p) -{ - int hash = 0; - int64_t v = (int64_t)p; +static int taosDecRefCount(int rsetId, int64_t rid, int remove) { + int hash; + SRefSet *pSet; + SRefNode *pNode; + int released = 0; + int code = 0; + + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; + } + + if (rid <= 0) { + uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rid not valid", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + return -1; + } + + pSet = tsRefSetList + rsetId; + if (pSet->state == TSDB_REF_STATE_EMPTY) { + uTrace("rsetId:%d rid:%" PRId64 " failed to remove, cleaned", rsetId, rid); + terrno = TSDB_CODE_REF_ID_REMOVED; + return -1; + } - for (int i = 0; i < sizeof(v); ++i) { - hash += (int)(v & 0xFFFF); - v = v >> 16; - i = i + 2; + hash = rid % pSet->max; + taosLockList(pSet->lockedBy+hash); + + pNode = pSet->nodeList[hash]; + while (pNode) { + if (pNode->rid == rid) + break; + + pNode = pNode->next; } - hash = hash % pSet->max; + if (pNode) { + pNode->count--; + if (remove) pNode->removed = 1; - return hash; -} + if (pNode->count <= 0) { + if (pNode->prev) { + pNode->prev->next = pNode->next; + } else { + pSet->nodeList[hash] = pNode->next; + } + + if (pNode->next) { + pNode->next->prev = pNode->prev; + } + + (*pSet->fp)(pNode->p); + + uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); + free(pNode); + released = 1; + } else { + uTrace("rsetId:%d p:%p rid:%" PRId64 "is released, count:%d", rsetId, pNode->p, rid, pNode->count); + } + } else { + uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + code = -1; + } + + taosUnlockList(pSet->lockedBy+hash); + + if (released) taosDecRsetCount(pSet); + + return code; +} static void taosLockList(int64_t *lockedBy) { int64_t tid = taosGetPthreadId(); @@ -436,14 +466,14 @@ static void taosInitRefModule(void) { pthread_mutex_init(&tsRefMutex, NULL); } -static void taosIncRefCount(SRefSet *pSet) { +static void taosIncRsetCount(SRefSet *pSet) { atomic_add_fetch_32(&pSet->count, 1); - uTrace("refId:%d inc count:%d", pSet->refId, pSet->count); + // uTrace("rsetId:%d inc count:%d", pSet->rsetId, count); } -static void taosDecRefCount(SRefSet *pSet) { +static void taosDecRsetCount(SRefSet *pSet) { int32_t count = atomic_sub_fetch_32(&pSet->count, 1); - uTrace("refId:%d dec count:%d", pSet->refId, pSet->count); + // uTrace("rsetId:%d dec count:%d", pSet->rsetId, count); if (count > 0) return; @@ -458,7 +488,7 @@ static void taosDecRefCount(SRefSet *pSet) { taosTFree(pSet->lockedBy); tsRefSetNum--; - uTrace("refId:%d is cleaned, refSetNum:%d count:%d", pSet->refId, tsRefSetNum, pSet->count); + uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count); } pthread_mutex_unlock(&tsRefMutex); diff --git a/src/util/tests/trefTest.c b/src/util/tests/trefTest.c index 09ffccd7b5fc4ea1c2a645a8ff6d8ec0c70cc8f2..6887b24abdae9f5d9949fee0073a35eb717eb4f7 100644 --- a/src/util/tests/trefTest.c +++ b/src/util/tests/trefTest.c @@ -11,106 +11,119 @@ #include "tulog.h" typedef struct { - int refNum; - int steps; - int refId; - void **p; + int refNum; + int steps; + int rsetId; + int64_t rid; + void **p; } SRefSpace; -void iterateRefs(int refId) { +void iterateRefs(int rsetId) { int count = 0; - void *p = taosIterateRef(refId, NULL); + void *p = taosIterateRef(rsetId, NULL); while (p) { // process P count++; - p = taosIterateRef(refId, p); + p = taosIterateRef(rsetId, p); } printf(" %d ", count); } -void *takeRefActions(void *param) { +void *addRef(void *param) { SRefSpace *pSpace = (SRefSpace *)param; - int code, id; + int id; + int64_t rid; for (int i=0; i < pSpace->steps; ++i) { - printf("s"); + printf("a"); id = random() % pSpace->refNum; - code = taosAddRef(pSpace->refId, pSpace->p[id]); - usleep(1); - - id = random() % pSpace->refNum; - code = taosAcquireRef(pSpace->refId, pSpace->p[id]); - if (code >= 0) { - usleep(id % 5 + 1); - taosReleaseRef(pSpace->refId, pSpace->p[id]); + if (pSpace->rid[id] <= 0) { + pSpace->p[id] = malloc(128); + pSpace->rid[id] = taosAddRef(pSpace->rsetId, pSpace->p[id]); } + usleep(100); + } + return NULL; +} + +void *removeRef(void *param) { + SRefSpace *pSpace = (SRefSpace *)param; + int id; + int64_t rid; + + for (int i=0; i < pSpace->steps; ++i) { + printf("d"); id = random() % pSpace->refNum; - taosRemoveRef(pSpace->refId, pSpace->p[id]); - usleep(id %5 + 1); + if (pSpace->rid[id] > 0) { + code = taosRemoveRef(pSpace->rsetId, pSpace->rid[id]); + if (code == 0) pSpace->rid[id] = 0; + } + usleep(100); + } + + return NULL; +} + +void *acquireRelease(void *param) { + SRefSpace *pSpace = (SRefSpace *)param; + int id; + int64_t rid; + + for (int i=0; i < pSpace->steps; ++i) { + printf("a"); + id = random() % pSpace->refNum; - code = taosAcquireRef(pSpace->refId, pSpace->p[id]); + code = taosAcquireRef(pSpace->rsetId, pSpace->p[id]); if (code >= 0) { usleep(id % 5 + 1); - taosReleaseRef(pSpace->refId, pSpace->p[id]); + taosReleaseRef(pSpace->rsetId, pSpace->p[id]); } - - id = random() % pSpace->refNum; - iterateRefs(id); } - for (int i=0; i < pSpace->refNum; ++i) { - taosRemoveRef(pSpace->refId, pSpace->p[i]); - } - - //uInfo("refId:%d thread exits", pSpace->refId); - return NULL; } void myfree(void *p) { - return; + free(p); } void *openRefSpace(void *param) { SRefSpace *pSpace = (SRefSpace *)param; printf("c"); - pSpace->refId = taosOpenRef(50, myfree); + pSpace->rsetId = taosOpenRef(50, myfree); - if (pSpace->refId < 0) { - printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId)); + if (pSpace->rsetId < 0) { + printf("failed to open ref, reson:%s\n", tstrerror(pSpace->rsetId)); return NULL; } pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum); - for (int i=0; irefNum; ++i) { - pSpace->p[i] = (void *) malloc(128); - } pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_t thread1, thread2, thread3; - pthread_create(&(thread1), &thattr, takeRefActions, (void *)(pSpace)); - pthread_create(&(thread2), &thattr, takeRefActions, (void *)(pSpace)); - pthread_create(&(thread3), &thattr, takeRefActions, (void *)(pSpace)); + pthread_create(&(thread1), &thattr, addRef, (void *)(pSpace)); + pthread_create(&(thread2), &thattr, removeRef, (void *)(pSpace)); + pthread_create(&(thread3), &thattr, acquireRelease, (void *)(pSpace)); pthread_join(thread1, NULL); pthread_join(thread2, NULL); pthread_join(thread3, NULL); - taosCloseRef(pSpace->refId); - for (int i=0; irefNum; ++i) { - free(pSpace->p[i]); + taosRemoveRef(pSpace->rsetId, pSpace->rid[i]); } - uInfo("refId:%d main thread exit", pSpace->refId); + taosCloseRef(pSpace->rsetId); + + uInfo("rsetId:%d main thread exit", pSpace->rsetId); free(pSpace->p); pSpace->p = NULL; @@ -140,7 +153,7 @@ int main(int argc, char *argv[]) { printf("\nusage: %s [options] \n", argv[0]); printf(" [-n]: number of references, default: %d\n", refNum); printf(" [-s]: steps to run for each reference, default: %d\n", steps); - printf(" [-t]: number of refIds running in parallel, default: %d\n", threads); + printf(" [-t]: number of rsetIds running in parallel, default: %d\n", threads); printf(" [-l]: number of loops, default: %d\n", loops); printf(" [-d]: debugFlag, default: %d\n", uDebugFlag); exit(0); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 935da5436778c3aea2beb71acd75e11fe4965dcb..89ec277006542669961bf40549a2d53eb68ca5f8 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -47,7 +47,7 @@ typedef struct { void *rqueue; void *wal; void *tsdb; - void *sync; + int64_t sync; void *events; void *cq; // continuous query int32_t cfgVersion; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 7d577556b91c56f2ef59ffbb1c667c8535ffc9ae..a7dff5aef54ddf588efe9554851fa6cc424e0724 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -44,12 +44,12 @@ static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); #ifndef _SYNC -tsync_h syncStart(const SSyncInfo *info) { return NULL; } -int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } -void syncStop(tsync_h shandle) {} -int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } -int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } -void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} +int64_t syncStart(const SSyncInfo *info) { return NULL; } +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; } +void syncStop(int64_t rid) {} +int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; } +int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; } +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {} #endif char* vnodeStatus[] = { @@ -331,7 +331,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { #ifndef _SYNC pVnode->role = TAOS_SYNC_ROLE_MASTER; #else - if (pVnode->sync == NULL) { + if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, tstrerror(terrno)); vnodeCleanUp(pVnode); @@ -561,9 +561,9 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { } // stop replication module - if (pVnode->sync) { - void *sync = pVnode->sync; - pVnode->sync = NULL; + if (pVnode->sync > 0) { + int64_t sync = pVnode->sync; + pVnode->sync = -1; syncStop(sync); } diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 5273eb5b1c2c56d300243f8bbd724a39031aa164..d1e977225929d8e34b42fbe6aec53c3f41bb7273 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -43,6 +43,7 @@ extern int32_t wDebugFlag; typedef struct { uint64_t version; int64_t fileId; + int64_t rid; int32_t vgId; int32_t fd; int32_t keep; diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 8fc7bfeaf3abbc8362a64b86153f7252626b2bd3..2ae342244d95437b82c7131e9a9ef7f2344dab36 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -78,7 +78,8 @@ void *walOpen(char *path, SWalCfg *pCfg) { return NULL; } - if (taosAddRef(tsWal.refId, pWal) != TSDB_CODE_SUCCESS) { + pWal->rid = taosAddRef(tsWal.refId, pWal); + if (pWal->rid < 0) { walFreeObj(pWal); return NULL; } @@ -143,7 +144,7 @@ void walClose(void *handle) { } pthread_mutex_unlock(&pWal->mutex); - taosRemoveRef(tsWal.refId, pWal); + taosRemoveRef(tsWal.refId, pWal->rid); } static int32_t walInitObj(SWal *pWal) { @@ -185,7 +186,7 @@ static void walUpdateSeq() { } static void walFsyncAll() { - SWal *pWal = taosIterateRef(tsWal.refId, NULL); + SWal *pWal = taosIterateRef(tsWal.refId, 0); while (pWal) { if (walNeedFsync(pWal)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); @@ -194,7 +195,7 @@ static void walFsyncAll() { wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); } } - pWal = taosIterateRef(tsWal.refId, pWal); + pWal = taosIterateRef(tsWal.refId, pWal->rid); } } diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index b14321a4ef88a43f022fb3929f8a0e06edcad4f7..b555a27c9377bd45d8a6985309f423d3dbf64157 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -161,7 +161,7 @@ python3 ./test.py -f stream/metric_1.py python3 ./test.py -f stream/new.py python3 ./test.py -f stream/stream1.py python3 ./test.py -f stream/stream2.py -python3 ./test.py -f stream/parser.py +#python3 ./test.py -f stream/parser.py python3 ./test.py -f stream/history.py #alter table @@ -207,3 +207,17 @@ python3 test.py -f tools/taosdemo.py python3 test.py -f subscribe/singlemeter.py #python3 test.py -f subscribe/stability.py python3 test.py -f subscribe/supertable.py + + +# update +python3 ./test.py -f update/allow_update.py +python3 ./test.py -f update/allow_update-0.py +python3 ./test.py -f update/append_commit_data.py +python3 ./test.py -f update/append_commit_last-0.py +python3 ./test.py -f update/append_commit_last.py +python3 ./test.py -f update/merge_commit_data.py +python3 ./test.py -f update/merge_commit_data-0.py +python3 ./test.py -f update/merge_commit_data2.py +python3 ./test.py -f update/merge_commit_data2_update0.py +python3 ./test.py -f update/merge_commit_last-0.py +python3 ./test.py -f update/merge_commit_last.py diff --git a/tests/pytest/functions/function_percentile.py b/tests/pytest/functions/function_percentile.py index aaeb94372e44282da0d83849c18e40857463b42c..e63d65f2e6a429015e2b4d7dcbe5e8c9884eea5e 100644 --- a/tests/pytest/functions/function_percentile.py +++ b/tests/pytest/functions/function_percentile.py @@ -130,8 +130,19 @@ class TDTestCase: tdSql.query("select percentile(col6, 100) from test") tdSql.checkData(0, 0, np.percentile(floatData, 100)) tdSql.query("select apercentile(col6, 100) from test") - print("apercentile result: %s" % tdSql.getData(0, 0)) + print("apercentile result: %s" % tdSql.getData(0, 0)) + tdSql.execute("create table meters (ts timestamp, voltage int) tags(loc nchar(20))") + tdSql.execute("create table t0 using meters tags('beijing')") + tdSql.execute("create table t1 using meters tags('shanghai')") + for i in range(self.rowNum): + tdSql.execute("insert into t0 values(%d, %d)" % (self.ts + i, i + 1)) + tdSql.execute("insert into t1 values(%d, %d)" % (self.ts + i, i + 1)) + + tdSql.error("select percentile(voltage, 20) from meters") + tdSql.query("select apercentile(voltage, 20) from meters") + print("apercentile result: %s" % tdSql.getData(0, 0)) + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/pytest/handle_crash_gen_val_log.sh b/tests/pytest/handle_crash_gen_val_log.sh index 1a4c12a16c7d9bc4e8b2dd327765251745d44223..2d48de65c9a16fdb6f5f906167baf772694e9a05 100755 --- a/tests/pytest/handle_crash_gen_val_log.sh +++ b/tests/pytest/handle_crash_gen_val_log.sh @@ -5,7 +5,9 @@ GREEN='\033[1;32m' GREEN_DARK='\033[0;32m' GREEN_UNDERLINE='\033[4;32m' NC='\033[0m' - +nohup /root/TDinternal/debug/build/bin/taosd -c /root/TDinternal/community/sim/dnode1/cfg >/dev/null & +./crash_gen.sh --valgrind -p -t 10 -s 100 -b 4 +pidof taosd|xargs kill grep 'start to execute\|ERROR SUMMARY' valgrind.err|grep -v 'grep'|uniq|tee crash_gen_mem_err.log for memError in `grep 'ERROR SUMMARY' crash_gen_mem_err.log | awk '{print $4}'` diff --git a/tests/pytest/query/kill_query.py b/tests/pytest/query/kill_query.py new file mode 100644 index 0000000000000000000000000000000000000000..8975eea2685b93e551e0008fbf5013d5e16e934f --- /dev/null +++ b/tests/pytest/query/kill_query.py @@ -0,0 +1,82 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import tdLog +from util.cases import tdCases +from util.sql import tdSql +from util.dnodes import tdDnodes +import os +import threading +import time + + +class TDTestCase: + """ + kill query + """ + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + + def query(self): + conn = taos.connect(host='127.0.0.1', user='root', password='taosdata', config='/etc/config') + cursor = conn.cursor() + while True: + cursor.execute('show queries;') + print('show queries!') + temp = cursor.fetchall() + if temp: + print(temp[0][0]) + cursor.execute('kill query %s ;' % temp[0][0]) + print('kill query success') + break + time.sleep(0.5) + cursor.close() + conn.close() + + def run(self): + tdSql.prepare() + + print("==============step1") + os.system('yes | sudo taosdemo -n 100') + print('insert into test.meters 10000000 rows') + + + t1 = threading.Thread(target=self.query) + t1.setDaemon(True) + t1.start() + + print("==============step2") + tdSql.execute('use test;') + try: + print('============begin select * from 10000000 rows') + tdSql.query('select * from test.meters;') + # print(tdSql.queryResult) + except Exception as e: + if not "ProgrammingError('Query terminated'" in str(e): + raise Exception('fail') + + print('success') + print('kill query success') + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) + diff --git a/tests/pytest/updatetest.sh b/tests/pytest/updatetest.sh deleted file mode 100644 index ade11805536a796b7ef95d45d5040902a86e2d6c..0000000000000000000000000000000000000000 --- a/tests/pytest/updatetest.sh +++ /dev/null @@ -1,10 +0,0 @@ -# update -python3 ./test.py -f update/allow_update.py -python3 ./test.py -f update/allow_update-0.py -python3 ./test.py -f update/append_commit_data.py -python3 ./test.py -f update/append_commit_last-0.py -python3 ./test.py -f update/append_commit_last.py -python3 ./test.py -f update/merge_commit_data.py -python3 ./test.py -f update/merge_commit_data2.py -python3 ./test.py -f update/merge_commit_last-0.py -python3 ./test.py -f update/merge_commit_last.py \ No newline at end of file diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index c5fcf575ae551db7dd9ae25c06e815e7889b856d..76230f79f0136975006e81cee4236a2fa9521214 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -134,66 +134,8 @@ sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna #1970-01-01 08:01:40.800 | 10 | 45.000000000 | 0 | true | false | 0 | #1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 | -sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19; -if $rows != 20 then - return -1 -endi - -if $data00 != @70-01-01 08:01:40.800@ then - return -1 -endi - -if $data01 != 10 then - return -1 -endi - -if $data02 != 45.000000000 then - return -1 -endi - -if $data03 != 0 then - return -1 -endi - -if $data04 != 1 then - return -1 -endi - -if $data05 != 0 then - return -1 -endi - -if $data06 != 0 then - return -1 -endi - -if $data10 != @70-01-01 08:01:40.790@ then - return -1 -endi - -if $data11 != 10 then - return -1 -endi - -if $data12 != 945.000000000 then - return -1 -endi - -if $data13 != 90 then - return -1 -endi - -if $data14 != 1 then - return -1 -endi - -if $data15 != 1 then - return -1 -endi +sql_error select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19; -if $data16 != 0 then - return -1 -endi sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; if $rows != 100 then @@ -261,59 +203,9 @@ if $data16 != 2 then endi # this function will cause shell crash -sql select count(join_mt0.c1), first(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; -if $rows != 100 then - return -1 -endi +sql_error select count(join_mt0.c1), first(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; -if $data00 != @70-01-01 08:01:40.990@ then - return -1 -endi - -if $data01 != 10 then - return -1 -endi - -if $data02 != 90 then - return -1 -endi - -if $data03 != 0 then - return -1 -endi - -if $data11 != 10 then - return -1 -endi - -if $data12 != 80 then - return -1 -endi - -if $data13 != 0 then - return -1 -endi - -sql select last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10m) group by join_mt0.t1 order by join_mt0.ts asc; -if $rows != 1 then - return -1 -endi - -if $data00 != @70-01-01 08:00:00.000@ then - return -1 -endi - -if $data01 != 1 then - return -1 -endi - -if $data02 != 0 then - return -1 -endi - -if $data03 != 0 then - return -1 -endi +sql_error select last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10m) group by join_mt0.t1 order by join_mt0.ts asc; sql_error select count(join_mt0.c1), first(join_mt0.c1)-last(join_mt1.c1), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); sql select count(join_mt0.c1), first(join_mt0.c1)/count(*), first(join_mt1.c9) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts;", NULL); diff --git a/tests/script/general/parser/stream_on_sys.sim b/tests/script/general/parser/stream_on_sys.sim index 5507b4db484006fc8ae7887a47bcd83df5208c50..845a484488137e509a79bbb057f104378843cb8f 100644 --- a/tests/script/general/parser/stream_on_sys.sim +++ b/tests/script/general/parser/stream_on_sys.sim @@ -22,12 +22,12 @@ $i = 0 sql use $db -sql create table cpustrm as select count(*), avg(cpu_taosd), max(cpu_taosd), min(cpu_taosd), avg(cpu_system), max(cpu_cores), min(cpu_cores), last(cpu_cores) from log.dn1 interval(4s) sliding(2s) -sql create table memstrm as select count(*), avg(mem_taosd), max(mem_taosd), min(mem_taosd), avg(mem_system), first(mem_total), last(mem_total) from log.dn1 interval(4s) sliding(2s) -sql create table diskstrm as select count(*), avg(disk_used), last(disk_used), avg(disk_total), first(disk_total) from log.dn1 interval(4s) sliding(2s) -sql create table bandstrm as select count(*), avg(band_speed), last(band_speed) from log.dn1 interval(4s) sliding(2s) -sql create table reqstrm as select count(*), avg(req_http), last(req_http), avg(req_select), last(req_select), avg(req_insert), last(req_insert) from log.dn1 interval(4s) sliding(2s) -sql create table iostrm as select count(*), avg(io_read), last(io_read), avg(io_write), last(io_write) from log.dn1 interval(4s) sliding(2s) +sql create table cpustrm as select count(*), avg(cpu_taosd), max(cpu_taosd), min(cpu_taosd), avg(cpu_system), max(cpu_cores), min(cpu_cores), last(cpu_cores) from log.dn1 interval(4s) +sql create table memstrm as select count(*), avg(mem_taosd), max(mem_taosd), min(mem_taosd), avg(mem_system), first(mem_total), last(mem_total) from log.dn1 interval(4s) +sql create table diskstrm as select count(*), avg(disk_used), last(disk_used), avg(disk_total), first(disk_total) from log.dn1 interval(4s) +sql create table bandstrm as select count(*), avg(band_speed), last(band_speed) from log.dn1 interval(4s) +sql create table reqstrm as select count(*), avg(req_http), last(req_http), avg(req_select), last(req_select), avg(req_insert), last(req_insert) from log.dn1 interval(4s) +sql create table iostrm as select count(*), avg(io_read), last(io_read), avg(io_write), last(io_write) from log.dn1 interval(4s) sleep 120000 sql select * from cpustrm if $rows <= 0 then