From 755fdb05610619fc538743d8cfc216171965b82d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 5 Jun 2020 03:41:42 +0000 Subject: [PATCH] [TD-464] kill query and kill stream --- src/client/src/tscProfile.c | 78 ++++++++++++++++++++--------------- src/client/src/tscSQLParser.c | 7 ++-- src/client/src/tscServer.c | 17 ++------ src/mnode/src/mnodeProfile.c | 38 ++++++++++------- 4 files changed, 75 insertions(+), 65 deletions(-) diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 09a34eb0c3..1a7b8426a0 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -196,71 +196,81 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { } pthread_mutex_unlock(&pObj->mutex); - + if (pStream) { tscTrace("%p stream:%p is killed, streamId:%d", pStream->pSql, pStream, killId); + if (pStream->callback) { + pStream->callback(pStream->param); + } + taos_close_stream(pStream); + } else { + tscError("failed to kill stream, streamId:%d not exist", killId); } - - if (pStream->callback) { - pStream->callback(pStream->param); - } - taos_close_stream(pStream); } int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { SCMHeartBeatMsg *pHeartbeat = pMsg; - pHeartbeat->connId = htonl(pObj->connId); - + int allocedQueriesNum = pHeartbeat->numOfQueries; + int allocedStreamsNum = pHeartbeat->numOfStreams; + pHeartbeat->numOfQueries = 0; SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData; // We extract the lock to tscBuildHeartBeatMsg function. - - //SSqlObj *pSql = pObj->sqlList; - //while (pSql) { + + SSqlObj *pSql = pObj->sqlList; + while (pSql) { /* * avoid sqlobj may not be correctly removed from sql list * e.g., forgetting to free the sql result may cause the sql object still in sql list */ - // if (pSql->sqlstr == NULL) { - // pSql = pSql->next; - // continue; - // } + if (pSql->sqlstr == NULL) { + pSql = pSql->next; + continue; + } - strncpy(pQdesc->sql, "select * from d1.t1", TSDB_SHOW_SQL_LEN - 1); + strncpy(pQdesc->sql, pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); pQdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0; - pQdesc->stime = htobe64(taosGetTimestampMs()); - pQdesc->queryId = htonl(12); - pQdesc->useconds = htonl(34567); + pQdesc->stime = htobe64(pSql->stime); + pQdesc->queryId = htonl(pSql->queryId); + pQdesc->useconds = htobe64(pSql->res.useconds); pHeartbeat->numOfQueries++; pQdesc++; - //pSql = pSql->next; - //} + pSql = pSql->next; + if (pHeartbeat->numOfQueries >= allocedQueriesNum) break; + } pHeartbeat->numOfStreams = 0; SStreamDesc *pSdesc = (SStreamDesc *)pQdesc; - //SSqlStream *pStream = pObj->streamList; - //while (pStream) { - strncpy(pSdesc->sql, "select * from d1.s1", TSDB_SHOW_SQL_LEN - 1); + SSqlStream *pStream = pObj->streamList; + while (pStream) { + strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); pSdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0; - pSdesc->streamId = htonl(98); - pSdesc->num = htobe64(76543); + pSdesc->streamId = htonl(pStream->streamId); + pSdesc->num = htobe64(pStream->num); - pSdesc->useconds = htobe64(21); - pSdesc->stime = htobe64(taosGetTimestampMs()-1000); - pSdesc->ctime = htobe64(taosGetTimestampMs()); + pSdesc->useconds = htobe64(pStream->useconds); + pSdesc->stime = htobe64(pStream->stime - pStream->interval); + pSdesc->ctime = htobe64(pStream->ctime); - pSdesc->slidingTime = htobe64(567); - pSdesc->interval = htobe64(89); + pSdesc->slidingTime = htobe64(pStream->slidingTime); + pSdesc->interval = htobe64(pStream->interval); pHeartbeat->numOfStreams++; pSdesc++; - //pStream = pStream->next; - //} + pStream = pStream->next; + if (pHeartbeat->numOfStreams >= allocedStreamsNum) break; + } + + int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) + + sizeof(SCMHeartBeatMsg); + pHeartbeat->connId = htonl(pObj->connId); + pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries); + pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams); - return pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg); + return msgLen; } void tscKillConnection(STscObj *pObj) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 858245e376..c3745acdb3 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2238,13 +2238,14 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { SSqlCmd* pCmd = &pSql->cmd; pCmd->command = pInfo->type; - + SSQLToken* idStr = &(pInfo->pDCLInfo->ip); if (idStr->n > TSDB_KILL_MSG_LEN) { return TSDB_CODE_INVALID_SQL; } strncpy(pCmd->payload, idStr->z, idStr->n); + const char delim = ':'; char* connIdStr = strtok(idStr->z, &delim); char* queryIdStr = strtok(NULL, &delim); @@ -2256,7 +2257,6 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { } if (killType == TSDB_SQL_KILL_CONNECTION) { - strncpy(pCmd->payload, idStr->z, idStr->n); return TSDB_CODE_SUCCESS; } @@ -2269,8 +2269,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } } - - strncpy(pCmd->payload, idStr->z, idStr->n); + return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c4a44f5b76..2bf177905e 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1140,13 +1140,6 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SCMKillQueryMsg); - if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { - tscError("%p failed to malloc for query msg", pSql); - return TSDB_CODE_CLI_OUT_OF_MEMORY; - } - - SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload; - strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n); switch (pCmd->command) { case TSDB_SQL_KILL_QUERY: pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY; @@ -1753,24 +1746,20 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pthread_mutex_lock(&pObj->mutex); - int32_t numOfQueries = 0; + int32_t numOfQueries = 0; SSqlObj *tpSql = pObj->sqlList; while (tpSql) { tpSql = tpSql->next; numOfQueries++; } - int32_t numOfStreams = 0; + int32_t numOfStreams = 0; SSqlStream *pStream = pObj->streamList; while (pStream) { pStream = pStream->next; numOfStreams++; } - // ==> - numOfQueries = 1; - numOfStreams = 1; - int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { pthread_mutex_unlock(&pObj->mutex); @@ -1779,6 +1768,8 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; + pHeartbeat->numOfQueries = numOfQueries; + pHeartbeat->numOfStreams = numOfStreams; int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj); pthread_mutex_unlock(&pObj->mutex); diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 17e486a0eb..49f79a54f1 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -269,8 +269,8 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg) { memcpy(pConn->pQueries, pHBMsg->pData, pConn->numOfQueries * sizeof(SQueryDesc)); } - pConn->numOfQueries = htonl(pHBMsg->numOfQueries); - if (pConn->numOfQueries > 0) { + pConn->numOfStreams = htonl(pHBMsg->numOfStreams); + if (pConn->numOfStreams > 0) { pConn->pStreams = calloc(sizeof(SStreamDesc), pConn->numOfStreams); memcpy(pConn->pStreams, pHBMsg->pData + pConn->numOfQueries * sizeof(SQueryDesc), pConn->numOfStreams * sizeof(SStreamDesc)); @@ -504,7 +504,7 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = htonl(pDesc->num); + *(int32_t *)pWrite = (int32_t)htobe64(pDesc->num); cols++; numOfRows++; @@ -522,13 +522,18 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont; - char* connIdStr = pKill->queryId; + mPrint("kill query msg is received, queryId:%s", pKill->queryId); - char *chr = strchr(connIdStr, ':'); - if (chr == NULL) return TSDB_CODE_INVALID_QUERY_ID; - *chr = 0; + const char delim = ':'; + char* connIdStr = strtok(pKill->queryId, &delim); + char* queryIdStr = strtok(NULL, &delim); + + if (queryIdStr == NULL || connIdStr == NULL) { + mPrint("failed to kill query, queryId:%s", pKill->queryId); + return TSDB_CODE_INVALID_QUERY_ID; + } - uint32_t queryId = atoi(chr + 1); + int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); if (pConn == NULL) { @@ -547,13 +552,18 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont; - char* connIdStr = pKill->queryId; - - char *chr = strchr(connIdStr, ':'); - if (chr == NULL) return TSDB_CODE_INVALID_QUERY_ID; - *chr = 0; + mPrint("kill stream msg is received, streamId:%s", pKill->queryId); + + const char delim = ':'; + char* connIdStr = strtok(pKill->queryId, &delim); + char* streamIdStr = strtok(NULL, &delim); + + if (streamIdStr == NULL || connIdStr == NULL) { + mPrint("failed to kill stream, streamId:%s", pKill->queryId); + return TSDB_CODE_INVALID_STREAM_ID; + } - uint32_t streamId = atoi(chr + 1); + int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10); SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); if (pConn == NULL) { -- GitLab