提交 e7fb3285 编写于 作者: F freemine

Merge remote-tracking branch 'upstream/develop' into odbc

...@@ -105,30 +105,8 @@ pipeline { ...@@ -105,30 +105,8 @@ pipeline {
make > /dev/null make > /dev/null
cd ${WKC}/tests/pytest cd ${WKC}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log ./valgrind-test.sh 2>&1 > mem-error-out.log
grep \'start to execute\\|ERROR SUMMARY\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-mem-error-out.log ./handle_val_log.sh
for memError in `grep \'ERROR SUMMARY\' uniq-mem-error-out.log | awk \'{print $4}\'`
do
if [ -n "$memError" ]; then
if [ "$memError" -gt 12 ]; then
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\\
More than our threshold! ## ${NC}"
travis_terminate $memError
fi
fi
done
grep \'start to execute\\|definitely lost:\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-definitely-lost-out.log
for defiMemError in `grep \'definitely lost:\' uniq-definitely-lost-out.log | awk \'{print $7}\'`
do
if [ -n "$defiMemError" ]; then
if [ "$defiMemError" -gt 13 ]; then
echo -e "${RED} ## Memory errors number valgrind reports \\
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
travis_terminate $defiMemError
fi
fi
done
date date
cd ${WKC}/tests cd ${WKC}/tests
./test-all.sh b3 ./test-all.sh b3
......
...@@ -937,6 +937,7 @@ static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, ...@@ -937,6 +937,7 @@ static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows,
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
......
...@@ -72,17 +72,10 @@ typedef struct SLocalReducer { ...@@ -72,17 +72,10 @@ typedef struct SLocalReducer {
bool orderPrjOnSTable; // projection query on stable bool orderPrjOnSTable; // projection query on stable
} SLocalReducer; } SLocalReducer;
typedef struct SSubqueryState {
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfTotal; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState;
typedef struct SRetrieveSupport { typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor; tOrderDescriptor *pOrderDescriptor;
SColumnModel * pFinalColModel; // colModel for final result SColumnModel * pFinalColModel; // colModel for final result
SSubqueryState * pState;
int32_t subqueryIndex; // index of current vnode in vnode list int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSql; SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
......
...@@ -66,7 +66,6 @@ typedef struct STidTags { ...@@ -66,7 +66,6 @@ typedef struct STidTags {
#pragma pack(pop) #pragma pack(pop)
typedef struct SJoinSupporter { typedef struct SJoinSupporter {
SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj SSqlObj* pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query int32_t subqueryIndex; // index of sub query
SInterval interval; SInterval interval;
...@@ -207,8 +206,6 @@ void tscTagCondRelease(STagCond* pCond); ...@@ -207,8 +206,6 @@ void tscTagCondRelease(STagCond* pCond);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
void tscSetFreeHeatBeat(STscObj* pObj);
bool tscShouldFreeHeartBeat(SSqlObj* pHb);
bool tscShouldBeFreed(SSqlObj* pSql); bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
......
...@@ -334,6 +334,12 @@ typedef struct STscObj { ...@@ -334,6 +334,12 @@ typedef struct STscObj {
T_REF_DECLARE() T_REF_DECLARE()
} STscObj; } STscObj;
typedef struct SSubqueryState {
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState;
typedef struct SSqlObj { typedef struct SSqlObj {
void *signature; void *signature;
pthread_t owner; // owner of sql object, by which it is executed pthread_t owner; // owner of sql object, by which it is executed
...@@ -355,10 +361,11 @@ typedef struct SSqlObj { ...@@ -355,10 +361,11 @@ typedef struct SSqlObj {
tsem_t rspSem; tsem_t rspSem;
SSqlCmd cmd; SSqlCmd cmd;
SSqlRes res; SSqlRes res;
uint16_t numOfSubs;
SSubqueryState subState;
struct SSqlObj **pSubs; struct SSqlObj **pSubs;
struct SSqlObj * prev, *next;
struct SSqlObj *prev, *next;
struct SSqlObj **self; struct SSqlObj **self;
} SSqlObj; } SSqlObj;
......
...@@ -361,15 +361,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { ...@@ -361,15 +361,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
(*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow); (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
} }
void tscProcessAsyncRes(SSchedMsg *pMsg) {
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
SSqlRes *pRes = &pSql->res;
assert(pSql->fp != NULL && pSql->fetchFp != NULL);
pSql->fp = pSql->fetchFp;
(*pSql->fp)(pSql->param, pSql, pRes->code);
}
// this function will be executed by queue task threads, so the terrno is not valid // this function will be executed by queue task threads, so the terrno is not valid
static void tscProcessAsyncError(SSchedMsg *pMsg) { static void tscProcessAsyncError(SSchedMsg *pMsg) {
void (*fp)() = pMsg->ahandle; void (*fp)() = pMsg->ahandle;
...@@ -393,22 +384,15 @@ void tscQueueAsyncRes(SSqlObj *pSql) { ...@@ -393,22 +384,15 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pSql); tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
return; return;
} else {
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
} }
SSchedMsg schedMsg = { 0 }; tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
schedMsg.fp = tscProcessAsyncRes;
schedMsg.ahandle = pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
void tscProcessAsyncFree(SSchedMsg *pMsg) { SSqlRes *pRes = &pSql->res;
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; assert(pSql->fp != NULL && pSql->fetchFp != NULL);
tscDebug("%p sql is freed", pSql);
taos_free_result(pSql); pSql->fp = pSql->fetchFp;
(*pSql->fp)(pSql->param, pSql, pRes->code);
} }
int tscSendMsgToServer(SSqlObj *pSql); int tscSendMsgToServer(SSqlObj *pSql);
......
...@@ -639,7 +639,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -639,7 +639,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->numOfSubs); (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub);
if (*pMemBuffer == NULL) { if (*pMemBuffer == NULL) {
tscError("%p failed to allocate memory", pSql); tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......
...@@ -242,6 +242,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { ...@@ -242,6 +242,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
pQdesc->stime = htobe64(pSql->stime); pQdesc->stime = htobe64(pSql->stime);
pQdesc->queryId = htonl(pSql->queryId); pQdesc->queryId = htonl(pSql->queryId);
pQdesc->useconds = htobe64(pSql->res.useconds); pQdesc->useconds = htobe64(pSql->res.useconds);
pQdesc->qHandle = htobe64(pSql->res.qhandle);
pHeartbeat->numOfQueries++; pHeartbeat->numOfQueries++;
pQdesc++; pQdesc++;
......
...@@ -1605,8 +1605,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t ...@@ -1605,8 +1605,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, char* aliasName, static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc,
int32_t resColIdx, SColumnIndex* pColIndex) { char* aliasName, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
const char* msg1 = "not support column types"; const char* msg1 = "not support column types";
int16_t type = 0; int16_t type = 0;
...@@ -1652,8 +1652,13 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS ...@@ -1652,8 +1652,13 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &index); tscColumnListInsert(pQueryInfo->colList, &index);
// if it is not in the final result, do not add it
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr); if (finalResult) {
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr);
} else {
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0]));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1928,7 +1933,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1928,7 +1933,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) { for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) {
index.columnIndex = j; index.columnIndex = j;
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index) != 0) { if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
} }
...@@ -1945,7 +1950,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1945,7 +1950,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) { if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index) != 0) {
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -1982,7 +1988,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1982,7 +1988,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) { for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) {
SColumnIndex index = {.tableIndex = j, .columnIndex = i}; SColumnIndex index = {.tableIndex = j, .columnIndex = i};
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index) != 0) { if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include "tschemautil.h" #include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h"
#include "tlockfree.h" #include "tlockfree.h"
SRpcCorEpSet tscMgmtEpSet; SRpcCorEpSet tscMgmtEpSet;
...@@ -198,15 +197,19 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -198,15 +197,19 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
return; return;
} }
if (tscShouldFreeHeartBeat(pHB)) { void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
tscDebug("%p free HB object and release connection", pHB); if (p == NULL) {
pObj->pHb = 0; tscWarn("%p HB object has been released already", pHB);
taos_free_result(pHB); return;
} else { }
int32_t code = tscProcessSql(pHB);
if (code != TSDB_CODE_SUCCESS) { assert(*pHB->self == pHB);
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
} int32_t code = tscProcessSql(pHB);
taosCacheRelease(tscObjCache, (void**) &p, false);
if (code != TSDB_CODE_SUCCESS) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
} }
} }
...@@ -474,20 +477,29 @@ void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -474,20 +477,29 @@ void tscKillSTableQuery(SSqlObj *pSql) {
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
for (int i = 0; i < pSql->numOfSubs; ++i) { for (int i = 0; i < pSql->subState.numOfSub; ++i) {
// NOTE: pSub may have been released already here // NOTE: pSub may have been released already here
SSqlObj *pSub = pSql->pSubs[i]; SSqlObj *pSub = pSql->pSubs[i];
if (pSub == NULL) { if (pSub == NULL) {
continue; continue;
} }
pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE));
if (pSub->pRpcCtx != NULL) { if (p == NULL) {
rpcCancelRequest(pSub->pRpcCtx); continue;
pSub->pRpcCtx = NULL;
} }
tscQueueAsyncRes(pSub); // async res? not other functions? SSqlObj* pSubObj = (SSqlObj*) (*p);
assert(pSubObj->self == (SSqlObj**) p);
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSubObj->pRpcCtx != NULL) {
rpcCancelRequest(pSubObj->pRpcCtx);
pSubObj->pRpcCtx = NULL;
}
tscQueueAsyncRes(pSubObj); // async res? not other functions?
taosCacheRelease(tscObjCache, (void**) &p, false);
} }
tscDebug("%p super table query cancelled", pSql); tscDebug("%p super table query cancelled", pSql);
...@@ -1451,7 +1463,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) { ...@@ -1451,7 +1463,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
int32_t code = pRes->code; int32_t code = pRes->code;
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
...@@ -1494,6 +1506,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1494,6 +1506,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload; SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
// TODO refactor full_name
char *db; // ugly code to move the space char *db; // ugly code to move the space
db = strstr(pObj->db, TS_PATH_DELIMITER); db = strstr(pObj->db, TS_PATH_DELIMITER);
db = (db == NULL) ? pObj->db : db + 1; db = (db == NULL) ? pObj->db : db + 1;
...@@ -1501,6 +1514,9 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1501,6 +1514,9 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion)); tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion)); tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
pConnect->pid = htonl(taosGetPId());
taosGetCurrentAPPName(pConnect->appName, NULL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1653,6 +1669,10 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1653,6 +1669,10 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
pHeartbeat->numOfQueries = numOfQueries; pHeartbeat->numOfQueries = numOfQueries;
pHeartbeat->numOfStreams = numOfStreams; pHeartbeat->numOfStreams = numOfStreams;
pHeartbeat->pid = htonl(taosGetPId());
taosGetCurrentAPPName(pHeartbeat->appName, NULL);
int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj); int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
......
...@@ -20,13 +20,13 @@ ...@@ -20,13 +20,13 @@
#include "tcache.h" #include "tcache.h"
#include "tnote.h" #include "tnote.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tutil.h" #include "tutil.h"
#include "ttimer.h"
#include "tscProfile.h" #include "tscProfile.h"
static bool validImpl(const char* str, size_t maxsize) { static bool validImpl(const char* str, size_t maxsize) {
...@@ -261,9 +261,6 @@ void taos_close(TAOS *taos) { ...@@ -261,9 +261,6 @@ void taos_close(TAOS *taos) {
return; return;
} }
pObj->signature = NULL;
taosTmrStopA(&(pObj->pTimer));
SSqlObj* pHb = pObj->pHb; SSqlObj* pHb = pObj->pHb;
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode
...@@ -463,6 +460,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -463,6 +460,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 || if (pRes->qhandle == 0 ||
pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->command == TSDB_SQL_INSERT) { pCmd->command == TSDB_SQL_INSERT) {
return NULL; return NULL;
...@@ -526,7 +524,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { ...@@ -526,7 +524,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
pRes->numOfClauseTotal = 0; pRes->numOfClauseTotal = 0;
pRes->rspType = 0; pRes->rspType = 0;
pSql->numOfSubs = 0; pSql->subState.numOfSub = 0;
taosTFree(pSql->pSubs); taosTFree(pSql->pSubs);
assert(pSql->fp == NULL); assert(pSql->fp == NULL);
......
...@@ -274,7 +274,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -274,7 +274,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false); taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false);
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
taosTFree(pSql->pSubs); taosTFree(pSql->pSubs);
pSql->numOfSubs = 0; pSql->subState.numOfSub = 0;
taosTFree(pTableMetaInfo->vgroupList); taosTFree(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql); tscSetNextLaunchTimer(pStream, pSql);
} }
......
此差异已折叠。
...@@ -360,26 +360,26 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { ...@@ -360,26 +360,26 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
taosTFree(pSql->pSubs); taosTFree(pSql->pSubs);
pSql->numOfSubs = 0; pSql->subState.numOfSub = 0;
pSql->self = 0; pSql->self = 0;
tscResetSqlCmdObj(pCmd, false); tscResetSqlCmdObj(pCmd, false);
} }
static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) { static void tscFreeSubobj(SSqlObj* pSql) {
if (pSql->numOfSubs == 0) { if (pSql->subState.numOfSub == 0) {
return; return;
} }
tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->numOfSubs); tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->subState.numOfSub);
for(int32_t i = 0; i < pSql->numOfSubs; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
tscDebug("%p free sub SqlObj:%p, index:%d", pSql, pSql->pSubs[i], i); tscDebug("%p free sub SqlObj:%p, index:%d", pSql, pSql->pSubs[i], i);
taos_free_result(pSql->pSubs[i]); taos_free_result(pSql->pSubs[i]);
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
pSql->numOfSubs = 0; pSql->subState.numOfSub = 0;
} }
/** /**
...@@ -415,7 +415,9 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -415,7 +415,9 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug("%p start to free sqlObj", pSql); tscDebug("%p start to free sqlObj", pSql);
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql); tscPartiallyFreeSqlObj(pSql);
pSql->signature = NULL; pSql->signature = NULL;
...@@ -1516,13 +1518,6 @@ void tscSetFreeHeatBeat(STscObj* pObj) { ...@@ -1516,13 +1518,6 @@ void tscSetFreeHeatBeat(STscObj* pObj) {
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
} }
bool tscShouldFreeHeartBeat(SSqlObj* pHb) {
assert(pHb == pHb->signature);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0);
return pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE;
}
/* /*
* the following four kinds of SqlObj should not be freed * the following four kinds of SqlObj should not be freed
* 1. SqlObj for stream computing * 1. SqlObj for stream computing
...@@ -2291,7 +2286,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -2291,7 +2286,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* *
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed. * For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/ */
pSql->numOfSubs = 0; pSql->subState.numOfSub = 0;
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
...@@ -2323,7 +2318,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -2323,7 +2318,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
pRes->numOfTotal = num; pRes->numOfTotal = num;
taosTFree(pSql->pSubs); taosTFree(pSql->pSubs);
pSql->numOfSubs = 0; pSql->subState.numOfSub = 0;
pSql->fp = fp; pSql->fp = fp;
tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
......
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
...@@ -253,8 +253,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf ...@@ -253,8 +253,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_COL_NAME_LEN 65 #define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 256 #define TSDB_MAX_SQL_SHOW_LEN 512
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 8mb #define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb
#define TSDB_APPNAME_LEN TSDB_UNI_LEN
#define TSDB_MAX_BYTES_PER_ROW 16384 #define TSDB_MAX_BYTES_PER_ROW 16384
#define TSDB_MAX_TAGS_LEN 16384 #define TSDB_MAX_TAGS_LEN 16384
...@@ -282,7 +284,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf ...@@ -282,7 +284,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_SHELL_VNODE_BITS 24 #define TSDB_SHELL_VNODE_BITS 24
#define TSDB_SHELL_SID_MASK 0xFF #define TSDB_SHELL_SID_MASK 0xFF
#define TSDB_HTTP_TOKEN_LEN 20 #define TSDB_HTTP_TOKEN_LEN 20
#define TSDB_SHOW_SQL_LEN 64 #define TSDB_SHOW_SQL_LEN 512
#define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_MQTT_HOSTNAME_LEN 64 #define TSDB_MQTT_HOSTNAME_LEN 64
......
...@@ -305,6 +305,8 @@ typedef struct { ...@@ -305,6 +305,8 @@ typedef struct {
char clientVersion[TSDB_VERSION_LEN]; char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
char appName[TSDB_APPNAME_LEN];
int32_t pid;
} SCMConnectMsg; } SCMConnectMsg;
typedef struct { typedef struct {
...@@ -746,6 +748,7 @@ typedef struct { ...@@ -746,6 +748,7 @@ typedef struct {
uint32_t queryId; uint32_t queryId;
int64_t useconds; int64_t useconds;
int64_t stime; int64_t stime;
uint64_t qHandle;
} SQueryDesc; } SQueryDesc;
typedef struct { typedef struct {
...@@ -761,8 +764,10 @@ typedef struct { ...@@ -761,8 +764,10 @@ typedef struct {
typedef struct { typedef struct {
uint32_t connId; uint32_t connId;
int32_t pid;
int32_t numOfQueries; int32_t numOfQueries;
int32_t numOfStreams; int32_t numOfStreams;
char appName[TSDB_APPNAME_LEN];
char pData[]; char pData[];
} SCMHeartBeatMsg; } SCMHeartBeatMsg;
......
...@@ -23,6 +23,8 @@ extern "C" { ...@@ -23,6 +23,8 @@ extern "C" {
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char appName[TSDB_APPNAME_LEN]; // app name that invokes taosc
uint32_t pid; // pid of app that invokes taosc
int8_t killed; int8_t killed;
uint16_t port; uint16_t port;
uint32_t ip; uint32_t ip;
...@@ -40,7 +42,7 @@ typedef struct { ...@@ -40,7 +42,7 @@ typedef struct {
int32_t mnodeInitProfile(); int32_t mnodeInitProfile();
void mnodeCleanupProfile(); void mnodeCleanupProfile();
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port); SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app);
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port); SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn); void mnodeReleaseConn(SConnObj *pConn);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg); int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg);
......
...@@ -224,7 +224,8 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, ...@@ -224,7 +224,8 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows,
mnodeDecClusterRef(pCluster); mnodeDecClusterRef(pCluster);
numOfRows++; numOfRows++;
} }
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
...@@ -760,7 +760,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -760,7 +760,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
mnodeVacuumResult(data, cols, numOfRows, rows, pShow); mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeDecUserRef(pUser); mnodeDecUserRef(pUser);
return numOfRows; return numOfRows;
......
...@@ -790,6 +790,7 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -790,6 +790,7 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
...@@ -891,8 +892,8 @@ int32_t mnodeRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pC ...@@ -891,8 +892,8 @@ int32_t mnodeRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pC
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
...@@ -992,6 +993,7 @@ static int32_t mnodeRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, v ...@@ -992,6 +993,7 @@ static int32_t mnodeRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, v
} }
} }
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
...@@ -1083,8 +1085,8 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -1083,8 +1085,8 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
} else { } else {
numOfRows = 0; numOfRows = 0;
} }
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
......
...@@ -480,8 +480,8 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -480,8 +480,8 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
mnodeDecMnodeRef(pMnode); mnodeDecMnodeRef(pMnode);
} }
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
......
...@@ -24,15 +24,9 @@ ...@@ -24,15 +24,9 @@
#include "mnode.h" #include "mnode.h"
#include "mnodeDef.h" #include "mnodeDef.h"
#include "mnodeInt.h" #include "mnodeInt.h"
#include "mnodeAcct.h"
#include "mnodeDnode.h"
#include "mnodeDb.h"
#include "mnodeMnode.h"
#include "mnodeProfile.h" #include "mnodeProfile.h"
#include "mnodeShow.h" #include "mnodeShow.h"
#include "mnodeTable.h"
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h"
#include "mnodeWrite.h" #include "mnodeWrite.h"
#define CONN_KEEP_TIME (tsShellActivityTimer * 3) #define CONN_KEEP_TIME (tsShellActivityTimer * 3)
...@@ -78,7 +72,7 @@ void mnodeCleanupProfile() { ...@@ -78,7 +72,7 @@ void mnodeCleanupProfile() {
} }
} }
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app) {
#if 0 #if 0
int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable); int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable);
if (connSize > tsMaxShellConns) { if (connSize > tsMaxShellConns) {
...@@ -96,10 +90,13 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { ...@@ -96,10 +90,13 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
.ip = ip, .ip = ip,
.port = port, .port = port,
.connId = connId, .connId = connId,
.stime = taosGetTimestampMs() .stime = taosGetTimestampMs(),
.pid = pid,
}; };
tstrncpy(connObj.user, user, sizeof(connObj.user)); tstrncpy(connObj.user, user, tListLen(connObj.user));
tstrncpy(connObj.appName, app, tListLen(connObj.appName));
connObj.lastAccess = connObj.stime; connObj.lastAccess = connObj.stime;
SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000); SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000);
...@@ -183,6 +180,20 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -183,6 +180,20 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
// app name
pShow->bytes[cols] = TSDB_APPNAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "app_name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
// app pid
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "pid");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port"); strcpy(pSchema[cols].name, "ip:port");
...@@ -191,13 +202,13 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -191,13 +202,13 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "login time"); strcpy(pSchema[cols].name, "login_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "last access"); strcpy(pSchema[cols].name, "last_access");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -236,6 +247,16 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -236,6 +247,16 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]);
cols++; cols++;
// app name
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->appName, pShow->bytes[cols]);
cols++;
// app pid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t*)pWrite = pConnObj->pid;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
...@@ -254,8 +275,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -254,8 +275,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 5; mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
return numOfRows; return numOfRows;
} }
...@@ -299,7 +319,7 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -299,7 +319,7 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "queryId"); strcpy(pSchema[cols].name, "query_id");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -315,9 +335,15 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -315,9 +335,15 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 24;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "qhandle");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created time"); strcpy(pSchema[cols].name, "created_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -352,7 +378,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v ...@@ -352,7 +378,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
SConnObj *pConnObj = NULL; SConnObj *pConnObj = NULL;
int32_t cols = 0; int32_t cols = 0;
char * pWrite; char * pWrite;
char ipStr[TSDB_IPv4ADDR_LEN + 6]; char str[TSDB_IPv4ADDR_LEN + 6] = {0};
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj); pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj);
...@@ -362,9 +388,9 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v ...@@ -362,9 +388,9 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
SQueryDesc *pDesc = pConnObj->pQueries + i; SQueryDesc *pDesc = pConnObj->pQueries + i;
cols = 0; cols = 0;
snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId)); snprintf(str, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId));
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
...@@ -372,8 +398,15 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v ...@@ -372,8 +398,15 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
cols++;
char handleBuf[24] = {0};
snprintf(handleBuf, tListLen(handleBuf), "%p", (void*)htobe64(pDesc->qHandle));
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
...@@ -393,8 +426,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v ...@@ -393,8 +426,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 6; mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
return numOfRows; return numOfRows;
} }
...@@ -522,8 +554,7 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v ...@@ -522,8 +554,7 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 8; mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
return numOfRows; return numOfRows;
} }
......
...@@ -186,7 +186,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { ...@@ -186,7 +186,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
rowsToRead = pShow->numOfRows - pShow->numOfReads; rowsToRead = pShow->numOfRows - pShow->numOfReads;
} }
/* return no more than 100 meters in one round trip */ /* return no more than 100 tables in one round trip */
if (rowsToRead > 100) rowsToRead = 100; if (rowsToRead > 100) rowsToRead = 100;
/* /*
...@@ -244,7 +244,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -244,7 +244,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
int32_t connId = htonl(pHBMsg->connId); int32_t connId = htonl(pHBMsg->connId);
SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (pConn == NULL) { if (pConn == NULL) {
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); pHBMsg->pid = htonl(pHBMsg->pid);
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName);
} }
if (pConn == NULL) { if (pConn == NULL) {
...@@ -325,7 +326,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -325,7 +326,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
goto connect_over; goto connect_over;
} }
SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); pConnectMsg->pid = htonl(pConnectMsg->pid);
SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pConnectMsg->pid, pConnectMsg->appName);
if (pConn == NULL) { if (pConn == NULL) {
code = terrno; code = terrno;
} else { } else {
......
...@@ -63,27 +63,27 @@ static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t ...@@ -63,27 +63,27 @@ static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t
static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg); static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessDropTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg); static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg);
static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg); static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg);
static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg); static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg);
static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg); static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg);
static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg); static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName); static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName);
...@@ -460,12 +460,14 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) { ...@@ -460,12 +460,14 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) {
void *oldSchema = pTable->schema; void *oldSchema = pTable->schema;
void *oldVgHash = pTable->vgHash; void *oldVgHash = pTable->vgHash;
int32_t oldRefCount = pTable->refCount; int32_t oldRefCount = pTable->refCount;
int32_t oldNumOfTables = pTable->numOfTables;
memcpy(pTable, pNew, sizeof(SSuperTableObj)); memcpy(pTable, pNew, sizeof(SSuperTableObj));
pTable->vgHash = oldVgHash; pTable->vgHash = oldVgHash;
pTable->refCount = oldRefCount; pTable->refCount = oldRefCount;
pTable->schema = pNew->schema; pTable->schema = pNew->schema;
pTable->numOfTables = oldNumOfTables;
free(pNew); free(pNew);
free(oldTableId); free(oldTableId);
free(oldSchema); free(oldSchema);
...@@ -1384,9 +1386,8 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1384,9 +1386,8 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 5;
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
return numOfRows; return numOfRows;
...@@ -2543,6 +2544,25 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -2543,6 +2544,25 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 8; // table uid
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema[cols].name, "uid");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tid");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "vgId");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols); pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
...@@ -2568,6 +2588,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -2568,6 +2588,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
return 0; return 0;
} }
int32_t cols = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SChildTableObj *pTable = NULL; SChildTableObj *pTable = NULL;
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
...@@ -2608,8 +2629,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -2608,8 +2629,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
continue; continue;
} }
int32_t cols = 0; cols = 0;
char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, pShow->bytes[cols]);
...@@ -2638,14 +2658,29 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -2638,14 +2658,29 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
cols++; cols++;
// uid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t*) pWrite = pTable->uid;
cols++;
// tid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t*) pWrite = pTable->sid;
cols++;
//vgid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t*) pWrite = pTable->vgId;
cols++;
numOfRows++; numOfRows++;
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 4;
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
free(pattern); free(pattern);
...@@ -2843,9 +2878,8 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -2843,9 +2878,8 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 4;
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
return numOfRows; return numOfRows;
......
...@@ -385,8 +385,8 @@ static int32_t mnodeRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -385,8 +385,8 @@ static int32_t mnodeRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, voi
numOfRows++; numOfRows++;
mnodeDecUserRef(pUser); mnodeDecUserRef(pUser);
} }
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
......
...@@ -771,7 +771,8 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v ...@@ -771,7 +771,8 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
numOfRows++; numOfRows++;
} }
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
......
...@@ -33,6 +33,8 @@ bool taosCheckPthreadValid(pthread_t thread); ...@@ -33,6 +33,8 @@ bool taosCheckPthreadValid(pthread_t thread);
int64_t taosGetPthreadId(); int64_t taosGetPthreadId();
void taosResetPthread(pthread_t *thread); void taosResetPthread(pthread_t *thread);
bool taosComparePthread(pthread_t first, pthread_t second); bool taosComparePthread(pthread_t first, pthread_t second);
int32_t taosGetPId();
int32_t taosGetCurrentAPPName(char *name, int32_t* len);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -34,5 +34,31 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } ...@@ -34,5 +34,31 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }
int64_t taosGetPthreadId() { return (int64_t)pthread_self(); } int64_t taosGetPthreadId() { return (int64_t)pthread_self(); }
void taosResetPthread(pthread_t *thread) { *thread = 0; } void taosResetPthread(pthread_t *thread) { *thread = 0; }
bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; } bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; }
int32_t taosGetPId() { return getpid(); }
int32_t taosGetCurrentAPPName(char *name, int32_t* len) {
const char* self = "/proc/self/exe";
char path[PATH_MAX] = {0};
if (readlink(self, path, PATH_MAX) <= 0) {
return -1;
}
path[PATH_MAX - 1] = 0;
char* end = strrchr(path, '/');
if (end == NULL) {
return -1;
}
++end;
strcpy(name, end);
if (len != NULL) {
*len = strlen(name);
}
return 0;
}
#endif #endif
\ No newline at end of file
...@@ -203,7 +203,7 @@ static void taosGetSystemTimezone() { ...@@ -203,7 +203,7 @@ static void taosGetSystemTimezone() {
snprintf(tsTimezone, TSDB_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz)); snprintf(tsTimezone, TSDB_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
// cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT; // cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT;
uInfo("timezone not configured, set to system default:%s", tsTimezone); uWarn("timezone not configured, set to system default:%s", tsTimezone);
} }
/* /*
...@@ -235,7 +235,7 @@ static void taosGetSystemLocale() { // get and set default locale ...@@ -235,7 +235,7 @@ static void taosGetSystemLocale() { // get and set default locale
strcpy(tsLocale, "en_US.UTF-8"); strcpy(tsLocale, "en_US.UTF-8");
} else { } else {
tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN); tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN);
uError("locale not configured, set to system default:%s", tsLocale); uWarn("locale not configured, set to system default:%s", tsLocale);
} }
} }
......
...@@ -36,3 +36,21 @@ int64_t taosGetPthreadId() { ...@@ -36,3 +36,21 @@ int64_t taosGetPthreadId() {
bool taosComparePthread(pthread_t first, pthread_t second) { bool taosComparePthread(pthread_t first, pthread_t second) {
return first.p == second.p; return first.p == second.p;
} }
int32_t taosGetPId() {
return GetCurrentProcessId();
}
int32_t taosGetCurrentAPPName(char *name, int32_t* len) {
char filepath[1024] = {0};
GetModuleFileName(NULL, filepath, MAX_PATH);
*strrchr(filepath,'.') = '\0';
strcpy(name, filepath);
if (len != NULL) {
*len = (int32_t) strlen(filepath);
}
return 0;
}
\ No newline at end of file
...@@ -107,12 +107,14 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo ...@@ -107,12 +107,14 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
free(pNode); free(pNode);
} }
static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) { static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) {
if (pElem->pData->signature != (uint64_t) pElem->pData) { if (pElem->pData->signature != (uint64_t) pElem->pData) {
uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData); uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return; return NULL;
} }
STrashElem* next = pElem->next;
pCacheObj->numOfElemsInTrash--; pCacheObj->numOfElemsInTrash--;
if (pElem->prev) { if (pElem->prev) {
pElem->prev->next = pElem->next; pElem->prev->next = pElem->next;
...@@ -120,9 +122,15 @@ static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem ...@@ -120,9 +122,15 @@ static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem
pCacheObj->pTrash = pElem->next; pCacheObj->pTrash = pElem->next;
} }
if (pElem->next) { if (next) {
pElem->next->prev = pElem->prev; next->prev = pElem->prev;
}
if (pCacheObj->numOfElemsInTrash == 0) {
assert(pCacheObj->pTrash == NULL);
} }
return next;
} }
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) { static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) {
...@@ -559,31 +567,30 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { ...@@ -559,31 +567,30 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
if (pCacheObj->numOfElemsInTrash == 0) { if (pCacheObj->numOfElemsInTrash == 0) {
if (pCacheObj->pTrash != NULL) { if (pCacheObj->pTrash != NULL) {
pCacheObj->pTrash = NULL;
uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash); uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash);
} }
pCacheObj->pTrash = NULL;
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
return; return;
} }
STrashElem *pElem = pCacheObj->pTrash; const char* stat[] = {"false", "true"};
uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
pCacheObj->numOfElemsInTrash, (force? stat[1]:stat[0]));
STrashElem *pElem = pCacheObj->pTrash;
while (pElem) { while (pElem) {
T_REF_VAL_CHECK(pElem->pData); T_REF_VAL_CHECK(pElem->pData);
if (pElem->next == pElem) { assert(pElem->next != pElem && pElem->prev != pElem);
pElem->next = NULL;
}
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data, uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data,
pCacheObj->numOfElemsInTrash - 1); pCacheObj->numOfElemsInTrash - 1);
STrashElem *p = pElem; doRemoveElemInTrashcan(pCacheObj, pElem);
pElem = pElem->next; doDestroyTrashcanElem(pCacheObj, pElem);
pElem = pCacheObj->pTrash;
doRemoveElemInTrashcan(pCacheObj, p);
doDestroyTrashcanElem(pCacheObj, p);
} else { } else {
pElem = pElem->next; pElem = pElem->next;
} }
......
...@@ -261,6 +261,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -261,6 +261,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL; code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return code; return code;
} }
......
# Color setting
RED='\033[0;31m'
GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
grep 'start to execute\|ERROR SUMMARY' mem-error-out.log|grep -v 'grep'|uniq|tee uniq-mem-error-out.log
for memError in `grep 'ERROR SUMMARY' uniq-mem-error-out.log | awk '{print $4}'`
do
if [ -n "$memError" ]; then
if [ "$memError" -gt 12 ]; then
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\
More than our threshold! ## ${NC}"
fi
fi
done
grep 'start to execute\|definitely lost:' mem-error-out.log|grep -v 'grep'|uniq|tee uniq-definitely-lost-out.log
for defiMemError in `grep 'definitely lost:' uniq-definitely-lost-out.log | awk '{print $7}'`
do
if [ -n "$defiMemError" ]; then
if [ "$defiMemError" -gt 13 ]; then
echo -e "${RED} ## Memory errors number valgrind reports \
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
fi
fi
done
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册