diff --git a/Jenkinsfile b/Jenkinsfile index 3e6f3d018dba5e547ee393e23998ca87681c0d93..5a8f6a551ea3355c094218b4857e99c81a331eb3 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -105,30 +105,8 @@ pipeline { make > /dev/null cd ${WKC}/tests/pytest ./valgrind-test.sh 2>&1 > mem-error-out.log - grep \'start to execute\\|ERROR SUMMARY\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-mem-error-out.log - - for memError in `grep \'ERROR SUMMARY\' uniq-mem-error-out.log | awk \'{print $4}\'` - do - if [ -n "$memError" ]; then - if [ "$memError" -gt 12 ]; then - echo -e "${RED} ## Memory errors number valgrind reports is $memError.\\ - More than our threshold! ## ${NC}" - travis_terminate $memError - fi - fi - done - - grep \'start to execute\\|definitely lost:\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-definitely-lost-out.log - for defiMemError in `grep \'definitely lost:\' uniq-definitely-lost-out.log | awk \'{print $7}\'` - do - if [ -n "$defiMemError" ]; then - if [ "$defiMemError" -gt 13 ]; then - echo -e "${RED} ## Memory errors number valgrind reports \\ - Definitely lost is $defiMemError. More than our threshold! ## ${NC}" - travis_terminate $defiMemError - fi - fi - done + ./handle_val_log.sh + date cd ${WKC}/tests ./test-all.sh b3 diff --git a/src/balance/src/balance.c b/src/balance/src/balance.c index 8701b012ff8660c4bd568e00ce10db614da8dfa3..fc1853f3d818be80f14728e3cce16c60e0e61123 100644 --- a/src/balance/src/balance.c +++ b/src/balance/src/balance.c @@ -937,6 +937,7 @@ static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, mnodeDecDnodeRef(pDnode); } + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 4e579b0729431b0fc00d69431a304c4546f02fb0..5baa66a9e0229f35c431cea7a0d2dbb9e2ffb0e2 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -72,17 +72,10 @@ typedef struct SLocalReducer { bool orderPrjOnSTable; // projection query on stable } SLocalReducer; -typedef struct SSubqueryState { - int32_t numOfRemain; // the number of remain unfinished subquery - int32_t numOfTotal; // the number of total sub-queries - uint64_t numOfRetrievedRows; // total number of points in this query -} SSubqueryState; - typedef struct SRetrieveSupport { tExtMemBuffer ** pExtMemBuffer; // for build loser tree tOrderDescriptor *pOrderDescriptor; SColumnModel * pFinalColModel; // colModel for final result - SSubqueryState * pState; int32_t subqueryIndex; // index of current vnode in vnode list SSqlObj * pParentSql; tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 0323434a992ae25b9f3fe54fb596b56e865bdf55..594226b1fc2925cc8f36faed58125e1be715d9bd 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -66,7 +66,6 @@ typedef struct STidTags { #pragma pack(pop) typedef struct SJoinSupporter { - SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query SInterval interval; @@ -207,8 +206,6 @@ void tscTagCondRelease(STagCond* pCond); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); -void tscSetFreeHeatBeat(STscObj* pObj); -bool tscShouldFreeHeartBeat(SSqlObj* pHb); bool tscShouldBeFreed(SSqlObj* pSql); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 046bea2c27df8bbe1cc012d1511980f691ff60a5..b6ab3702c939ffc01e2d0a2c2c5cce8ddf0f2a8f 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -334,6 +334,12 @@ typedef struct STscObj { T_REF_DECLARE() } STscObj; +typedef struct SSubqueryState { + int32_t numOfRemain; // the number of remain unfinished subquery + int32_t numOfSub; // the number of total sub-queries + uint64_t numOfRetrievedRows; // total number of points in this query +} SSubqueryState; + typedef struct SSqlObj { void *signature; pthread_t owner; // owner of sql object, by which it is executed @@ -355,10 +361,11 @@ typedef struct SSqlObj { tsem_t rspSem; SSqlCmd cmd; SSqlRes res; - uint16_t numOfSubs; + + SSubqueryState subState; struct SSqlObj **pSubs; - struct SSqlObj * prev, *next; + struct SSqlObj *prev, *next; struct SSqlObj **self; } SSqlObj; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 400613595b605bae4e5c7c23353aa4243a0eb933..a76b77bb865dfd88ff35406ac5a4d2538e35d7d8 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -361,15 +361,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow); } -void tscProcessAsyncRes(SSchedMsg *pMsg) { - SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; - SSqlRes *pRes = &pSql->res; - assert(pSql->fp != NULL && pSql->fetchFp != NULL); - - pSql->fp = pSql->fetchFp; - (*pSql->fp)(pSql->param, pSql, pRes->code); -} - // this function will be executed by queue task threads, so the terrno is not valid static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; @@ -393,22 +384,15 @@ void tscQueueAsyncRes(SSqlObj *pSql) { if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pSql); return; - } else { - tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); } - SSchedMsg schedMsg = { 0 }; - schedMsg.fp = tscProcessAsyncRes; - schedMsg.ahandle = pSql; - schedMsg.thandle = (void *)1; - schedMsg.msg = NULL; - taosScheduleTask(tscQhandle, &schedMsg); -} + tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); -void tscProcessAsyncFree(SSchedMsg *pMsg) { - SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; - tscDebug("%p sql is freed", pSql); - taos_free_result(pSql); + SSqlRes *pRes = &pSql->res; + assert(pSql->fp != NULL && pSql->fetchFp != NULL); + + pSql->fp = pSql->fetchFp; + (*pSql->fp)(pSql->param, pSql, pRes->code); } int tscSendMsgToServer(SSqlObj *pSql); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 19abbe32e5ce67b626dd94c3db9574d6a3afe6e8..16f208da989b5d7d2f0592f38d95de80797af431 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -639,7 +639,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->numOfSubs); + (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub); if (*pMemBuffer == NULL) { tscError("%p failed to allocate memory", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index f2f997a59494ab7ab4e8da31edb4b819730e30c4..bcd52d3a4295ebaa5d84b322de9809e10ab23901 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -242,6 +242,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pQdesc->stime = htobe64(pSql->stime); pQdesc->queryId = htonl(pSql->queryId); pQdesc->useconds = htobe64(pSql->res.useconds); + pQdesc->qHandle = htobe64(pSql->res.qhandle); pHeartbeat->numOfQueries++; pQdesc++; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f9947ce675f008967c905a7d3959fb976ddd7424..7fd7ce26dd2ca72ec796e7bd49aaa839e87124fc 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1605,8 +1605,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t return TSDB_CODE_SUCCESS; } -static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, char* aliasName, - int32_t resColIdx, SColumnIndex* pColIndex) { +static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, + char* aliasName, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) { const char* msg1 = "not support column types"; int16_t type = 0; @@ -1652,8 +1652,13 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; tscColumnListInsert(pQueryInfo->colList, &index); + // if it is not in the final result, do not add it SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); - insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr); + if (finalResult) { + insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr); + } else { + tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0])); + } return TSDB_CODE_SUCCESS; } @@ -1928,7 +1933,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) { index.columnIndex = j; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } } @@ -1945,7 +1950,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); } - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index) != 0) { + + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -1982,7 +1988,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) { SColumnIndex index = {.tableIndex = j, .columnIndex = i}; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d3c02011690fd20e0cb7d7aaf7b98877581bbb42..5cddaa1c4d1541a1f103673fe92d481eec2f0cc0 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -24,7 +24,6 @@ #include "tschemautil.h" #include "tsclient.h" #include "ttimer.h" -#include "tutil.h" #include "tlockfree.h" SRpcCorEpSet tscMgmtEpSet; @@ -198,15 +197,19 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { return; } - if (tscShouldFreeHeartBeat(pHB)) { - tscDebug("%p free HB object and release connection", pHB); - pObj->pHb = 0; - taos_free_result(pHB); - } else { - int32_t code = tscProcessSql(pHB); - if (code != TSDB_CODE_SUCCESS) { - tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); - } + void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); + if (p == NULL) { + tscWarn("%p HB object has been released already", pHB); + return; + } + + assert(*pHB->self == pHB); + + int32_t code = tscProcessSql(pHB); + taosCacheRelease(tscObjCache, (void**) &p, false); + + if (code != TSDB_CODE_SUCCESS) { + tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); } } @@ -474,20 +477,29 @@ void tscKillSTableQuery(SSqlObj *pSql) { pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - for (int i = 0; i < pSql->numOfSubs; ++i) { + for (int i = 0; i < pSql->subState.numOfSub; ++i) { // NOTE: pSub may have been released already here SSqlObj *pSub = pSql->pSubs[i]; if (pSub == NULL) { continue; } - pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - if (pSub->pRpcCtx != NULL) { - rpcCancelRequest(pSub->pRpcCtx); - pSub->pRpcCtx = NULL; + void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); + if (p == NULL) { + continue; } - tscQueueAsyncRes(pSub); // async res? not other functions? + SSqlObj* pSubObj = (SSqlObj*) (*p); + assert(pSubObj->self == (SSqlObj**) p); + + pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + if (pSubObj->pRpcCtx != NULL) { + rpcCancelRequest(pSubObj->pRpcCtx); + pSubObj->pRpcCtx = NULL; + } + + tscQueueAsyncRes(pSubObj); // async res? not other functions? + taosCacheRelease(tscObjCache, (void**) &p, false); } tscDebug("%p super table query cancelled", pSql); @@ -1451,7 +1463,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) { int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; + SSqlCmd* pCmd = &pSql->cmd; int32_t code = pRes->code; if (pRes->code != TSDB_CODE_SUCCESS) { @@ -1494,6 +1506,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload; + // TODO refactor full_name char *db; // ugly code to move the space db = strstr(pObj->db, TS_PATH_DELIMITER); db = (db == NULL) ? pObj->db : db + 1; @@ -1501,6 +1514,9 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion)); tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion)); + pConnect->pid = htonl(taosGetPId()); + taosGetCurrentAPPName(pConnect->appName, NULL); + return TSDB_CODE_SUCCESS; } @@ -1653,6 +1669,10 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; pHeartbeat->numOfQueries = numOfQueries; pHeartbeat->numOfStreams = numOfStreams; + + pHeartbeat->pid = htonl(taosGetPId()); + taosGetCurrentAPPName(pHeartbeat->appName, NULL); + int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj); pthread_mutex_unlock(&pObj->mutex); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 449a92abcb0dce600eac530354d39144d7ae1bac..ef8625da06c349b14b9bc9294462bf627da5bb5b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -20,13 +20,13 @@ #include "tcache.h" #include "tnote.h" #include "trpc.h" -#include "ttimer.h" #include "tscLog.h" #include "tscSubquery.h" #include "tscUtil.h" #include "tsclient.h" #include "ttokendef.h" #include "tutil.h" +#include "ttimer.h" #include "tscProfile.h" static bool validImpl(const char* str, size_t maxsize) { @@ -261,9 +261,6 @@ void taos_close(TAOS *taos) { return; } - pObj->signature = NULL; - taosTmrStopA(&(pObj->pTimer)); - SSqlObj* pHb = pObj->pHb; if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode @@ -463,6 +460,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlRes *pRes = &pSql->res; if (pRes->qhandle == 0 || + pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { return NULL; @@ -526,7 +524,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { pRes->numOfClauseTotal = 0; pRes->rspType = 0; - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; taosTFree(pSql->pSubs); assert(pSql->fp == NULL); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index d01ede279aaad882e26bada0c8a6bb7069747406..61614b56fb0167122fb0e31ec879ef6a6cbbac5c 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -274,7 +274,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false); tscFreeSqlResult(pSql); taosTFree(pSql->pSubs); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; taosTFree(pTableMetaInfo->vgroupList); tscSetNextLaunchTimer(pStream, pSql); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 390c3ca92d3b8398d6ca88f11d0c6e45f3f77010..fbbd810a1908b8a43f494a7097425ee61d968a58 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -26,7 +26,6 @@ #include "tscSubquery.h" typedef struct SInsertSupporter { - SSubqueryState* pState; SSqlObj* pSql; int32_t index; } SInsertSupporter; @@ -174,7 +173,6 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in } pSupporter->pObj = pSql; - pSupporter->pState = pState; pSupporter->subqueryIndex = index; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -250,7 +248,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { SJoinSupporter* pSupporter = NULL; //If the columns are not involved in the final select clause, the corresponding query will not be issued. - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { pSupporter = pSql->pSubs[i]->param; if (taosArrayGetSize(pSupporter->exprList) > 0) { ++numOfSub; @@ -260,16 +258,15 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { assert(numOfSub > 0); // scan all subquery, if one sub query has only ts, ignore it - tscDebug("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub); + tscDebug("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->subState.numOfSub, numOfSub); //the subqueries that do not actually launch the secondary query to virtual node is set as completed. - SSubqueryState* pState = pSupporter->pState; - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = numOfSub; - + SSubqueryState* pState = &pSql->subState; + pState->numOfRemain = pState->numOfSub; + bool success = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj *pPrevSub = pSql->pSubs[i]; pSql->pSubs[i] = NULL; @@ -322,7 +319,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo)); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); + assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); tscFieldInfoUpdateOffset(pNewQueryInfo); @@ -373,13 +370,13 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { if (!success) { pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql, - pSql->numOfSubs, pSql->res.code); + pSql->subState.numOfSub, pSql->res.code); freeJoinSubqueryObj(pSql); return pSql->res.code; } - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { continue; } @@ -391,17 +388,13 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { } void freeJoinSubqueryObj(SSqlObj* pSql) { - SSubqueryState* pState = NULL; - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { continue; } SJoinSupporter* p = pSub->param; - pState = p->pState; - tscDestroyJoinSupporter(p); if (pSub->res.code == TSDB_CODE_SUCCESS) { @@ -409,14 +402,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { } } - taosTFree(pState); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { - assert(pSupporter->pState->numOfRemain > 0); + assert(pSqlObj->subState.numOfRemain > 0); - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) <= 0) { + if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) { tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); freeJoinSubqueryObj(pSqlObj); } @@ -680,7 +672,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { return; } @@ -716,10 +708,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); - pSupporter->pState->numOfTotal = 2; - pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; + pParentSql->subState.numOfSub = 2; + pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; - for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) { + for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { SSqlObj* sub = pParentSql->pSubs[m]; issueTSCompQuery(sub, sub->param, pParentSql); } @@ -818,7 +810,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow return; } - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { return; } @@ -850,7 +842,6 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR SJoinSupporter* pSupporter = (SJoinSupporter*)param; SSqlObj* pParentSql = pSupporter->pObj; - SSubqueryState* pState = pSupporter->pState; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; @@ -871,6 +862,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR pRes->numOfTotal += pRes->numOfRows; } + SSubqueryState* pState = &pParentSql->subState; if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -878,7 +870,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR // for projection query, need to try next vnode if current vnode is exhausted if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { pState->numOfRemain = 1; - pState->numOfTotal = 1; + pState->numOfSub = 1; pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; @@ -888,12 +880,12 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } } - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { - tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfTotal); + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pParentSql->subState.numOfRemain, pState->numOfSub); return; } - tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfTotal, pParentSql->res.code); + tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfSub, pParentSql->res.code); if (pParentSql->res.code != TSDB_CODE_SUCCESS) { freeJoinSubqueryObj(pParentSql); @@ -901,7 +893,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } // update the records for each subquery in parent sql object. - for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pState->numOfSub; ++i) { if (pParentSql->pSubs[i] == NULL) { continue; } @@ -917,32 +909,26 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { int32_t notInvolved = 0; SJoinSupporter* pSupporter = NULL; - SSubqueryState* pState = NULL; + SSubqueryState* pState = &pSql->subState; - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { notInvolved++; } else { pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param; - pState = pSupporter->pState; } } - assert(pState != NULL); - if (pState != NULL) { - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = numOfFetch; - } - + pState->numOfRemain = numOfFetch; return pSupporter; } void tscFetchDatablockFromSubquery(SSqlObj* pSql) { - assert(pSql->numOfSubs >= 1); + assert(pSql->subState.numOfSub >= 1); int32_t numOfFetch = 0; bool hasData = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { // if the subquery is NULL, it does not involved in the final result generation SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -989,7 +975,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); SJoinSupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; if (pSql1 == NULL) { continue; @@ -1124,7 +1110,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { } // wait for the other subqueries response from vnode - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { return; } @@ -1136,7 +1122,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { * data instead of returning to its invoker */ if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; // reset the record value + pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; @@ -1165,7 +1151,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter assert(pSql->res.numOfRows == 0); if (pSql->pSubs == NULL) { - pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES); + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1176,8 +1162,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pSql->pSubs[pSql->numOfSubs++] = pNew; - assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal); + pSql->pSubs[pSql->subState.numOfRemain++] = pNew; + assert(pSql->subState.numOfRemain <= pSql->subState.numOfSub); if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); @@ -1221,7 +1207,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag - SColumnIndex index = {0}; + SColumnIndex colIndex = {0}; STagCond* pTagCond = &pSupporter->tagCond; assert(pTagCond->joinInfo.hasJoin); @@ -1234,7 +1220,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); for(int32_t i = 0; i < numOfTags; ++i) { if (pSchema[i].colId == tagColId) { - index.columnIndex = i; + colIndex.columnIndex = i; break; } } @@ -1251,18 +1237,18 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter // set get tags query type TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); - tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s1, TSDB_COL_TAG); + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG); size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscDebug( "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), " "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s", pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), - numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, index.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name); + numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name); } else { SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; - SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); + SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL); // set the tags value for ts_comp function SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); @@ -1320,8 +1306,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { goto _error; } - pState->numOfTotal = pQueryInfo->numOfTables; - pState->numOfRemain = pState->numOfTotal; + pSql->subState.numOfSub = pQueryInfo->numOfTables; bool hasEmptySub = false; @@ -1354,10 +1339,10 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pSql->fp)(pSql->param, pSql, 0); } else { - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { - pState->numOfRemain = i - 1; // the already sent reques will continue and do not go to the error process routine + pSql->subState.numOfRemain = i - 1; // the already sent request will continue and do not go to the error process routine break; } } @@ -1373,7 +1358,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { - assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL); + assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0 && pState != NULL); for(int32_t i = 0; i < numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -1411,8 +1396,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; - assert(pSql->numOfSubs > 0); + pSql->subState.numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; + assert(pSql->subState.numOfSub > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); if (ret != 0) { @@ -1422,28 +1407,26 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return ret; } - pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); - tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs); + tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->subState.numOfSub); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); if (pSql->pSubs == NULL || pState == NULL) { taosTFree(pState); taosTFree(pSql->pSubs); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); tscQueueAsyncRes(pSql); return ret; } - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = pSql->numOfSubs; - + pSql->subState.numOfRemain = pSql->subState.numOfSub; pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; - for (; i < pSql->numOfSubs; ++i) { + for (; i < pSql->subState.numOfSub; ++i) { SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); if (trs == NULL) { tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1452,8 +1435,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pExtMemBuffer = pMemoryBuf; trs->pOrderDescriptor = pDesc; - trs->pState = pState; - + trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); if (trs->localBuffer == NULL) { tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1461,8 +1443,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { break; } - trs->subqueryIndex = i; - trs->pParentSql = pSql; + trs->subqueryIndex = i; + trs->pParentSql = pSql; trs->pFinalColModel = pModel; SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); @@ -1483,42 +1465,39 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); } - if (i < pSql->numOfSubs) { + if (i < pSql->subState.numOfSub) { tscError("%p failed to prepare subquery structure and launch subqueries", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); doCleanupSubqueries(pSql, i, pState); return pRes->code; // free all allocated resource } if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); doCleanupSubqueries(pSql, i, pState); return pRes->code; } - for(int32_t j = 0; j < pSql->numOfSubs; ++j) { + for(int32_t j = 0; j < pSql->subState.numOfSub; ++j) { SSqlObj* pSub = pSql->pSubs[j]; SRetrieveSupport* pSupport = pSub->param; tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex); tscProcessSql(pSub); } - + return TSDB_CODE_SUCCESS; } static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { tscDebug("%p start to free subquery obj", pSql); - int32_t index = trsupport->subqueryIndex; - SSqlObj *pParentSql = trsupport->pParentSql; +// int32_t index = trsupport->subqueryIndex; +// SSqlObj *pParentSql = trsupport->pParentSql; - assert(pSql == pParentSql->pSubs[index]); -// pParentSql->pSubs[index] = NULL; -// -// taos_free_result(pSql); +// assert(pSql == pParentSql->pSubs[index]); taosTFree(trsupport->localBuffer); taosTFree(trsupport); } @@ -1581,9 +1560,14 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); - + SSubqueryState* pState = &pParentSql->subState; + int32_t remain = pState->numOfRemain; + int32_t sub = pState->numOfSub; + UNUSED(remain); + UNUSED(sub); + + assert(pParentSql->subState.numOfRemain <= pState->numOfSub && pParentSql->subState.numOfRemain >= 0); + // retrieved in subquery failed. OR query cancelled in retrieve phase. if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1613,24 +1597,23 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { + remain = -1; + if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfTotal - remain); + pState->numOfSub - remain); tscFreeSubSqlObj(trsupport, pSql); return; } // all subqueries are failed - tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfTotal, + tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub, tstrerror(pParentSql->res.code)); // release allocated resource tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, - pState->numOfTotal); + pState->numOfSub); - taosTFree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes @@ -1650,7 +1633,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p SSqlObj * pParentSql = trsupport->pParentSql; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; - SSubqueryState* pState = trsupport->pState; + SSubqueryState* pState = &pParentSql->subState; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; @@ -1687,9 +1670,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p } int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { + if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfTotal - remain); + pState->numOfSub - remain); tscFreeSubSqlObj(trsupport, pSql); return; @@ -1699,20 +1682,18 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql, - pState->numOfTotal, pState->numOfRetrievedRows); + pState->numOfSub, pState->numOfRetrievedRows); SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); tscClearInterpInfo(pPQueryInfo); - tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pParentSql); + tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, pParentSql); tscDebug("%p build loser tree completed", pParentSql); pParentSql->res.precision = pSql->res.precision; pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - // only free once - taosTFree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); // set the command flag must be after the semaphore been correctly set. @@ -1733,8 +1714,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR assert(tres != NULL); SSqlObj *pSql = (SSqlObj *)tres; - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); + SSubqueryState* pState = &pParentSql->subState; + assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); SCMVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; @@ -1751,6 +1732,10 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(numOfRows == taos_errno(pSql)); + if (numOfRows == TSDB_CODE_TSC_QUERY_CANCELLED) { + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + } + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry); @@ -1822,7 +1807,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; - assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->numOfSubs); + assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub); // launch subquery for each vnode, so the subquery index equals to the vgroupIndex. STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); @@ -1893,7 +1878,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; - SSubqueryState* pState = pSupporter->pState; // record the total inserted rows if (numOfRows > 0) { @@ -1908,15 +1892,13 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } taosTFree(pSupporter); - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { + + if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { return; } tscDebug("%p Async insertion completed, total inserted:%" PRId64, pParentObj, pParentObj->res.numOfRows); - // release data block data - taosTFree(pState); - // restore user defined fp pParentObj->fp = pParentObj->fetchFp; @@ -1937,7 +1919,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; - assert(pSupporter->index < pSupporter->pState->numOfTotal); + assert(pSupporter->index < pSupporter->pSql->subState.numOfSub); STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); @@ -1954,33 +1936,29 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - pSql->numOfSubs = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); - assert(pSql->numOfSubs > 0); + pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); + assert(pSql->subState.numOfSub > 0); pRes->code = TSDB_CODE_SUCCESS; // the number of already initialized subqueries int32_t numOfSub = 0; - SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = pSql->numOfSubs; - - pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); + pSql->subState.numOfRemain = pSql->subState.numOfSub; + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { goto _error; } - tscDebug("%p submit data to %d vnode(s)", pSql, pSql->numOfSubs); + tscDebug("%p submit data to %d vnode(s)", pSql, pSql->subState.numOfSub); - while(numOfSub < pSql->numOfSubs) { + while(numOfSub < pSql->subState.numOfSub) { SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); if (pSupporter == NULL) { goto _error; } pSupporter->pSql = pSql; - pSupporter->pState = pState; pSupporter->index = numOfSub; SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); @@ -2003,12 +1981,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { numOfSub++; } else { tscDebug("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub, - pSql->numOfSubs, tstrerror(pRes->code)); + pSql->subState.numOfSub, tstrerror(pRes->code)); goto _error; } } - if (numOfSub < pSql->numOfSubs) { + if (numOfSub < pSql->subState.numOfSub) { tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; @@ -2026,7 +2004,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; _error: - taosTFree(pState); return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -2048,7 +2025,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); int32_t numOfRes = INT32_MAX; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { continue; } @@ -2238,7 +2215,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { bool allSubqueryExhausted = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { continue; } @@ -2264,7 +2241,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { hasData = !allSubqueryExhausted; } else { // otherwise, in case inner join, if any subquery exhausted, query completed. - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == 0) { continue; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d00b39e68beba06945df465ff862e2515938bb26..33c51c5571db4ce536572fdb8c36463bb29e405d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -360,26 +360,26 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { tscFreeSqlResult(pSql); taosTFree(pSql->pSubs); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; pSql->self = 0; tscResetSqlCmdObj(pCmd, false); } -static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) { - if (pSql->numOfSubs == 0) { +static void tscFreeSubobj(SSqlObj* pSql) { + if (pSql->subState.numOfSub == 0) { return; } - tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->numOfSubs); + tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->subState.numOfSub); - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { tscDebug("%p free sub SqlObj:%p, index:%d", pSql, pSql->pSubs[i], i); taos_free_result(pSql->pSubs[i]); pSql->pSubs[i] = NULL; } - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; } /** @@ -415,7 +415,9 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscDebug("%p start to free sqlObj", pSql); + pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; tscFreeSubobj(pSql); + tscPartiallyFreeSqlObj(pSql); pSql->signature = NULL; @@ -1516,13 +1518,6 @@ void tscSetFreeHeatBeat(STscObj* pObj) { pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; } -bool tscShouldFreeHeartBeat(SSqlObj* pHb) { - assert(pHb == pHb->signature); - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0); - return pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE; -} - /* * the following four kinds of SqlObj should not be freed * 1. SqlObj for stream computing @@ -2291,7 +2286,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { * * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. */ - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; pCmd->command = TSDB_SQL_SELECT; tscResetForNextRetrieve(pRes); @@ -2323,7 +2318,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { pRes->numOfTotal = num; taosTFree(pSql->pSubs); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; pSql->fp = fp; tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); diff --git a/src/connector/go b/src/connector/go index 8d7bf743852897110cbdcc7c4322cd7a74d4167b..8c58c512b6acda8bcdfa48fdc7140227b5221766 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b +Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 25814a748e4bb1a1951c93979e26daabeffe6e2e..01a4ed32f11e2badd6bda7ffdf862195c408dcbd 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -253,8 +253,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE -#define TSDB_MAX_SQL_SHOW_LEN 256 -#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 8mb +#define TSDB_MAX_SQL_SHOW_LEN 512 +#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb + +#define TSDB_APPNAME_LEN TSDB_UNI_LEN #define TSDB_MAX_BYTES_PER_ROW 16384 #define TSDB_MAX_TAGS_LEN 16384 @@ -282,7 +284,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_SHELL_VNODE_BITS 24 #define TSDB_SHELL_SID_MASK 0xFF #define TSDB_HTTP_TOKEN_LEN 20 -#define TSDB_SHOW_SQL_LEN 64 +#define TSDB_SHOW_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_MQTT_HOSTNAME_LEN 64 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 507ccf8d80f189954ffac3d3ae390d64cbc7fb1a..be66c76d71de1a5b96b9cbe741198a562112bc37 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -305,6 +305,8 @@ typedef struct { char clientVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN]; char db[TSDB_TABLE_FNAME_LEN]; + char appName[TSDB_APPNAME_LEN]; + int32_t pid; } SCMConnectMsg; typedef struct { @@ -746,6 +748,7 @@ typedef struct { uint32_t queryId; int64_t useconds; int64_t stime; + uint64_t qHandle; } SQueryDesc; typedef struct { @@ -761,8 +764,10 @@ typedef struct { typedef struct { uint32_t connId; + int32_t pid; int32_t numOfQueries; int32_t numOfStreams; + char appName[TSDB_APPNAME_LEN]; char pData[]; } SCMHeartBeatMsg; diff --git a/src/mnode/inc/mnodeProfile.h b/src/mnode/inc/mnodeProfile.h index e39496ec9cacf431e2fefd737018d54a56e27f88..1e5b1c0f9c7c1c220a88339f00d46538c78cf938 100644 --- a/src/mnode/inc/mnodeProfile.h +++ b/src/mnode/inc/mnodeProfile.h @@ -23,6 +23,8 @@ extern "C" { typedef struct { char user[TSDB_USER_LEN]; + char appName[TSDB_APPNAME_LEN]; // app name that invokes taosc + uint32_t pid; // pid of app that invokes taosc int8_t killed; uint16_t port; uint32_t ip; @@ -40,7 +42,7 @@ typedef struct { int32_t mnodeInitProfile(); void mnodeCleanupProfile(); -SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port); +SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app); SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port); void mnodeReleaseConn(SConnObj *pConn); int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg); diff --git a/src/mnode/src/mnodeCluster.c b/src/mnode/src/mnodeCluster.c index 5d19b67a5e5c2b1a9169733f593eeba43e37134c..35b6a67ab24d5ee50a4f4da7bb902ab0d7232b7e 100644 --- a/src/mnode/src/mnodeCluster.c +++ b/src/mnode/src/mnodeCluster.c @@ -224,7 +224,8 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, mnodeDecClusterRef(pCluster); numOfRows++; } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index a06152080ef23bbd561ef2e7b02531c6111e85df..37a626c6cc9ee4a8ffdc4f81e072ac036c1d33ec 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -760,7 +760,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void } pShow->numOfReads += numOfRows; - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeDecUserRef(pUser); return numOfRows; diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 4c777e4eedba6694bf854e59a8760860b9f7c010..33a869a35325787de18b883634d4efa70cfd7144 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -790,6 +790,7 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo mnodeDecDnodeRef(pDnode); } + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } @@ -891,8 +892,8 @@ int32_t mnodeRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pC mnodeDecDnodeRef(pDnode); } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } @@ -992,6 +993,7 @@ static int32_t mnodeRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, v } } + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } @@ -1083,8 +1085,8 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo } else { numOfRows = 0; } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 68828ac24d8386594b16facb2d28755891952198..89b2f50b731c90c1261957da93623ba16cebf2ab 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -480,8 +480,8 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo mnodeDecMnodeRef(pMnode); } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 26f38dde034788fb440a785ce6c90e8aedf8ddca..fc76e3dce39dc2f6eeee3aac19d13f2dc605c1a1 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -24,15 +24,9 @@ #include "mnode.h" #include "mnodeDef.h" #include "mnodeInt.h" -#include "mnodeAcct.h" -#include "mnodeDnode.h" -#include "mnodeDb.h" -#include "mnodeMnode.h" #include "mnodeProfile.h" #include "mnodeShow.h" -#include "mnodeTable.h" #include "mnodeUser.h" -#include "mnodeVgroup.h" #include "mnodeWrite.h" #define CONN_KEEP_TIME (tsShellActivityTimer * 3) @@ -78,7 +72,7 @@ void mnodeCleanupProfile() { } } -SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { +SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app) { #if 0 int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable); if (connSize > tsMaxShellConns) { @@ -96,10 +90,13 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { .ip = ip, .port = port, .connId = connId, - .stime = taosGetTimestampMs() + .stime = taosGetTimestampMs(), + .pid = pid, }; - tstrncpy(connObj.user, user, sizeof(connObj.user)); + tstrncpy(connObj.user, user, tListLen(connObj.user)); + tstrncpy(connObj.appName, app, tListLen(connObj.appName)); + connObj.lastAccess = connObj.stime; SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000); @@ -183,6 +180,20 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + // app name + pShow->bytes[cols] = TSDB_APPNAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "app_name"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + // app pid + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "pid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip:port"); @@ -191,13 +202,13 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "login time"); + strcpy(pSchema[cols].name, "login_time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "last access"); + strcpy(pSchema[cols].name, "last_access"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -236,6 +247,16 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); cols++; + // app name + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->appName, pShow->bytes[cols]); + cols++; + + // app pid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t*)pWrite = pConnObj->pid; + cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); @@ -254,8 +275,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 5; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); return numOfRows; } @@ -299,7 +319,7 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "queryId"); + strcpy(pSchema[cols].name, "query_id"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -315,9 +335,15 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + pShow->bytes[cols] = 24; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "qhandle"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "created time"); + strcpy(pSchema[cols].name, "created_time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -352,7 +378,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v SConnObj *pConnObj = NULL; int32_t cols = 0; char * pWrite; - char ipStr[TSDB_IPv4ADDR_LEN + 6]; + char str[TSDB_IPv4ADDR_LEN + 6] = {0}; while (numOfRows < rows) { pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj); @@ -362,9 +388,9 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v SQueryDesc *pDesc = pConnObj->pQueries + i; cols = 0; - snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId)); + snprintf(str, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId)); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -372,8 +398,15 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); + snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]); + cols++; + + char handleBuf[24] = {0}; + snprintf(handleBuf, tListLen(handleBuf), "%p", (void*)htobe64(pDesc->qHandle)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -393,8 +426,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 6; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); return numOfRows; } @@ -522,8 +554,7 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 8; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); return numOfRows; } diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index e587758e465d7a138275862ad411af884c0b78ec..80909e99aec6d752d35042ca2d761a6e8b923441 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -186,7 +186,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { rowsToRead = pShow->numOfRows - pShow->numOfReads; } - /* return no more than 100 meters in one round trip */ + /* return no more than 100 tables in one round trip */ if (rowsToRead > 100) rowsToRead = 100; /* @@ -244,7 +244,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { int32_t connId = htonl(pHBMsg->connId); SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); if (pConn == NULL) { - pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + pHBMsg->pid = htonl(pHBMsg->pid); + pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName); } if (pConn == NULL) { @@ -325,7 +326,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { goto connect_over; } - SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + pConnectMsg->pid = htonl(pConnectMsg->pid); + SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pConnectMsg->pid, pConnectMsg->appName); if (pConn == NULL) { code = terrno; } else { diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 1bc328800e1924879d5af18ab4f6c876238e0ae0..b2f1436dba53bc3c2ad2ce618fc3ae648d1b68ae 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -63,27 +63,27 @@ static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg); static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg); -static int32_t mnodeProcessDropTableMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg); static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg); static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg); static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg); -static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *mnodeMsg); -static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *mnodeMsg); -static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg); +static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg); +static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg); -static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg); static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg); static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg); static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg); -static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg); static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg); static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName); @@ -460,12 +460,14 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) { void *oldSchema = pTable->schema; void *oldVgHash = pTable->vgHash; int32_t oldRefCount = pTable->refCount; + int32_t oldNumOfTables = pTable->numOfTables; memcpy(pTable, pNew, sizeof(SSuperTableObj)); pTable->vgHash = oldVgHash; pTable->refCount = oldRefCount; pTable->schema = pNew->schema; + pTable->numOfTables = oldNumOfTables; free(pNew); free(oldTableId); free(oldSchema); @@ -1384,9 +1386,8 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 5; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeDecDbRef(pDb); return numOfRows; @@ -2543,6 +2544,25 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + pShow->bytes[cols] = 8; // table uid + pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema[cols].name, "uid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "tid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "vgId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); pShow->numOfColumns = cols; @@ -2568,6 +2588,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows return 0; } + int32_t cols = 0; int32_t numOfRows = 0; SChildTableObj *pTable = NULL; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; @@ -2608,8 +2629,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows continue; } - int32_t cols = 0; - + cols = 0; char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, pShow->bytes[cols]); @@ -2638,14 +2658,29 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows cols++; + // uid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t*) pWrite = pTable->uid; + cols++; + + + // tid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t*) pWrite = pTable->sid; + cols++; + + //vgid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t*) pWrite = pTable->vgId; + cols++; + numOfRows++; mnodeDecTableRef(pTable); } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 4; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeDecDbRef(pDb); free(pattern); @@ -2843,9 +2878,8 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 4; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeDecDbRef(pDb); return numOfRows; diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index 130b68646faa3497d02faa3f2eb19a68508762bb..779e25254897ff37f1ca884e83469233d7197b8c 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -385,8 +385,8 @@ static int32_t mnodeRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, voi numOfRows++; mnodeDecUserRef(pUser); } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 283132697d393d586b561d41e43d58160c0447d1..1f4132544b17f0eac2a23271d10a0f632ef48a4d 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -771,7 +771,8 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v mnodeDecVgroupRef(pVgroup); numOfRows++; } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; mnodeDecTableRef(pTable); diff --git a/src/os/inc/osSemphone.h b/src/os/inc/osSemphone.h index 4280b458a65b23b3aa680c6ed46f61beb5c48948..a71e74e97f4d9910414a5e801b89a1968d1df050 100644 --- a/src/os/inc/osSemphone.h +++ b/src/os/inc/osSemphone.h @@ -33,6 +33,8 @@ bool taosCheckPthreadValid(pthread_t thread); int64_t taosGetPthreadId(); void taosResetPthread(pthread_t *thread); bool taosComparePthread(pthread_t first, pthread_t second); +int32_t taosGetPId(); +int32_t taosGetCurrentAPPName(char *name, int32_t* len); #ifdef __cplusplus } diff --git a/src/os/src/detail/osSemphone.c b/src/os/src/detail/osSemphone.c index b91888845edd9e98315994eaada9eca245aacb24..9eb8c18a40a11fac9ada037163011cdcf92201fb 100644 --- a/src/os/src/detail/osSemphone.c +++ b/src/os/src/detail/osSemphone.c @@ -34,5 +34,31 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } int64_t taosGetPthreadId() { return (int64_t)pthread_self(); } void taosResetPthread(pthread_t *thread) { *thread = 0; } bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; } +int32_t taosGetPId() { return getpid(); } + +int32_t taosGetCurrentAPPName(char *name, int32_t* len) { + const char* self = "/proc/self/exe"; + char path[PATH_MAX] = {0}; + + if (readlink(self, path, PATH_MAX) <= 0) { + return -1; + } + + path[PATH_MAX - 1] = 0; + char* end = strrchr(path, '/'); + if (end == NULL) { + return -1; + } + + ++end; + + strcpy(name, end); + + if (len != NULL) { + *len = strlen(name); + } + + return 0; +} #endif \ No newline at end of file diff --git a/src/os/src/detail/osSysinfo.c b/src/os/src/detail/osSysinfo.c index f6470fc3e1e84e38013d963d4ea2d3a2ae58833b..8df671f9c8974ecf77e5bf0943950772e3f9d192 100644 --- a/src/os/src/detail/osSysinfo.c +++ b/src/os/src/detail/osSysinfo.c @@ -203,7 +203,7 @@ static void taosGetSystemTimezone() { snprintf(tsTimezone, TSDB_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz)); // cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT; - uInfo("timezone not configured, set to system default:%s", tsTimezone); + uWarn("timezone not configured, set to system default:%s", tsTimezone); } /* @@ -235,7 +235,7 @@ static void taosGetSystemLocale() { // get and set default locale strcpy(tsLocale, "en_US.UTF-8"); } else { tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN); - uError("locale not configured, set to system default:%s", tsLocale); + uWarn("locale not configured, set to system default:%s", tsLocale); } } diff --git a/src/os/src/windows/wSemphone.c b/src/os/src/windows/wSemphone.c index ded7e418430be2674b82761771579c7b84a65007..1f723540f695e4a4609a9ec74e773a1559e7f30a 100644 --- a/src/os/src/windows/wSemphone.c +++ b/src/os/src/windows/wSemphone.c @@ -36,3 +36,21 @@ int64_t taosGetPthreadId() { bool taosComparePthread(pthread_t first, pthread_t second) { return first.p == second.p; } + +int32_t taosGetPId() { + return GetCurrentProcessId(); +} + +int32_t taosGetCurrentAPPName(char *name, int32_t* len) { + char filepath[1024] = {0}; + + GetModuleFileName(NULL, filepath, MAX_PATH); + *strrchr(filepath,'.') = '\0'; + strcpy(name, filepath); + + if (len != NULL) { + *len = (int32_t) strlen(filepath); + } + + return 0; +} \ No newline at end of file diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 2637699adb1c6b7508418fe10e189ff37a969b2a..289648157427fe3de0412423cbe52def4ca46cbb 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -107,12 +107,14 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo free(pNode); } -static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) { +static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) { if (pElem->pData->signature != (uint64_t) pElem->pData) { uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData); - return; + return NULL; } + STrashElem* next = pElem->next; + pCacheObj->numOfElemsInTrash--; if (pElem->prev) { pElem->prev->next = pElem->next; @@ -120,9 +122,15 @@ static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem pCacheObj->pTrash = pElem->next; } - if (pElem->next) { - pElem->next->prev = pElem->prev; + if (next) { + next->prev = pElem->prev; + } + + if (pCacheObj->numOfElemsInTrash == 0) { + assert(pCacheObj->pTrash == NULL); } + + return next; } static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) { @@ -559,31 +567,30 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { if (pCacheObj->numOfElemsInTrash == 0) { if (pCacheObj->pTrash != NULL) { + pCacheObj->pTrash = NULL; uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash); } - pCacheObj->pTrash = NULL; __cache_unlock(pCacheObj); return; } - STrashElem *pElem = pCacheObj->pTrash; + const char* stat[] = {"false", "true"}; + uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name, + pCacheObj->numOfElemsInTrash, (force? stat[1]:stat[0])); + STrashElem *pElem = pCacheObj->pTrash; while (pElem) { T_REF_VAL_CHECK(pElem->pData); - if (pElem->next == pElem) { - pElem->next = NULL; - } + assert(pElem->next != pElem && pElem->prev != pElem); if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data, pCacheObj->numOfElemsInTrash - 1); - STrashElem *p = pElem; - pElem = pElem->next; - - doRemoveElemInTrashcan(pCacheObj, p); - doDestroyTrashcanElem(pCacheObj, p); + doRemoveElemInTrashcan(pCacheObj, pElem); + doDestroyTrashcanElem(pCacheObj, pElem); + pElem = pCacheObj->pTrash; } else { pElem = pElem->next; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 58e97075ac3f98197a6b6e17ac135d8bf9405b66..a3ed080f2443b9015d1a18f9b1c5ad2195a35d00 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -261,6 +261,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); return code; } diff --git a/tests/pytest/handle_val_log.sh b/tests/pytest/handle_val_log.sh new file mode 100644 index 0000000000000000000000000000000000000000..142cf1074552256d712bcef891fff74d4f4396b0 --- /dev/null +++ b/tests/pytest/handle_val_log.sh @@ -0,0 +1,29 @@ +# Color setting +RED='\033[0;31m' +GREEN='\033[1;32m' +GREEN_DARK='\033[0;32m' +GREEN_UNDERLINE='\033[4;32m' +NC='\033[0m' + +grep 'start to execute\|ERROR SUMMARY' mem-error-out.log|grep -v 'grep'|uniq|tee uniq-mem-error-out.log + +for memError in `grep 'ERROR SUMMARY' uniq-mem-error-out.log | awk '{print $4}'` +do +if [ -n "$memError" ]; then + if [ "$memError" -gt 12 ]; then + echo -e "${RED} ## Memory errors number valgrind reports is $memError.\ + More than our threshold! ## ${NC}" + fi +fi +done + +grep 'start to execute\|definitely lost:' mem-error-out.log|grep -v 'grep'|uniq|tee uniq-definitely-lost-out.log +for defiMemError in `grep 'definitely lost:' uniq-definitely-lost-out.log | awk '{print $7}'` +do +if [ -n "$defiMemError" ]; then + if [ "$defiMemError" -gt 13 ]; then + echo -e "${RED} ## Memory errors number valgrind reports \ + Definitely lost is $defiMemError. More than our threshold! ## ${NC}" + fi +fi +done \ No newline at end of file