提交 5ed28feb 编写于 作者: S Shengliang Guan

[TD-464] show queries and show streams

上级 ebc8c4d8
......@@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql);
void tscRemoveFromSqlList(SSqlObj *pSql);
void tscAddIntoStreamList(SSqlStream *pStream);
void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj);
int tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj);
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj);
void tscKillQuery(STscObj *pObj, uint32_t killId);
void tscKillStream(STscObj *pObj, uint32_t killId);
void tscKillConnection(STscObj *pObj);
......
......@@ -19,6 +19,7 @@
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "taosmsg.h"
void tscSaveSlowQueryFp(void *handle, void *tmrId);
void *tscSlowQueryConn = NULL;
......@@ -96,9 +97,9 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
char *sql = malloc(200);
int len = snprintf(sql, 200, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName,
pSql->pTscObj->user, pSql->stime, pSql->res.useconds);
int sqlLen = snprintf(sql + len, TSDB_SHOW_SQL_LEN, "%s", pSql->sqlstr);
if (sqlLen > TSDB_SHOW_SQL_LEN - 1) {
sqlLen = len + TSDB_SHOW_SQL_LEN - 1;
int sqlLen = snprintf(sql + len, TSDB_SLOW_QUERY_SQL_LEN, "%s", pSql->sqlstr);
if (sqlLen > TSDB_SLOW_QUERY_SQL_LEN - 1) {
sqlLen = len + TSDB_SLOW_QUERY_SQL_LEN - 1;
} else {
sqlLen += len;
}
......@@ -206,72 +207,60 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
taos_close_stream(pStream);
}
int tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
char * pStart = pMsg;
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256;
SQqueryList *pQList = (SQqueryList *)pMsg;
pQList->numOfQueries = 0;
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SCMHeartBeatMsg *pHeartbeat = pMsg;
pHeartbeat->connId = htonl(pObj->connId);
SQueryDesc *pQdesc = (SQueryDesc*)(pMsg + sizeof(SQqueryList));
pHeartbeat->numOfQueries = 0;
SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData;
// We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */
pMsg += sizeof(SQqueryList);
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
//SSqlObj *pSql = pObj->sqlList;
//while (pSql) {
/*
* avoid sqlobj may not be correctly removed from sql list
* e.g., forgetting to free the sql result may cause the sql object still in sql list
*/
if (pSql->sqlstr == NULL) {
pSql = pSql->next;
continue;
}
// if (pSql->sqlstr == NULL) {
// pSql = pSql->next;
// continue;
// }
strncpy(pQdesc->sql, pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
strncpy(pQdesc->sql, "select * from d1.t1", TSDB_SHOW_SQL_LEN - 1);
pQdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0;
pQdesc->stime = pSql->stime;
pQdesc->queryId = pSql->queryId;
pQdesc->useconds = pSql->res.useconds;
pQdesc->stime = htobe64(taosGetTimestampMs());
pQdesc->queryId = htonl(12);
pQdesc->useconds = htonl(34567);
pQList->numOfQueries++;
pHeartbeat->numOfQueries++;
pQdesc++;
pSql = pSql->next;
pMsg += sizeof(SQueryDesc);
if (pMsg > pMax) break;
}
//pSql = pSql->next;
//}
SStreamList *pSList = (SStreamList *)pMsg;
pSList->numOfStreams = 0;
SStreamDesc *pSdesc = (SStreamDesc*) (pMsg + sizeof(SStreamList));
pHeartbeat->numOfStreams = 0;
SStreamDesc *pSdesc = (SStreamDesc *)pQdesc;
pMsg += sizeof(SStreamList);
SSqlStream *pStream = pObj->streamList;
while (pStream) {
strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
//SSqlStream *pStream = pObj->streamList;
//while (pStream) {
strncpy(pSdesc->sql, "select * from d1.s1", TSDB_SHOW_SQL_LEN - 1);
pSdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0;
pSdesc->streamId = pStream->streamId;
pSdesc->num = pStream->num;
pSdesc->streamId = htonl(98);
pSdesc->num = htobe64(76543);
pSdesc->useconds = pStream->useconds;
pSdesc->stime = pStream->stime - pStream->interval;
pSdesc->ctime = pStream->ctime;
pSdesc->useconds = htobe64(21);
pSdesc->stime = htobe64(taosGetTimestampMs()-1000);
pSdesc->ctime = htobe64(taosGetTimestampMs());
pSdesc->slidingTime = pStream->slidingTime;
pSdesc->interval = pStream->interval;
pSdesc->slidingTime = htobe64(567);
pSdesc->interval = htobe64(89);
pSList->numOfStreams++;
pHeartbeat->numOfStreams++;
pSdesc++;
pStream = pStream->next;
pMsg += sizeof(SStreamDesc);
if (pMsg > pMax) break;
}
/* pthread_mutex_unlock (&pObj->mutex); */
//pStream = pStream->next;
//}
return pMsg - pStart;
return pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg);
}
void tscKillConnection(STscObj *pObj) {
......
......@@ -1747,52 +1747,45 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
int size = 0;
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
size += tsRpcHeadSize;
size += sizeof(SQqueryList);
pthread_mutex_lock(&pObj->mutex);
int32_t numOfQueries = 0;
SSqlObj *tpSql = pObj->sqlList;
while (tpSql) {
size += sizeof(SQueryDesc);
tpSql = tpSql->next;
numOfQueries++;
}
size += sizeof(SStreamList);
int32_t numOfStreams = 0;
SSqlStream *pStream = pObj->streamList;
while (pStream) {
size += sizeof(SStreamDesc);
pStream = pStream->next;
numOfStreams++;
}
return size + TSDB_EXTRA_PAYLOAD_SIZE;
}
// ==>
numOfQueries = 1;
numOfStreams = 1;
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
pthread_mutex_lock(&pObj->mutex);
int size = tscEstimateHeartBeatMsgLength(pSql);
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
pthread_mutex_unlock(&pObj->mutex);
tscError("%p failed to malloc for heartbeat msg", pSql);
return -1;
}
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg*)pCmd->payload;
pHeartbeat->connId = htonl(pSql->pTscObj->connId);
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
int msgLen = tscBuildQueryStreamDesc((char*)pHeartbeat + sizeof(pHeartbeat->connId), pObj);
pthread_mutex_unlock(&pObj->mutex);
pCmd->payloadLen = msgLen + sizeof(pHeartbeat->connId);
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS;
}
......
......@@ -231,7 +231,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SHELL_VNODE_BITS 24
#define TSDB_SHELL_SID_MASK 0xFF
#define TSDB_HTTP_TOKEN_LEN 20
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_SHOW_SQL_LEN 64
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_METER_STATE_OFFLINE 0
#define TSDB_METER_STATE_ONLLINE 1
......
......@@ -707,18 +707,11 @@ typedef struct {
int64_t interval;
} SStreamDesc;
typedef struct {
int32_t numOfQueries;
} SQqueryList;
typedef struct {
int32_t numOfStreams;
} SStreamList;
typedef struct {
uint32_t connId;
SQqueryList qlist;
SStreamList slist;
int32_t numOfQueries;
int32_t numOfStreams;
char pData[];
} SCMHeartBeatMsg;
typedef struct {
......
......@@ -182,8 +182,6 @@ typedef struct SUserObj {
int8_t updateEnd[1];
int32_t refCount;
struct SAcctObj * pAcct;
SQqueryList * pQList; // query list
SStreamList * pSList; // stream list
} SUserObj;
typedef struct {
......
......@@ -29,6 +29,12 @@ typedef struct {
uint32_t connId;
uint64_t stime;
uint64_t lastAccess;
uint32_t queryId;
uint32_t streamId;
int32_t numOfQueries;
int32_t numOfStreams;
SStreamDesc *pStreams;
SQueryDesc * pQueries;
} SConnObj;
int32_t mnodeInitProfile();
......@@ -37,6 +43,7 @@ void mnodeCleanupProfile();
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port);
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg);
#ifdef __cplusplus
}
......
此差异已折叠。
......@@ -243,23 +243,27 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
// pHBRsp->killConnection = 1;
} else {
pHBRsp->connId = htonl(pConn->connId);
mnodeSaveQueryStreamList(pConn, pHBMsg);
if (pConn->killed != 0) {
pHBRsp->killConnection = 1;
}
if (pConn->streamId != 0) {
pHBRsp->streamId = htonl(pConn->streamId);
pConn->streamId = 0;
}
if (pConn->queryId != 0) {
pHBRsp->queryId = htonl(pConn->queryId);
pConn->queryId = 0;
}
}
pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum());
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
mnodeGetMnodeIpSetForShell(&pHBRsp->ipList);
/*
* TODO
* Dispose kill stream or kill query message
*/
// pHBRsp->queryId = 0;
// pHBRsp->streamId = 0;
// pHBRsp->killConnection = 0;
pMsg->rpcRsp.rsp = pHBRsp;
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
......
......@@ -183,7 +183,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.slowquery(ts timestamp, username "
"binary(%d), created_time timestamp, time bigint, sql binary(%d))",
tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SHOW_SQL_LEN);
tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SLOW_QUERY_SQL_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_LOG) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.log(ts timestamp, level tinyint, "
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册