提交 755fdb05 编写于 作者: S Shengliang Guan

[TD-464] kill query and kill stream

上级 5ed28feb
......@@ -196,71 +196,81 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
}
pthread_mutex_unlock(&pObj->mutex);
if (pStream) {
tscTrace("%p stream:%p is killed, streamId:%d", pStream->pSql, pStream, killId);
if (pStream->callback) {
pStream->callback(pStream->param);
}
taos_close_stream(pStream);
} else {
tscError("failed to kill stream, streamId:%d not exist", killId);
}
if (pStream->callback) {
pStream->callback(pStream->param);
}
taos_close_stream(pStream);
}
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SCMHeartBeatMsg *pHeartbeat = pMsg;
pHeartbeat->connId = htonl(pObj->connId);
int allocedQueriesNum = pHeartbeat->numOfQueries;
int allocedStreamsNum = pHeartbeat->numOfStreams;
pHeartbeat->numOfQueries = 0;
SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData;
// We extract the lock to tscBuildHeartBeatMsg function.
//SSqlObj *pSql = pObj->sqlList;
//while (pSql) {
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
/*
* avoid sqlobj may not be correctly removed from sql list
* e.g., forgetting to free the sql result may cause the sql object still in sql list
*/
// if (pSql->sqlstr == NULL) {
// pSql = pSql->next;
// continue;
// }
if (pSql->sqlstr == NULL) {
pSql = pSql->next;
continue;
}
strncpy(pQdesc->sql, "select * from d1.t1", TSDB_SHOW_SQL_LEN - 1);
strncpy(pQdesc->sql, pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
pQdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0;
pQdesc->stime = htobe64(taosGetTimestampMs());
pQdesc->queryId = htonl(12);
pQdesc->useconds = htonl(34567);
pQdesc->stime = htobe64(pSql->stime);
pQdesc->queryId = htonl(pSql->queryId);
pQdesc->useconds = htobe64(pSql->res.useconds);
pHeartbeat->numOfQueries++;
pQdesc++;
//pSql = pSql->next;
//}
pSql = pSql->next;
if (pHeartbeat->numOfQueries >= allocedQueriesNum) break;
}
pHeartbeat->numOfStreams = 0;
SStreamDesc *pSdesc = (SStreamDesc *)pQdesc;
//SSqlStream *pStream = pObj->streamList;
//while (pStream) {
strncpy(pSdesc->sql, "select * from d1.s1", TSDB_SHOW_SQL_LEN - 1);
SSqlStream *pStream = pObj->streamList;
while (pStream) {
strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
pSdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0;
pSdesc->streamId = htonl(98);
pSdesc->num = htobe64(76543);
pSdesc->streamId = htonl(pStream->streamId);
pSdesc->num = htobe64(pStream->num);
pSdesc->useconds = htobe64(21);
pSdesc->stime = htobe64(taosGetTimestampMs()-1000);
pSdesc->ctime = htobe64(taosGetTimestampMs());
pSdesc->useconds = htobe64(pStream->useconds);
pSdesc->stime = htobe64(pStream->stime - pStream->interval);
pSdesc->ctime = htobe64(pStream->ctime);
pSdesc->slidingTime = htobe64(567);
pSdesc->interval = htobe64(89);
pSdesc->slidingTime = htobe64(pStream->slidingTime);
pSdesc->interval = htobe64(pStream->interval);
pHeartbeat->numOfStreams++;
pSdesc++;
//pStream = pStream->next;
//}
pStream = pStream->next;
if (pHeartbeat->numOfStreams >= allocedStreamsNum) break;
}
int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) +
sizeof(SCMHeartBeatMsg);
pHeartbeat->connId = htonl(pObj->connId);
pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries);
pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams);
return pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg);
return msgLen;
}
void tscKillConnection(STscObj *pObj) {
......
......@@ -2238,13 +2238,14 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
SSqlCmd* pCmd = &pSql->cmd;
pCmd->command = pInfo->type;
SSQLToken* idStr = &(pInfo->pDCLInfo->ip);
if (idStr->n > TSDB_KILL_MSG_LEN) {
return TSDB_CODE_INVALID_SQL;
}
strncpy(pCmd->payload, idStr->z, idStr->n);
const char delim = ':';
char* connIdStr = strtok(idStr->z, &delim);
char* queryIdStr = strtok(NULL, &delim);
......@@ -2256,7 +2257,6 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
}
if (killType == TSDB_SQL_KILL_CONNECTION) {
strncpy(pCmd->payload, idStr->z, idStr->n);
return TSDB_CODE_SUCCESS;
}
......@@ -2269,8 +2269,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
strncpy(pCmd->payload, idStr->z, idStr->n);
return TSDB_CODE_SUCCESS;
}
......
......@@ -1140,13 +1140,6 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCMKillQueryMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload;
strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
switch (pCmd->command) {
case TSDB_SQL_KILL_QUERY:
pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
......@@ -1753,24 +1746,20 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pthread_mutex_lock(&pObj->mutex);
int32_t numOfQueries = 0;
int32_t numOfQueries = 0;
SSqlObj *tpSql = pObj->sqlList;
while (tpSql) {
tpSql = tpSql->next;
numOfQueries++;
}
int32_t numOfStreams = 0;
int32_t numOfStreams = 0;
SSqlStream *pStream = pObj->streamList;
while (pStream) {
pStream = pStream->next;
numOfStreams++;
}
// ==>
numOfQueries = 1;
numOfStreams = 1;
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
pthread_mutex_unlock(&pObj->mutex);
......@@ -1779,6 +1768,8 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
pHeartbeat->numOfQueries = numOfQueries;
pHeartbeat->numOfStreams = numOfStreams;
int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
pthread_mutex_unlock(&pObj->mutex);
......
......@@ -269,8 +269,8 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg) {
memcpy(pConn->pQueries, pHBMsg->pData, pConn->numOfQueries * sizeof(SQueryDesc));
}
pConn->numOfQueries = htonl(pHBMsg->numOfQueries);
if (pConn->numOfQueries > 0) {
pConn->numOfStreams = htonl(pHBMsg->numOfStreams);
if (pConn->numOfStreams > 0) {
pConn->pStreams = calloc(sizeof(SStreamDesc), pConn->numOfStreams);
memcpy(pConn->pStreams, pHBMsg->pData + pConn->numOfQueries * sizeof(SQueryDesc),
pConn->numOfStreams * sizeof(SStreamDesc));
......@@ -504,7 +504,7 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = htonl(pDesc->num);
*(int32_t *)pWrite = (int32_t)htobe64(pDesc->num);
cols++;
numOfRows++;
......@@ -522,13 +522,18 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) {
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
char* connIdStr = pKill->queryId;
mPrint("kill query msg is received, queryId:%s", pKill->queryId);
char *chr = strchr(connIdStr, ':');
if (chr == NULL) return TSDB_CODE_INVALID_QUERY_ID;
*chr = 0;
const char delim = ':';
char* connIdStr = strtok(pKill->queryId, &delim);
char* queryIdStr = strtok(NULL, &delim);
if (queryIdStr == NULL || connIdStr == NULL) {
mPrint("failed to kill query, queryId:%s", pKill->queryId);
return TSDB_CODE_INVALID_QUERY_ID;
}
uint32_t queryId = atoi(chr + 1);
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr);
if (pConn == NULL) {
......@@ -547,13 +552,18 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) {
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
char* connIdStr = pKill->queryId;
char *chr = strchr(connIdStr, ':');
if (chr == NULL) return TSDB_CODE_INVALID_QUERY_ID;
*chr = 0;
mPrint("kill stream msg is received, streamId:%s", pKill->queryId);
const char delim = ':';
char* connIdStr = strtok(pKill->queryId, &delim);
char* streamIdStr = strtok(NULL, &delim);
if (streamIdStr == NULL || connIdStr == NULL) {
mPrint("failed to kill stream, streamId:%s", pKill->queryId);
return TSDB_CODE_INVALID_STREAM_ID;
}
uint32_t streamId = atoi(chr + 1);
int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10);
SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr);
if (pConn == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册