diff --git a/src/client/inc/tscProfile.h b/src/client/inc/tscProfile.h index 16b9efac38aaf7b03f21b13ba9316518c66e775d..193fac0fb00eaa20c883215e5708eea8f07f1bc9 100644 --- a/src/client/inc/tscProfile.h +++ b/src/client/inc/tscProfile.h @@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql); void tscRemoveFromSqlList(SSqlObj *pSql); void tscAddIntoStreamList(SSqlStream *pStream); void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj); -char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj); +int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj); void tscKillQuery(STscObj *pObj, uint32_t killId); void tscKillStream(STscObj *pObj, uint32_t killId); void tscKillConnection(STscObj *pObj); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 8e728508224d3d77bc1dd84b6ee55616382282eb..a5aee431bb0921eb2adf2f1762c4061d25a3a499 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -294,6 +294,7 @@ typedef struct STscObj { char sversion[TSDB_VERSION_LEN]; char writeAuth : 1; char superAuth : 1; + uint32_t connId; struct SSqlObj * pHb; struct SSqlObj * sqlList; struct SSqlStream *streamList; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index e6d9aad6e22257005140c0b08c229111b3e1fe7c..79d00bf5dc6e9075af64c12de776c8022841e180 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -19,6 +19,7 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" +#include "taosmsg.h" void tscSaveSlowQueryFp(void *handle, void *tmrId); void *tscSlowQueryConn = NULL; @@ -96,7 +97,7 @@ void tscSaveSlowQuery(SSqlObj *pSql) { } tscTrace("%p query time:%" PRId64 " sql:%s", pSql, pSql->res.useconds, pSql->sqlstr); - int32_t sqlSize = TSDB_SHOW_SQL_LEN + size; + int32_t sqlSize = TSDB_SLOW_QUERY_SQL_LEN + size; char *sql = malloc(sqlSize); if (sql == NULL) { @@ -106,9 +107,9 @@ void tscSaveSlowQuery(SSqlObj *pSql) { int len = snprintf(sql, size, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName, pSql->pTscObj->user, pSql->stime, pSql->res.useconds); - int sqlLen = snprintf(sql + len, TSDB_SHOW_SQL_LEN, "%s", pSql->sqlstr); - if (sqlLen > TSDB_SHOW_SQL_LEN - 1) { - sqlLen = len + TSDB_SHOW_SQL_LEN - 1; + int sqlLen = snprintf(sql + len, TSDB_SLOW_QUERY_SQL_LEN, "%s", pSql->sqlstr); + if (sqlLen > TSDB_SLOW_QUERY_SQL_LEN - 1) { + sqlLen = len + TSDB_SLOW_QUERY_SQL_LEN - 1; } else { sqlLen += len; } @@ -205,28 +206,28 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { } pthread_mutex_unlock(&pObj->mutex); - + if (pStream) { tscTrace("%p stream:%p is killed, streamId:%d", pStream->pSql, pStream, killId); + if (pStream->callback) { + pStream->callback(pStream->param); + } + taos_close_stream(pStream); + } else { + tscError("failed to kill stream, streamId:%d not exist", killId); } - - if (pStream->callback) { - pStream->callback(pStream->param); - } - taos_close_stream(pStream); } -char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { - char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256; +int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { + SCMHeartBeatMsg *pHeartbeat = pMsg; + int allocedQueriesNum = pHeartbeat->numOfQueries; + int allocedStreamsNum = pHeartbeat->numOfStreams; - SQqueryList *pQList = (SQqueryList *)pMsg; - pQList->numOfQueries = 0; - - SQueryDesc *pQdesc = (SQueryDesc*)(pMsg + sizeof(SQqueryList)); + pHeartbeat->numOfQueries = 0; + SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData; // We extract the lock to tscBuildHeartBeatMsg function. - /* pthread_mutex_lock (&pObj->mutex); */ - pMsg += sizeof(SQqueryList); + SSqlObj *pSql = pObj->sqlList; while (pSql) { /* @@ -240,47 +241,46 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { strncpy(pQdesc->sql, pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); pQdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0; - pQdesc->stime = pSql->stime; - pQdesc->queryId = pSql->queryId; - pQdesc->useconds = pSql->res.useconds; + pQdesc->stime = htobe64(pSql->stime); + pQdesc->queryId = htonl(pSql->queryId); + pQdesc->useconds = htobe64(pSql->res.useconds); - pQList->numOfQueries++; + pHeartbeat->numOfQueries++; pQdesc++; pSql = pSql->next; - pMsg += sizeof(SQueryDesc); - if (pMsg > pMax) break; + if (pHeartbeat->numOfQueries >= allocedQueriesNum) break; } - SStreamList *pSList = (SStreamList *)pMsg; - pSList->numOfStreams = 0; - - SStreamDesc *pSdesc = (SStreamDesc*) (pMsg + sizeof(SStreamList)); + pHeartbeat->numOfStreams = 0; + SStreamDesc *pSdesc = (SStreamDesc *)pQdesc; - pMsg += sizeof(SStreamList); SSqlStream *pStream = pObj->streamList; while (pStream) { strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); pSdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0; - pSdesc->streamId = pStream->streamId; - pSdesc->num = pStream->num; + pSdesc->streamId = htonl(pStream->streamId); + pSdesc->num = htobe64(pStream->num); - pSdesc->useconds = pStream->useconds; - pSdesc->stime = pStream->stime - pStream->interval; - pSdesc->ctime = pStream->ctime; + pSdesc->useconds = htobe64(pStream->useconds); + pSdesc->stime = htobe64(pStream->stime - pStream->interval); + pSdesc->ctime = htobe64(pStream->ctime); - pSdesc->slidingTime = pStream->slidingTime; - pSdesc->interval = pStream->interval; + pSdesc->slidingTime = htobe64(pStream->slidingTime); + pSdesc->interval = htobe64(pStream->interval); - pSList->numOfStreams++; + pHeartbeat->numOfStreams++; pSdesc++; pStream = pStream->next; - pMsg += sizeof(SStreamDesc); - if (pMsg > pMax) break; + if (pHeartbeat->numOfStreams >= allocedStreamsNum) break; } - /* pthread_mutex_unlock (&pObj->mutex); */ + int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) + + sizeof(SCMHeartBeatMsg); + pHeartbeat->connId = htonl(pObj->connId); + pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries); + pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams); - return pMsg; + return msgLen; } void tscKillConnection(STscObj *pObj) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 14ab14603f36660c8a5d15a612b015536a009147..bfa8a937915e7a7c39b2a6601a5a4d7bdec39e22 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -93,7 +93,7 @@ static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo static int32_t validateDNodeConfig(tDCLSQL* pOptions); static int32_t validateLocalConfig(tDCLSQL* pOptions); static int32_t validateColumnName(char* name); -static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); +static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType); static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); @@ -531,7 +531,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { case TSDB_SQL_KILL_QUERY: case TSDB_SQL_KILL_STREAM: case TSDB_SQL_KILL_CONNECTION: { - if ((code = setKillInfo(pSql, pInfo)) != TSDB_CODE_SUCCESS) { + if ((code = setKillInfo(pSql, pInfo, pInfo->type)) != TSDB_CODE_SUCCESS) { return code; } @@ -2229,37 +2229,45 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { return TSDB_CODE_SUCCESS; } -int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { - const char* msg1 = "invalid ip address"; - const char* msg2 = "invalid port"; +int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { + const char* msg1 = "invalid connection ID"; + const char* msg2 = "invalid query ID"; + const char* msg3 = "invalid stream ID"; SSqlCmd* pCmd = &pSql->cmd; pCmd->command = pInfo->type; - - SSQLToken* ip = &(pInfo->pDCLInfo->ip); - if (ip->n > TSDB_KILL_MSG_LEN) { + + SSQLToken* idStr = &(pInfo->pDCLInfo->ip); + if (idStr->n > TSDB_KILL_MSG_LEN) { return TSDB_CODE_INVALID_SQL; } - strncpy(pCmd->payload, ip->z, ip->n); + strncpy(pCmd->payload, idStr->z, idStr->n); const char delim = ':'; + char* connIdStr = strtok(idStr->z, &delim); + char* queryIdStr = strtok(NULL, &delim); - char* ipStr = strtok(ip->z, &delim); - char* portStr = strtok(NULL, &delim); - - if (!validateIpAddress(ipStr, strlen(ipStr))) { + int32_t connId = (int32_t)strtol(connIdStr, NULL, 10); + if (connId <= 0) { memset(pCmd->payload, 0, strlen(pCmd->payload)); - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - uint16_t port = (uint16_t)strtol(portStr, NULL, 10); - if (port <= 0 || port > 65535) { - memset(pCmd->payload, 0, strlen(pCmd->payload)); - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + if (killType == TSDB_SQL_KILL_CONNECTION) { + return TSDB_CODE_SUCCESS; } + int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); + if (queryId <= 0) { + memset(pCmd->payload, 0, strlen(pCmd->payload)); + if (killType == TSDB_SQL_KILL_QUERY) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } else { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } + } + return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ef9ca9de36950f092aa555ed5b17a979329726cf..5acb3237573e803e49d60db14399a94fc3e64533 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -114,6 +114,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pIpList->numOfIps > 0) tscSetMgmtIpList(pIpList); + pSql->pTscObj->connId = htonl(pRsp->connId); + if (pRsp->killConnection) { tscKillConnection(pObj); } else { @@ -1129,13 +1131,6 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SCMKillQueryMsg); - if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { - tscError("%p failed to malloc for query msg", pSql); - return TSDB_CODE_CLI_OUT_OF_MEMORY; - } - - SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload; - strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n); switch (pCmd->command) { case TSDB_SQL_KILL_QUERY: pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY; @@ -1743,57 +1738,43 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) { - int size = 0; +int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { + SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; - size += tsRpcHeadSize; - size += sizeof(SQqueryList); + pthread_mutex_lock(&pObj->mutex); + int32_t numOfQueries = 2; SSqlObj *tpSql = pObj->sqlList; while (tpSql) { - size += sizeof(SQueryDesc); tpSql = tpSql->next; + numOfQueries++; } - size += sizeof(SStreamList); + int32_t numOfStreams = 2; SSqlStream *pStream = pObj->streamList; while (pStream) { - size += sizeof(SStreamDesc); pStream = pStream->next; + numOfStreams++; } - return size + TSDB_EXTRA_PAYLOAD_SIZE; -} - -int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - char *pMsg, *pStart; - int msgLen = 0; - int size = 0; - - SSqlCmd *pCmd = &pSql->cmd; - STscObj *pObj = pSql->pTscObj; - - pthread_mutex_lock(&pObj->mutex); - - size = tscEstimateHeartBeatMsgLength(pSql); + int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { pthread_mutex_unlock(&pObj->mutex); tscError("%p failed to malloc for heartbeat msg", pSql); return -1; } - pMsg = pCmd->payload; - pStart = pMsg; + SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; + pHeartbeat->numOfQueries = numOfQueries; + pHeartbeat->numOfStreams = numOfStreams; + int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj); - pMsg = tscBuildQueryStreamDesc(pMsg, pObj); pthread_mutex_unlock(&pObj->mutex); - msgLen = pMsg - pStart; pCmd->payloadLen = msgLen; pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT; - assert(msgLen + minMsgSize() <= size); return TSDB_CODE_SUCCESS; } @@ -2204,6 +2185,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; pObj->superAuth = pConnect->superAuth; + pObj->connId = htonl(pConnect->connId); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index cb39b990e53fe54ffe20a1136a8a12d2f2d86875..3a84ce8f6f3ad369e9f6972d79d3356451c4ef25 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -251,7 +251,7 @@ void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); -void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows); // ----------------- K-V data row structure /* diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 5cd3b79aa1d1d5b088473cd8cdc3f8a0ec801ec5..7880a4b3021d71872f194f7bd5512d03ee940363 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -450,7 +450,8 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { int iter1 = 0; int iter2 = 0; - tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfRows + rowsToMerge); + tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows, + pTarget->numOfRows + rowsToMerge); } tdFreeDataCols(pTarget); @@ -461,15 +462,15 @@ _err: return -1; } -void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) { - // TODO: add resolve duplicate key here +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) { tdResetDataCols(target); + ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); while (target->numOfRows < tRows) { - if (*iter1 >= src1->numOfRows && *iter2 >= src2->numOfRows) break; + if (*iter1 >= limit1 && *iter2 >= limit2) break; - TSKEY key1 = (*iter1 >= src1->numOfRows) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; - TSKEY key2 = (*iter2 >= src2->numOfRows) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; + TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; + TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; if (key1 <= key2) { for (int i = 0; i < src1->numOfCols; i++) { diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 5e81d77489fa8a49e1593c663c88d2eaa053cfb0..7683843371ab4ba0a0bfb6d3b119fbbeca031b8d 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -46,7 +46,7 @@ typedef struct { void (*cleanup)(); } SDnodeComponent; -static const SDnodeComponent SDnodeComponents[] = { +static const SDnodeComponent tsDnodeComponents[] = { {"storage", dnodeInitStorage, dnodeCleanupStorage}, {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, @@ -72,14 +72,14 @@ static int dnodeCreateDir(const char *dir) { static void dnodeCleanupComponents(int32_t stepId) { for (int32_t i = stepId; i >= 0; i--) { - SDnodeComponents[i].cleanup(); + tsDnodeComponents[i].cleanup(); } } static int32_t dnodeInitComponents() { int32_t code = 0; - for (int32_t i = 0; i < sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]); i++) { - if (SDnodeComponents[i].init() != 0) { + for (int32_t i = 0; i < sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]); i++) { + if (tsDnodeComponents[i].init() != 0) { dnodeCleanupComponents(i); code = -1; break; @@ -133,7 +133,7 @@ int32_t dnodeInitSystem() { void dnodeCleanUpSystem() { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); - dnodeCleanupComponents(sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]) - 1); + dnodeCleanupComponents(sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]) - 1); taos_cleanup(); taosCloseLog(); } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 7d1b3046d937741a85d9bdd54e57d86b919cb89d..29181ed78faa56eef05ada6a119bf25f5b9ee67c 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -231,7 +231,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_SHELL_VNODE_BITS 24 #define TSDB_SHELL_SID_MASK 0xFF #define TSDB_HTTP_TOKEN_LEN 20 -#define TSDB_SHOW_SQL_LEN 512 +#define TSDB_SHOW_SQL_LEN 64 +#define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_METER_STATE_OFFLINE 0 #define TSDB_METER_STATE_ONLLINE 1 diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index f11a2cf750a36c36f886e6b23ec068555fc4c23c..62c97c92f10c656893abe044a1cacd8b23c305f4 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -150,6 +150,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 0x0405, "no disk perm TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 0x0406, "file corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 0x0407, "memory corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUCH_FILE_OR_DIR, 0, 0x0408, "no such file or directory") +TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_SHELL_CONNS, 0, 0x0409, "too many shell conns") // client TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 0x0481, "invalid client version") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2cf0567f5aa926fd5cf9907b926c2ce1d094f7f3..88c6f9cf26ecf7dbeab27da5f7a514ec5d22b71e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -137,6 +137,7 @@ enum _mgmt_table { TSDB_MGMT_TABLE_SCORES, TSDB_MGMT_TABLE_GRANTS, TSDB_MGMT_TABLE_VNODES, + TSDB_MGMT_TABLE_STREAMTABLES, TSDB_MGMT_TABLE_MAX, }; @@ -299,6 +300,9 @@ typedef struct { char serverVersion[TSDB_VERSION_LEN]; int8_t writeAuth; int8_t superAuth; + int8_t reserved1; + int8_t reserved2; + int32_t connId; SRpcIpSet ipList; } SCMConnectRsp; @@ -716,16 +720,10 @@ typedef struct { } SStreamDesc; typedef struct { - int32_t numOfQueries; -} SQqueryList; - -typedef struct { - int32_t numOfStreams; -} SStreamList; - -typedef struct { - SQqueryList qlist; - SStreamList slist; + uint32_t connId; + int32_t numOfQueries; + int32_t numOfStreams; + char pData[]; } SCMHeartBeatMsg; typedef struct { @@ -733,6 +731,7 @@ typedef struct { uint32_t streamId; uint32_t totalDnodes; uint32_t onlineDnodes; + uint32_t connId; int8_t killConnection; SRpcIpSet ipList; } SCMHeartBeatRsp; diff --git a/src/inc/ttokendef.h b/src/inc/ttokendef.h index effaafc4e778dad3ab1c459500e8753e74b6e38e..d388bc9dbeea2802ccebfca85e6126c5cdc3220d 100644 --- a/src/inc/ttokendef.h +++ b/src/inc/ttokendef.h @@ -148,8 +148,8 @@ #define TK_SET 130 #define TK_KILL 131 #define TK_CONNECTION 132 -#define TK_COLON 133 -#define TK_STREAM 134 +#define TK_STREAM 133 +#define TK_COLON 134 #define TK_ABORT 135 #define TK_AFTER 136 #define TK_ATTACH 137 diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index 594fd3fd50b0adede5e5ff759a43e8f2ddba9ec2..cf9058b9cf02e5a4f475500f9033eb1e645cb83b 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -182,8 +182,6 @@ typedef struct SUserObj { int8_t updateEnd[1]; int32_t refCount; struct SAcctObj * pAcct; - SQqueryList * pQList; // query list - SStreamList * pSList; // stream list } SUserObj; typedef struct { diff --git a/src/mnode/inc/mnodeProfile.h b/src/mnode/inc/mnodeProfile.h index 0311983a3812b68bfe8253fcc94733f65c124119..30745db035c675004ae8b449c282c7fc26a2e172 100644 --- a/src/mnode/inc/mnodeProfile.h +++ b/src/mnode/inc/mnodeProfile.h @@ -21,9 +21,30 @@ extern "C" { #endif #include "mnodeDef.h" +typedef struct { + char user[TSDB_USER_LEN + 1]; + int8_t killed; + uint16_t port; + uint32_t ip; + uint32_t connId; + uint64_t stime; + uint64_t lastAccess; + uint32_t queryId; + uint32_t streamId; + int32_t numOfQueries; + int32_t numOfStreams; + SStreamDesc *pStreams; + SQueryDesc * pQueries; +} SConnObj; + int32_t mnodeInitProfile(); void mnodeCleanupProfile(); +SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port); +SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port); +void mnodeReleaseConn(SConnObj *pConn); +int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mnodeShow.h b/src/mnode/inc/mnodeShow.h index d571eabfd8a6853ef7311d358600994cd701d9e1..da66e7167822d0077a8b5cad99489868eab7485d 100644 --- a/src/mnode/inc/mnodeShow.h +++ b/src/mnode/inc/mnodeShow.h @@ -28,6 +28,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mnodeAddShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); +void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index baecfac0775983ca34706a838a1667544c1bf179..298d10993beee3262a5dbebc6184a8f5a40d8b73 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -33,14 +33,54 @@ #include "mnodeUser.h" #include "mnodeTable.h" #include "mnodeShow.h" +#include "mnodeProfile.h" + +typedef struct { + const char *const name; + int (*init)(); + void (*cleanup)(); +} SMnodeComponent; void *tsMnodeTmr; static bool tsMgmtIsRunning = false; +static const SMnodeComponent tsMnodeComponents[] = { + {"profile", mnodeInitProfile, mnodeCleanupProfile}, + {"accts", mnodeInitAccts, mnodeCleanupAccts}, + {"users", mnodeInitUsers, mnodeCleanupUsers}, + {"dnodes", mnodeInitDnodes, mnodeCleanupDnodes}, + {"dbs", mnodeInitDbs, mnodeCleanupDbs}, + {"vgroups", mnodeInitVgroups, mnodeCleanupVgroups}, + {"tables", mnodeInitTables, mnodeCleanupTables}, + {"mnodes", mnodeInitMnodes, mnodeCleanupMnodes}, + {"sdb", sdbInit, sdbCleanUp}, + {"balance", balanceInit, balanceCleanUp}, + {"grant", grantInit, grantCleanUp}, + {"show", mnodeInitShow, mnodeCleanUpShow} +}; + static void mnodeInitTimer(); static void mnodeCleanupTimer(); static bool mnodeNeedStart() ; +static void mnodeCleanupComponents(int32_t stepId) { + for (int32_t i = stepId; i >= 0; i--) { + tsMnodeComponents[i].cleanup(); + } +} + +static int32_t mnodeInitComponents() { + int32_t code = 0; + for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) { + if (tsMnodeComponents[i].init() != 0) { + mnodeCleanupComponents(i); + code = -1; + break; + } + } + return code; +} + int32_t mnodeStartSystem() { if (tsMgmtIsRunning) { mPrint("mnode module already started..."); @@ -57,57 +97,7 @@ int32_t mnodeStartSystem() { dnodeAllocateMnodeRqueue(); dnodeAllocateMnodePqueue(); - if (mnodeInitAccts() < 0) { - mError("failed to init accts"); - return -1; - } - - if (mnodeInitUsers() < 0) { - mError("failed to init users"); - return -1; - } - - if (mnodeInitDnodes() < 0) { - mError("failed to init dnodes"); - return -1; - } - - if (mnodeInitDbs() < 0) { - mError("failed to init dbs"); - return -1; - } - - if (mnodeInitVgroups() < 0) { - mError("failed to init vgroups"); - return -1; - } - - if (mnodeInitTables() < 0) { - mError("failed to init tables"); - return -1; - } - - if (mnodeInitMnodes() < 0) { - mError("failed to init mnodes"); - return -1; - } - - if (sdbInit() < 0) { - mError("failed to init sdb"); - return -1; - } - - if (balanceInit() < 0) { - mError("failed to init balance") - } - - if (grantInit() < 0) { - mError("failed to init grant"); - return -1; - } - - if (mnodeInitShow() < 0) { - mError("failed to init show"); + if (mnodeInitComponents() != 0) { return -1; } @@ -115,7 +105,6 @@ int32_t mnodeStartSystem() { tsMgmtIsRunning = true; mPrint("mnode is initialized successfully"); - return 0; } @@ -133,17 +122,8 @@ void mnodeCleanupSystem() { dnodeFreeMnodeRqueue(); dnodeFreeMnodePqueue(); mnodeCleanupTimer(); - mnodeCleanUpShow(); - grantCleanUp(); - balanceCleanUp(); - sdbCleanUp(); - mnodeCleanupMnodes(); - mnodeCleanupTables(); - mnodeCleanupVgroups(); - mnodeCleanupDbs(); - mnodeCleanupDnodes(); - mnodeCleanupUsers(); - mnodeCleanupAccts(); + mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1); + mPrint("mnode is cleaned up"); } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index a37f5436c6ee70722dd131373f5ffbe3b0298564..49f79a54f1092540433faa9a766f3ff944a5ed29 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -18,6 +18,10 @@ #include "taosmsg.h" #include "taoserror.h" #include "tutil.h" +#include "ttime.h" +#include "tcache.h" +#include "tglobal.h" +#include "tdataformat.h" #include "mnode.h" #include "mnodeDef.h" #include "mnodeInt.h" @@ -32,153 +36,170 @@ #include "mnodeVgroup.h" #include "mnodeWrite.h" -int32_t mnodeSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg); - -int32_t mnodeKillQuery(char *qidstr, void *pConn); -int32_t mnodeKillStream(char *qidstr, void *pConn); -int32_t mnodeKillConnection(char *qidstr, void *pConn); - -typedef struct { - char user[TSDB_TABLE_ID_LEN + 1]; - uint64_t stime; - uint32_t ip; - uint16_t port; -} SConnInfo; - -typedef struct { - int numOfConns; - int index; - SConnInfo connInfo[]; -} SConnShow; - -typedef struct { - uint32_t ip; - uint16_t port; - char user[TSDB_TABLE_ID_LEN+ 1]; -} SCDesc; - -typedef struct { - int32_t index; - int32_t numOfQueries; - SCDesc * connInfo; - SCDesc **cdesc; - SQueryDesc qdesc[]; -} SQueryShow; - -typedef struct { - int32_t index; - int32_t numOfStreams; - SCDesc * connInfo; - SCDesc **cdesc; - SStreamDesc sdesc[]; -} SStreamShow; - -int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) { -// SAcctObj *pAcct = pConn->pAcct; -// -// if (contLen <= 0 || pAcct == NULL) { -// return 0; -// } -// -// pthread_mutex_lock(&pAcct->mutex); -// -// if (pConn->pQList) { -// pAcct->acctInfo.numOfQueries -= pConn->pQList->numOfQueries; -// pAcct->acctInfo.numOfStreams -= pConn->pSList->numOfStreams; -// } -// -// pConn->pQList = realloc(pConn->pQList, contLen); -// memcpy(pConn->pQList, cont, contLen); -// -// pConn->pSList = (SStreamList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SQueryDesc) + sizeof(SQqueryList)); -// -// pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries; -// pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams; -// -// pthread_mutex_unlock(&pAcct->mutex); +#define CONN_KEEP_TIME (tsShellActivityTimer * 3) +#define CONN_CHECK_TIME (tsShellActivityTimer * 2) +#define QUERY_ID_SIZE 20 + +extern void *tsMnodeTmr; +static SCacheObj *tsMnodeConnCache = NULL; +static uint32_t tsConnIndex = 0; + +static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static void mnodeFreeConn(void *data); +static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg); +static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg); +static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg); - return TSDB_CODE_SUCCESS; +int32_t mnodeInitProfile() { + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams); + + mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg); + mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); + mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); + + tsMnodeConnCache = taosCacheInitWithCb(tsMnodeTmr, CONN_CHECK_TIME, mnodeFreeConn); + return 0; } -int32_t mnodeGetQueries(SShowObj *pShow, void *pConn) { -// SAcctObj * pAcct = pConn->pAcct; -// SQueryShow *pQueryShow; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pQueryShow = malloc(sizeof(SQueryDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow)); -// pQueryShow->numOfQueries = 0; -// pQueryShow->index = 0; -// pQueryShow->connInfo = NULL; -// pQueryShow->cdesc = NULL; -// -// if (pAcct->acctInfo.numOfQueries > 0) { -// pQueryShow->connInfo = (SCDesc *)malloc(pAcct->acctInfo.numOfConns * sizeof(SCDesc)); -// pQueryShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfQueries * sizeof(SCDesc *)); -// -// pConn = pAcct->pConn; -// SQueryDesc * pQdesc = pQueryShow->qdesc; -// SCDesc * pCDesc = pQueryShow->connInfo; -// SCDesc **ppCDesc = pQueryShow->cdesc; -// -// while (pConn) { -// if (pConn->pQList && pConn->pQList->numOfQueries > 0) { -// pCDesc->ip = pConn->ip; -// pCDesc->port = pConn->port; -// strcpy(pCDesc->user, pConn->pUser->user); -// -// memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SQueryDesc) * pConn->pQList->numOfQueries); -// pQdesc += pConn->pQList->numOfQueries; -// pQueryShow->numOfQueries += pConn->pQList->numOfQueries; -// for (int32_t i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc; -// -// pCDesc++; -// } -// pConn = pConn->next; -// } -// } -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// // sorting based on useconds -// -// pShow->pIter = pQueryShow; +void mnodeCleanupProfile() { + if (tsMnodeConnCache != NULL) { + mPrint("conn cache is cleanup"); + taosCacheCleanup(tsMnodeConnCache); + tsMnodeConnCache = NULL; + } +} - return 0; +SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { + int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable); + if (connSize > tsMaxShellConns) { + mError("failed to create conn for user:%s ip:%s:%u, conns:%d larger than maxShellConns:%d, ", user, taosIpStr(ip), + port, connSize, tsMaxShellConns); + terrno = TSDB_CODE_TOO_MANY_SHELL_CONNS; + return NULL; + } + + uint32_t connId = atomic_add_fetch_32(&tsConnIndex, 1); + if (connId == 0) atomic_add_fetch_32(&tsConnIndex, 1); + + SConnObj connObj = { + .ip = ip, + .port = port, + .connId = connId, + .stime = taosGetTimestampMs() + }; + strcpy(connObj.user, user); + + char key[10]; + sprintf(key, "%u", connId); + SConnObj *pConn = taosCachePut(tsMnodeConnCache, key, &connObj, sizeof(connObj), CONN_KEEP_TIME); + + mTrace("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); + return pConn; } -int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { - int32_t cols = 0; +void mnodeReleaseConn(SConnObj *pConn) { + if(pConn == NULL) return; + taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); +} + +SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { + char key[10]; + sprintf(key, "%u", connId); + uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); + + SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, expireTime); + if (pConn == NULL) { + mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); + return NULL; + } + + if (pConn->ip != ip || pConn->port != port /* || strcmp(pConn->user, user) != 0 */) { + mError("connId:%d, incoming conn user:%s ip:%s:%u, not match exist conn user:%s ip:%s:%u", connId, user, + taosIpStr(ip), port, pConn->user, taosIpStr(pConn->ip), pConn->port); + taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false); + return NULL; + } + + // mTrace("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port); + pConn->lastAccess = expireTime; + return pConn; +} + +static void mnodeFreeConn(void *data) { + SConnObj *pConn = data; + tfree(pConn->pQueries); + tfree(pConn->pQueries); + + mTrace("connId:%d, is destroyed", pConn->connId); +} +static void *mnodeGetNextConn(SHashMutableIterator *pIter, SConnObj **pConn) { + *pConn = NULL; + + if (pIter == NULL) { + pIter = taosHashCreateIter(tsMnodeConnCache->pHashTable); + } + + if (!taosHashIterNext(pIter)) { + taosHashDestroyIter(pIter); + return NULL; + } + + SCacheDataNode **pNode = taosHashIterGet(pIter); + if (pNode == NULL || *pNode == NULL) { + taosHashDestroyIter(pIter); + return NULL; + } + + *pConn = (SConnObj*)((*pNode)->data); + return pIter; +} + +static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SUserObj *pUser = mnodeGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + int32_t cols = 0; SSchema *pSchema = pMeta->schema; - pShow->bytes[cols] = TSDB_USER_LEN; + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "connId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "user"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 14; + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "ip:port:id"); + strcpy(pSchema[cols].name, "ip:port"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "created_time"); + strcpy(pSchema[cols].name, "login time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; - strcpy(pSchema[cols].name, "time(us)"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = TSDB_SHOW_SQL_LEN; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "sql"); + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "last access"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -186,314 +207,208 @@ int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } - pShow->numOfRows = 1000000; - pShow->pIter = NULL; + pShow->numOfRows = taosHashGetSize(tsMnodeConnCache->pHashTable); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - mnodeGetQueries(pShow, pConn); return 0; } -int32_t mnodeKillQuery(char *qidstr, void *pConn) { -// char *temp, *chr, idstr[64]; -// strcpy(idstr, qidstr); -// -// temp = idstr; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint32_t ip = inet_addr(temp); -// -// temp = chr + 1; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint16_t port = htons(atoi(temp)); -// -// temp = chr + 1; -// uint32_t queryId = atoi(temp); -// -// SAcctObj *pAcct = pConn->pAcct; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pConn = pAcct->pConn; -// while (pConn) { -// if (pConn->ip == ip && pConn->port == port && pConn->pQList) { -// int32_t i; -// SQueryDesc *pQDesc = pConn->pQList->qdesc; -// for (i = 0; i < pConn->pQList->numOfQueries; ++i, ++pQDesc) { -// if (pQDesc->queryId == queryId) break; -// } -// -// if (i < pConn->pQList->numOfQueries) break; -// } -// -// pConn = pConn->next; -// } -// -// if (pConn) pConn->queryId = queryId; -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// if (pConn == NULL || pConn->pQList == NULL || pConn->pQList->numOfQueries == 0) goto _error; -// -// mTrace("query:%s is there, kill it", qidstr); -// return 0; -// -//_error: -// mTrace("query:%s is not there", qidstr); - - return TSDB_CODE_INVALID_QUERY_ID; -} - -int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; - char *pWrite; + SConnObj *pConnObj = NULL; int32_t cols = 0; - - SQueryShow *pQueryShow = (SQueryShow *)pShow->pIter; - - if (rows > pQueryShow->numOfQueries - pQueryShow->index) rows = pQueryShow->numOfQueries - pQueryShow->index; + char * pWrite; + char ipStr[TSDB_IPv4ADDR_LEN + 7]; while (numOfRows < rows) { - SQueryDesc *pNode = pQueryShow->qdesc + pQueryShow->index; - SCDesc *pCDesc = pQueryShow->cdesc[pQueryShow->index]; + pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj); + if (pConnObj == NULL) break; + cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pCDesc->user); + *(int32_t *) pWrite = pConnObj->connId; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - uint32_t ip = pCDesc->ip; - sprintf(pWrite, "%d.%d.%d.%d:%hu:%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pCDesc->port), - pNode->queryId); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, TSDB_USER_LEN); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->stime; + snprintf(ipStr, TSDB_IPv4ADDR_LEN + 6, "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, TSDB_IPv4ADDR_LEN + 6); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->useconds; + *(int64_t *)pWrite = pConnObj->stime; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pNode->sql); + *(int64_t *)pWrite = pConnObj->lastAccess; cols++; numOfRows++; - pQueryShow->index++; - } - - if (numOfRows == 0) { - tfree(pQueryShow->cdesc); - tfree(pQueryShow->connInfo); - tfree(pQueryShow); } pShow->numOfReads += numOfRows; + const int32_t NUM_OF_COLUMNS = 5; + mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + return numOfRows; } -int32_t mnodeGetStreams(SShowObj *pShow, void *pConn) { -// SAcctObj * pAcct = pConn->pAcct; -// SStreamShow *pStreamShow; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pStreamShow = malloc(sizeof(SStreamDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow)); -// pStreamShow->numOfStreams = 0; -// pStreamShow->index = 0; -// pStreamShow->connInfo = NULL; -// pStreamShow->cdesc = NULL; -// -// if (pAcct->acctInfo.numOfStreams > 0) { -// pStreamShow->connInfo = (SCDesc *)malloc(pAcct->acctInfo.numOfConns * sizeof(SCDesc)); -// pStreamShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfStreams * sizeof(SCDesc *)); -// -// pConn = pAcct->pConn; -// SStreamDesc * pSdesc = pStreamShow->sdesc; -// SCDesc * pCDesc = pStreamShow->connInfo; -// SCDesc **ppCDesc = pStreamShow->cdesc; -// -// while (pConn) { -// if (pConn->pSList && pConn->pSList->numOfStreams > 0) { -// pCDesc->ip = pConn->ip; -// pCDesc->port = pConn->port; -// strcpy(pCDesc->user, pConn->pUser->user); -// -// memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SStreamDesc) * pConn->pSList->numOfStreams); -// pSdesc += pConn->pSList->numOfStreams; -// pStreamShow->numOfStreams += pConn->pSList->numOfStreams; -// for (int32_t i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc; -// -// pCDesc++; -// } -// pConn = pConn->next; -// } -// } -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// // sorting based on useconds -// -// pShow->pIter = pStreamShow; +// not thread safe, need optimized +int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg) { + pConn->numOfQueries = htonl(pHBMsg->numOfQueries); + if (pConn->numOfQueries > 0) { + pConn->pQueries = calloc(sizeof(SQueryDesc), pConn->numOfQueries); + memcpy(pConn->pQueries, pHBMsg->pData, pConn->numOfQueries * sizeof(SQueryDesc)); + } - return 0; + pConn->numOfStreams = htonl(pHBMsg->numOfStreams); + if (pConn->numOfStreams > 0) { + pConn->pStreams = calloc(sizeof(SStreamDesc), pConn->numOfStreams); + memcpy(pConn->pStreams, pHBMsg->pData + pConn->numOfQueries * sizeof(SQueryDesc), + pConn->numOfStreams * sizeof(SStreamDesc)); + } + + return TSDB_CODE_SUCCESS; } +static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SUserObj *pUser = mnodeGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; -int32_t mnodeKillStream(char *qidstr, void *pConn) { -// char *temp, *chr, idstr[64]; -// strcpy(idstr, qidstr); -// -// temp = idstr; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint32_t ip = inet_addr(temp); -// -// temp = chr + 1; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint16_t port = htons(atoi(temp)); -// -// temp = chr + 1; -// uint32_t streamId = atoi(temp); -// -// SAcctObj *pAcct = pConn->pAcct; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pConn = pAcct->pConn; -// while (pConn) { -// if (pConn->ip == ip && pConn->port == port && pConn->pSList) { -// int32_t i; -// SStreamDesc *pSDesc = pConn->pSList->sdesc; -// for (i = 0; i < pConn->pSList->numOfStreams; ++i, ++pSDesc) { -// if (pSDesc->streamId == streamId) break; -// } -// -// if (i < pConn->pSList->numOfStreams) break; -// } -// -// pConn = pConn->next; -// } -// -// if (pConn) pConn->streamId = streamId; -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// if (pConn == NULL || pConn->pSList == NULL || pConn->pSList->numOfStreams == 0) goto _error; -// -// mTrace("stream:%s is there, kill it", qidstr); -// return 0; -// -//_error: -// mTrace("stream:%s is not there", qidstr); - - return TSDB_CODE_INVALID_STREAM_ID; -} + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; -int32_t mnodeKillConnection(char *qidstr, void *pConn) { -// void *pConn1 = NULL; -// char * temp, *chr, idstr[64]; -// strcpy(idstr, qidstr); -// -// temp = idstr; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint32_t ip = inet_addr(temp); -// -// temp = chr + 1; -// uint16_t port = htons(atoi(temp)); -// SAcctObj *pAcct = pConn->pAcct; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pConn = pAcct->pConn; -// while (pConn) { -// if (pConn->ip == ip && pConn->port == port) { -// // there maybe two connections from a shell -// if (pConn1 == NULL) -// pConn1 = pConn; -// else -// break; -// } -// -// pConn = pConn->next; -// } -// -// if (pConn1) pConn1->killConnection = 1; -// if (pConn) pConn->killConnection = 1; -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// if (pConn1 == NULL) goto _error; -// -// mTrace("connection:%s is there, kill it", qidstr); -// return 0; -// -//_error: -// mTrace("connection:%s is not there", qidstr); - - return TSDB_CODE_INVALID_CONNECTION; -} + pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "queryId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "user"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip:port"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "created time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema[cols].name, "time(us)"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } -int mnodeGetConns(SShowObj *pShow, void *pConn) { - // SAcctObj * pAcct = pConn->pAcct; - // SConnShow *pConnShow; - // - // pthread_mutex_lock(&pAcct->mutex); - // - // pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow)); - // pConnShow->index = 0; - // pConnShow->numOfConns = 0; - // - // if (pAcct->acctInfo.numOfConns > 0) { - // pConn = pAcct->pConn; - // SConnInfo *pConnInfo = pConnShow->connInfo; - // - // while (pConn && pConn->pUser) { - // strcpy(pConnInfo->user, pConn->pUser->user); - // pConnInfo->ip = pConn->ip; - // pConnInfo->port = pConn->port; - // pConnInfo->stime = pConn->stime; - // - // pConnShow->numOfConns++; - // pConnInfo++; - // pConn = pConn->next; - // } - // } - // - // pthread_mutex_unlock(&pAcct->mutex); - // - // // sorting based on useconds - // - // pShow->pIter = pConnShow; + pShow->numOfRows = 1000000; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; return 0; } -int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { - int32_t cols = 0; +static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + SConnObj *pConnObj = NULL; + int32_t cols = 0; + char * pWrite; + char ipStr[TSDB_IPv4ADDR_LEN + 7]; + + while (numOfRows < rows) { + pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj); + if (pConnObj == NULL) break; + + for (int32_t i = 0; i < pConnObj->numOfQueries; ++i) { + SQueryDesc *pDesc = pConnObj->pQueries + i; + cols = 0; + + snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, QUERY_ID_SIZE); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, TSDB_USER_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + snprintf(ipStr, TSDB_IPv4ADDR_LEN + 6, "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, TSDB_IPv4ADDR_LEN + 6); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->stime); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->useconds); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, TSDB_SHOW_SQL_LEN); + cols++; - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; + numOfRows++; + } + } + + pShow->numOfReads += numOfRows; + const int32_t NUM_OF_COLUMNS = 6; + mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + return numOfRows; +} + +static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SUserObj *pUser = mnodeGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + int32_t cols = 0; SSchema *pSchema = pMeta->schema; + pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "streamId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "user"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6; + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip:port"); pSchema[cols].bytes = htons(pShow->bytes[cols]); @@ -501,7 +416,31 @@ int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "login time"); + strcpy(pSchema[cols].name, "created time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "exec time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema[cols].name, "time(us)"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "cycles"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -509,140 +448,148 @@ int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } pShow->numOfRows = 1000000; - pShow->pIter = NULL; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - mnodeGetConns(pShow, pConn); return 0; } -int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; - char *pWrite; + SConnObj *pConnObj = NULL; int32_t cols = 0; + char * pWrite; + char ipStr[TSDB_IPv4ADDR_LEN + 7]; - SConnShow *pConnShow = (SConnShow *)pShow->pIter; + while (numOfRows < rows) { + pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj); + if (pConnObj == NULL) break; - if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index; + for (int32_t i = 0; i < pConnObj->numOfStreams; ++i) { + SStreamDesc *pDesc = pConnObj->pStreams + i; + cols = 0; - while (numOfRows < rows) { - SConnInfo *pNode = pConnShow->connInfo + pConnShow->index; - cols = 0; + snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->streamId)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, QUERY_ID_SIZE); + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pNode->user); - cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, TSDB_USER_LEN); + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - uint32_t ip = pNode->ip; - sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port)); - cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + snprintf(ipStr, TSDB_IPv4ADDR_LEN + 6, "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, TSDB_IPv4ADDR_LEN + 6); + cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->stime; - cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->ctime); + cols++; - numOfRows++; - pConnShow->index++; - } + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->stime); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->useconds); + cols++; - if (numOfRows == 0) { - tfree(pConnShow); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, TSDB_SHOW_SQL_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = (int32_t)htobe64(pDesc->num); + cols++; + + numOfRows++; + } } pShow->numOfReads += numOfRows; + const int32_t NUM_OF_COLUMNS = 8; + mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); return numOfRows; } -int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { - // SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - - // SUserObj *pUser = mnodeGetUserFromConn(pMsg->thandle); - // if (pUser == NULL) { - // rpcRsp.code = TSDB_CODE_INVALID_USER; - // rpcSendResponse(&rpcRsp); - // return; - // } - - // SCMKillQueryMsg *pKill = pMsg->pCont; - // int32_t code; - - // if (!pUser->writeAuth) { - // code = TSDB_CODE_NO_RIGHTS; - // } else { - // code = mgmtKillQuery(pKill->queryId, pMsg->thandle); - // } - - // rpcRsp.code = code; - // rpcSendResponse(&rpcRsp); - // mnodeDecUserRef(pUser); - return TSDB_CODE_SUCCESS; -} +static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { + SUserObj *pUser = pMsg->pUser; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; -int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { - // SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - - // SUserObj *pUser = mnodeGetUserFromConn(pMsg->thandle); - // if (pUser == NULL) { - // rpcRsp.code = TSDB_CODE_INVALID_USER; - // rpcSendResponse(&rpcRsp); - // return; - // } - - // SCMKillStreamMsg *pKill = pMsg->pCont; - // int32_t code; - - // if (!pUser->writeAuth) { - // code = TSDB_CODE_NO_RIGHTS; - // } else { - // code = mgmtKillStream(pKill->queryId, pMsg->thandle); - // } - - // rpcRsp.code = code; - // rpcSendResponse(&rpcRsp); - // mnodeDecUserRef(pUser); - return TSDB_CODE_SUCCESS; -} + SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont; + mPrint("kill query msg is received, queryId:%s", pKill->queryId); -int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { - // SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - - // SUserObj *pUser = mnodeGetUserFromConn(pMsg->thandle); - // if (pUser == NULL) { - // rpcRsp.code = TSDB_CODE_INVALID_USER; - // rpcSendResponse(&rpcRsp); - // return; - // } - - // SCMKillConnMsg *pKill = pMsg->pCont; - // int32_t code; - - // if (!pUser->writeAuth) { - // code = TSDB_CODE_NO_RIGHTS; - // } else { - // code = mgmtKillConnection(pKill->queryId, pMsg->thandle); - // } - - // rpcRsp.code = code; - // rpcSendResponse(&rpcRsp); - // mnodeDecUserRef(pUser); - return TSDB_CODE_SUCCESS; + const char delim = ':'; + char* connIdStr = strtok(pKill->queryId, &delim); + char* queryIdStr = strtok(NULL, &delim); + + if (queryIdStr == NULL || connIdStr == NULL) { + mPrint("failed to kill query, queryId:%s", pKill->queryId); + return TSDB_CODE_INVALID_QUERY_ID; + } + + int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); + + SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); + if (pConn == NULL) { + mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId); + return TSDB_CODE_INVALID_CONNECTION; + } else { + mPrint("connId:%s, queryId:%d is killed by user:%s", connIdStr, queryId, pUser->user); + pConn->queryId = queryId; + taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false); + return TSDB_CODE_SUCCESS; + } } -int32_t mnodeInitProfile() { - // mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta); - // mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries); - // mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta); - // mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns); +static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { + SUserObj *pUser = pMsg->pUser; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; - // mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg); - // mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); - // mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); + SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont; + mPrint("kill stream msg is received, streamId:%s", pKill->queryId); - return 0; + const char delim = ':'; + char* connIdStr = strtok(pKill->queryId, &delim); + char* streamIdStr = strtok(NULL, &delim); + + if (streamIdStr == NULL || connIdStr == NULL) { + mPrint("failed to kill stream, streamId:%s", pKill->queryId); + return TSDB_CODE_INVALID_STREAM_ID; + } + + int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10); + + SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); + if (pConn == NULL) { + mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId); + return TSDB_CODE_INVALID_CONNECTION; + } else { + mPrint("connId:%s, streamId:%d is killed by user:%s", connIdStr, streamId, pUser->user); + pConn->streamId = streamId; + taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false); + return TSDB_CODE_SUCCESS; + } } -void mnodeCleanupProfile() {} +static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { + SUserObj *pUser = pMsg->pUser; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont; + SConnObj * pConn = taosCacheAcquireByName(tsMnodeConnCache, pKill->queryId); + if (pConn == NULL) { + mError("connId:%s, failed to kill, conn not exist", pKill->queryId); + return TSDB_CODE_INVALID_CONNECTION; + } else { + mPrint("connId:%s, is killed by user:%s", pKill->queryId, pUser->user); + pConn->killed = 1; + taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); + return TSDB_CODE_SUCCESS; + } +} diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index dd6de9d351d9864e800fda6b917c297ef9c803ff..c1edd309c824337e08f4f950375f5f7b299eb6ca 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -227,21 +227,47 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } + SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont; + SRpcConnInfo connInfo; + rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo); + + int32_t connId = htonl(pHBMsg->connId); + SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); + if (pConn == NULL) { + pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + } + + if (pConn == NULL) { + // do not close existing links, otherwise + // mError("failed to create connId, close connect"); + // pHBRsp->killConnection = 1; + } else { + pHBRsp->connId = htonl(pConn->connId); + mnodeSaveQueryStreamList(pConn, pHBMsg); + + if (pConn->killed != 0) { + pHBRsp->killConnection = 1; + } + + if (pConn->streamId != 0) { + pHBRsp->streamId = htonl(pConn->streamId); + pConn->streamId = 0; + } + + if (pConn->queryId != 0) { + pHBRsp->queryId = htonl(pConn->queryId); + pConn->queryId = 0; + } + } + pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum()); pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); mnodeGetMnodeIpSetForShell(&pHBRsp->ipList); - - /* - * TODO - * Dispose kill stream or kill query message - */ - pHBRsp->queryId = 0; - pHBRsp->streamId = 0; - pHBRsp->killConnection = 0; pMsg->rpcRsp.rsp = pHBRsp; pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); + mnodeReleaseConn(pConn); return TSDB_CODE_SUCCESS; } @@ -281,11 +307,19 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { goto connect_over; } + SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + if (pConn == NULL) { + code = terrno; + } else { + pConnectRsp->connId = htonl(pConn->connId); + mnodeReleaseConn(pConn); + } + sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); strcpy(pConnectRsp->serverVersion, version); pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->superAuth = pUser->superAuth; - + mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList); connect_over: @@ -358,3 +392,11 @@ static void mnodeReleaseShowObj(void *pShow, bool forceRemove) { mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false"); taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove); } + +void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) { + if (rows < capacity) { + for (int32_t i = 0; i < numOfCols; ++i) { + memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows); + } + } +} \ No newline at end of file diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 196056b47471b3213a417595017c5ad5405e6373..56592aff4b61af17bd8a6261724da619c5fe3f7b 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -61,8 +61,8 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); @@ -568,8 +568,8 @@ int32_t mnodeInitTables() { mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mnodeRetrieveShowTables); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mnodeGetShowSuperTableMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mnodeRetrieveShowSuperTables); - mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta); - mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeGetStreamTableMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeRetrieveStreamTables); return TSDB_CODE_SUCCESS; } @@ -2111,14 +2111,6 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void return 0; } -static void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) { - if (rows < capacity) { - for (int32_t i = 0; i < numOfCols; ++i) { - memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows); - } - } -} - static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { SDbObj *pDb = mnodeGetDb(pShow->db); if (pDb == NULL) return 0; @@ -2262,7 +2254,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { return code; } -static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SDbObj *pDb = mnodeGetDb(pShow->db); if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; @@ -2308,7 +2300,7 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p return 0; } -static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { SDbObj *pDb = mnodeGetDb(pShow->db); if (pDb == NULL) return 0; diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index 0c27233289ce054625e3512374748206f54176df..5275dd9d6c2552bbe4994b29da1c28583723be66 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -183,7 +183,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.slowquery(ts timestamp, username " "binary(%d), created_time timestamp, time bigint, sql binary(%d))", - tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SHOW_SQL_LEN); + tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SLOW_QUERY_SQL_LEN); } else if (cmd == MONITOR_CMD_CREATE_TB_LOG) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.log(ts timestamp, level tinyint, " diff --git a/src/query/src/sql.c b/src/query/src/sql.c index d079e5a24ed7be23a3c2163d0557be49b9f9f7cc..545cef408216e597b1f48e6e6da0968baaf7983c 100644 --- a/src/query/src/sql.c +++ b/src/query/src/sql.c @@ -126,17 +126,17 @@ typedef union { #define ParseARG_FETCH SSqlInfo* pInfo = yypParser->pInfo #define ParseARG_STORE yypParser->pInfo = pInfo #define YYFALLBACK 1 -#define YYNSTATE 247 +#define YYNSTATE 241 #define YYNRULE 220 #define YYNTOKEN 205 -#define YY_MAX_SHIFT 246 -#define YY_MIN_SHIFTREDUCE 403 -#define YY_MAX_SHIFTREDUCE 622 -#define YY_ERROR_ACTION 623 -#define YY_ACCEPT_ACTION 624 -#define YY_NO_ACTION 625 -#define YY_MIN_REDUCE 626 -#define YY_MAX_REDUCE 845 +#define YY_MAX_SHIFT 240 +#define YY_MIN_SHIFTREDUCE 397 +#define YY_MAX_SHIFTREDUCE 616 +#define YY_ERROR_ACTION 617 +#define YY_ACCEPT_ACTION 618 +#define YY_NO_ACTION 619 +#define YY_MIN_REDUCE 620 +#define YY_MAX_REDUCE 839 /************* End control #defines *******************************************/ /* Define the yytestcase() macro to be a no-op if is not already defined @@ -202,63 +202,63 @@ typedef union { ** yy_default[] Default action for each state. ** *********** Begin parsing tables **********************************************/ -#define YY_ACTTAB_COUNT (547) +#define YY_ACTTAB_COUNT (541) static const YYACTIONTYPE yy_action[] = { - /* 0 */ 724, 444, 723, 11, 722, 134, 624, 246, 725, 445, - /* 10 */ 727, 726, 764, 41, 43, 21, 35, 36, 153, 244, - /* 20 */ 135, 29, 135, 444, 203, 39, 37, 40, 38, 158, - /* 30 */ 833, 445, 832, 34, 33, 139, 135, 32, 31, 30, - /* 40 */ 41, 43, 753, 35, 36, 157, 833, 166, 29, 739, - /* 50 */ 103, 203, 39, 37, 40, 38, 188, 21, 103, 99, - /* 60 */ 34, 33, 761, 155, 32, 31, 30, 404, 405, 406, - /* 70 */ 407, 408, 409, 410, 411, 412, 413, 414, 415, 245, - /* 80 */ 444, 742, 41, 43, 103, 35, 36, 103, 445, 168, - /* 90 */ 29, 738, 21, 203, 39, 37, 40, 38, 32, 31, - /* 100 */ 30, 56, 34, 33, 753, 787, 32, 31, 30, 43, - /* 110 */ 191, 35, 36, 788, 829, 198, 29, 21, 154, 203, - /* 120 */ 39, 37, 40, 38, 167, 578, 739, 8, 34, 33, - /* 130 */ 61, 113, 32, 31, 30, 665, 35, 36, 126, 59, - /* 140 */ 200, 29, 58, 17, 203, 39, 37, 40, 38, 221, - /* 150 */ 26, 739, 169, 34, 33, 220, 219, 32, 31, 30, - /* 160 */ 16, 239, 214, 238, 213, 212, 211, 237, 210, 236, - /* 170 */ 235, 209, 720, 828, 709, 710, 711, 712, 713, 714, - /* 180 */ 715, 716, 717, 718, 719, 162, 591, 234, 76, 582, - /* 190 */ 165, 585, 240, 588, 234, 162, 591, 98, 827, 582, - /* 200 */ 225, 585, 60, 588, 26, 162, 591, 12, 742, 582, - /* 210 */ 742, 585, 674, 588, 27, 126, 21, 159, 160, 34, - /* 220 */ 33, 202, 842, 32, 31, 30, 148, 159, 160, 740, - /* 230 */ 536, 539, 88, 87, 142, 18, 666, 159, 160, 126, - /* 240 */ 147, 559, 560, 39, 37, 40, 38, 50, 226, 550, - /* 250 */ 739, 34, 33, 46, 507, 32, 31, 30, 523, 531, - /* 260 */ 17, 520, 151, 521, 51, 522, 190, 26, 16, 239, - /* 270 */ 152, 238, 243, 242, 95, 237, 551, 236, 235, 177, - /* 280 */ 14, 42, 223, 222, 580, 741, 185, 187, 182, 170, - /* 290 */ 171, 42, 590, 584, 150, 587, 74, 78, 83, 86, - /* 300 */ 77, 42, 590, 161, 608, 592, 80, 589, 13, 13, - /* 310 */ 140, 583, 590, 586, 513, 47, 141, 589, 46, 798, - /* 320 */ 581, 116, 117, 68, 64, 67, 143, 589, 130, 128, - /* 330 */ 91, 90, 89, 512, 48, 207, 527, 22, 528, 22, - /* 340 */ 144, 3, 73, 72, 10, 9, 145, 525, 146, 526, - /* 350 */ 85, 84, 137, 797, 133, 138, 136, 163, 794, 524, - /* 360 */ 793, 164, 763, 733, 224, 100, 755, 780, 779, 114, - /* 370 */ 26, 115, 112, 676, 208, 131, 24, 217, 673, 218, - /* 380 */ 841, 70, 840, 838, 118, 694, 25, 93, 23, 132, - /* 390 */ 663, 79, 189, 546, 661, 192, 81, 82, 659, 658, - /* 400 */ 172, 127, 656, 196, 655, 654, 653, 652, 644, 129, - /* 410 */ 650, 648, 646, 52, 752, 767, 49, 44, 768, 781, - /* 420 */ 201, 199, 197, 195, 193, 28, 216, 75, 227, 228, - /* 430 */ 229, 230, 205, 232, 231, 53, 233, 241, 622, 149, - /* 440 */ 173, 62, 65, 174, 176, 175, 621, 178, 179, 180, - /* 450 */ 181, 657, 121, 120, 695, 125, 119, 122, 123, 92, - /* 460 */ 124, 651, 1, 106, 104, 737, 94, 105, 620, 109, - /* 470 */ 107, 108, 110, 111, 2, 184, 613, 183, 186, 190, - /* 480 */ 533, 55, 547, 156, 101, 57, 552, 194, 102, 5, - /* 490 */ 6, 63, 484, 593, 4, 19, 20, 15, 204, 7, - /* 500 */ 206, 481, 479, 478, 477, 475, 448, 215, 66, 45, - /* 510 */ 22, 509, 508, 69, 506, 54, 469, 467, 459, 465, - /* 520 */ 461, 463, 457, 455, 71, 483, 482, 480, 476, 474, - /* 530 */ 46, 446, 419, 417, 626, 625, 625, 625, 625, 625, - /* 540 */ 96, 625, 625, 625, 625, 625, 97, + /* 0 */ 718, 438, 717, 11, 716, 134, 618, 240, 719, 439, + /* 10 */ 721, 720, 758, 41, 43, 21, 35, 36, 153, 238, + /* 20 */ 135, 29, 135, 438, 197, 39, 37, 40, 38, 158, + /* 30 */ 827, 439, 826, 34, 33, 139, 135, 32, 31, 30, + /* 40 */ 41, 43, 747, 35, 36, 157, 827, 166, 29, 733, + /* 50 */ 103, 197, 39, 37, 40, 38, 182, 21, 103, 99, + /* 60 */ 34, 33, 755, 155, 32, 31, 30, 398, 399, 400, + /* 70 */ 401, 402, 403, 404, 405, 406, 407, 408, 409, 239, + /* 80 */ 438, 736, 41, 43, 103, 35, 36, 103, 439, 168, + /* 90 */ 29, 732, 21, 197, 39, 37, 40, 38, 32, 31, + /* 100 */ 30, 56, 34, 33, 747, 781, 32, 31, 30, 43, + /* 110 */ 185, 35, 36, 782, 823, 192, 29, 21, 154, 197, + /* 120 */ 39, 37, 40, 38, 167, 572, 733, 8, 34, 33, + /* 130 */ 61, 113, 32, 31, 30, 659, 35, 36, 126, 59, + /* 140 */ 194, 29, 58, 17, 197, 39, 37, 40, 38, 215, + /* 150 */ 26, 733, 169, 34, 33, 214, 213, 32, 31, 30, + /* 160 */ 16, 233, 208, 232, 207, 206, 205, 231, 204, 230, + /* 170 */ 229, 203, 714, 219, 703, 704, 705, 706, 707, 708, + /* 180 */ 709, 710, 711, 712, 713, 162, 585, 50, 60, 576, + /* 190 */ 175, 579, 165, 582, 234, 162, 585, 179, 178, 576, + /* 200 */ 27, 579, 734, 582, 51, 162, 585, 12, 98, 576, + /* 210 */ 736, 579, 736, 582, 228, 26, 21, 159, 160, 34, + /* 220 */ 33, 196, 836, 32, 31, 30, 148, 159, 160, 76, + /* 230 */ 822, 533, 88, 87, 142, 228, 668, 159, 160, 126, + /* 240 */ 147, 553, 554, 39, 37, 40, 38, 821, 220, 544, + /* 250 */ 733, 34, 33, 46, 501, 32, 31, 30, 517, 525, + /* 260 */ 17, 514, 151, 515, 152, 516, 184, 26, 16, 233, + /* 270 */ 140, 232, 237, 236, 95, 231, 660, 230, 229, 126, + /* 280 */ 530, 42, 217, 216, 578, 18, 581, 181, 161, 170, + /* 290 */ 171, 42, 584, 577, 150, 580, 74, 78, 83, 86, + /* 300 */ 77, 42, 584, 574, 545, 602, 80, 583, 14, 13, + /* 310 */ 141, 586, 584, 143, 507, 13, 47, 583, 46, 73, + /* 320 */ 72, 116, 117, 68, 64, 67, 3, 583, 130, 128, + /* 330 */ 91, 90, 89, 506, 201, 48, 144, 22, 22, 575, + /* 340 */ 521, 519, 522, 520, 10, 9, 85, 84, 145, 146, + /* 350 */ 137, 133, 138, 735, 136, 792, 791, 163, 788, 518, + /* 360 */ 787, 164, 757, 727, 218, 100, 749, 774, 773, 114, + /* 370 */ 26, 115, 112, 670, 202, 131, 24, 211, 667, 212, + /* 380 */ 835, 70, 834, 832, 118, 688, 25, 183, 23, 132, + /* 390 */ 657, 79, 93, 540, 655, 186, 81, 82, 653, 652, + /* 400 */ 172, 190, 127, 746, 650, 649, 648, 647, 646, 638, + /* 410 */ 129, 644, 642, 52, 640, 44, 49, 195, 761, 762, + /* 420 */ 775, 191, 193, 189, 187, 28, 210, 75, 221, 222, + /* 430 */ 223, 224, 225, 199, 226, 227, 235, 53, 616, 174, + /* 440 */ 615, 149, 62, 173, 65, 176, 177, 614, 180, 651, + /* 450 */ 607, 184, 92, 527, 645, 541, 120, 689, 121, 122, + /* 460 */ 119, 123, 125, 124, 94, 104, 1, 731, 105, 111, + /* 470 */ 108, 106, 107, 109, 110, 2, 55, 57, 101, 156, + /* 480 */ 188, 5, 546, 102, 19, 6, 587, 20, 4, 15, + /* 490 */ 63, 7, 198, 478, 200, 475, 473, 472, 471, 469, + /* 500 */ 442, 209, 66, 45, 69, 71, 22, 503, 502, 500, + /* 510 */ 54, 463, 461, 453, 459, 455, 457, 451, 449, 477, + /* 520 */ 476, 474, 470, 468, 46, 440, 96, 413, 411, 620, + /* 530 */ 619, 619, 619, 619, 619, 619, 619, 619, 619, 619, + /* 540 */ 97, }; static const YYCODETYPE yy_lookahead[] = { /* 0 */ 225, 1, 227, 258, 229, 258, 206, 207, 233, 9, @@ -278,44 +278,44 @@ static const YYCODETYPE yy_lookahead[] = { /* 140 */ 262, 21, 264, 97, 24, 25, 26, 27, 28, 241, /* 150 */ 104, 243, 126, 33, 34, 129, 130, 37, 38, 39, /* 160 */ 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, - /* 170 */ 95, 96, 225, 258, 227, 228, 229, 230, 231, 232, - /* 180 */ 233, 234, 235, 236, 237, 1, 2, 78, 72, 5, - /* 190 */ 226, 7, 226, 9, 78, 1, 2, 97, 258, 5, - /* 200 */ 209, 7, 245, 9, 104, 1, 2, 44, 244, 5, - /* 210 */ 244, 7, 213, 9, 257, 216, 209, 33, 34, 33, - /* 220 */ 34, 37, 244, 37, 38, 39, 63, 33, 34, 238, - /* 230 */ 102, 37, 69, 70, 71, 107, 213, 33, 34, 216, - /* 240 */ 77, 114, 115, 25, 26, 27, 28, 102, 241, 98, + /* 170 */ 95, 96, 225, 209, 227, 228, 229, 230, 231, 232, + /* 180 */ 233, 234, 235, 236, 237, 1, 2, 102, 245, 5, + /* 190 */ 125, 7, 226, 9, 226, 1, 2, 132, 133, 5, + /* 200 */ 257, 7, 238, 9, 119, 1, 2, 44, 97, 5, + /* 210 */ 244, 7, 244, 9, 78, 104, 209, 33, 34, 33, + /* 220 */ 34, 37, 244, 37, 38, 39, 63, 33, 34, 72, + /* 230 */ 258, 37, 69, 70, 71, 78, 213, 33, 34, 216, + /* 240 */ 77, 114, 115, 25, 26, 27, 28, 258, 241, 98, /* 250 */ 243, 33, 34, 102, 5, 37, 38, 39, 2, 98, - /* 260 */ 97, 5, 258, 7, 119, 9, 105, 104, 85, 86, - /* 270 */ 258, 88, 60, 61, 62, 92, 98, 94, 95, 125, - /* 280 */ 102, 97, 33, 34, 1, 244, 132, 124, 134, 33, + /* 260 */ 97, 5, 258, 7, 258, 9, 105, 104, 85, 86, + /* 270 */ 258, 88, 60, 61, 62, 92, 213, 94, 95, 216, + /* 280 */ 102, 97, 33, 34, 5, 107, 7, 124, 59, 33, /* 290 */ 34, 97, 108, 5, 131, 7, 64, 65, 66, 67, - /* 300 */ 68, 97, 108, 59, 98, 98, 74, 123, 102, 102, - /* 310 */ 258, 5, 108, 7, 98, 102, 258, 123, 102, 239, - /* 320 */ 37, 64, 65, 66, 67, 68, 258, 123, 64, 65, - /* 330 */ 66, 67, 68, 98, 121, 98, 5, 102, 7, 102, - /* 340 */ 258, 97, 127, 128, 127, 128, 258, 5, 258, 7, - /* 350 */ 72, 73, 258, 239, 258, 258, 258, 239, 239, 103, + /* 300 */ 68, 97, 108, 1, 98, 98, 74, 123, 102, 102, + /* 310 */ 258, 98, 108, 258, 98, 102, 102, 123, 102, 127, + /* 320 */ 128, 64, 65, 66, 67, 68, 97, 123, 64, 65, + /* 330 */ 66, 67, 68, 98, 98, 121, 258, 102, 102, 37, + /* 340 */ 5, 5, 7, 7, 127, 128, 72, 73, 258, 258, + /* 350 */ 258, 258, 258, 244, 258, 239, 239, 239, 239, 103, /* 360 */ 239, 239, 209, 240, 239, 209, 242, 265, 265, 209, /* 370 */ 104, 209, 246, 209, 209, 209, 209, 209, 209, 209, - /* 380 */ 209, 209, 209, 209, 209, 209, 209, 59, 209, 209, - /* 390 */ 209, 209, 242, 108, 209, 261, 209, 209, 209, 209, - /* 400 */ 209, 209, 209, 261, 209, 209, 209, 209, 209, 209, - /* 410 */ 209, 209, 209, 118, 255, 210, 120, 117, 210, 210, - /* 420 */ 112, 116, 111, 110, 109, 122, 75, 84, 83, 49, - /* 430 */ 80, 82, 210, 81, 53, 210, 79, 75, 5, 210, - /* 440 */ 133, 214, 214, 5, 58, 133, 5, 133, 5, 133, - /* 450 */ 58, 210, 218, 222, 224, 217, 223, 221, 219, 211, - /* 460 */ 220, 210, 215, 252, 254, 242, 211, 253, 5, 249, - /* 470 */ 251, 250, 248, 247, 212, 58, 87, 133, 125, 105, - /* 480 */ 98, 106, 98, 1, 97, 102, 98, 97, 97, 113, - /* 490 */ 113, 72, 9, 98, 97, 102, 102, 97, 99, 97, - /* 500 */ 99, 5, 5, 5, 5, 5, 76, 15, 72, 16, - /* 510 */ 102, 5, 5, 128, 98, 97, 5, 5, 5, 5, - /* 520 */ 5, 5, 5, 5, 128, 5, 5, 5, 5, 5, - /* 530 */ 102, 76, 59, 58, 0, 269, 269, 269, 269, 269, - /* 540 */ 21, 269, 269, 269, 269, 269, 21, 269, 269, 269, + /* 380 */ 209, 209, 209, 209, 209, 209, 209, 242, 209, 209, + /* 390 */ 209, 209, 59, 108, 209, 261, 209, 209, 209, 209, + /* 400 */ 209, 261, 209, 255, 209, 209, 209, 209, 209, 209, + /* 410 */ 209, 209, 209, 118, 209, 117, 120, 112, 210, 210, + /* 420 */ 210, 111, 116, 110, 109, 122, 75, 84, 83, 49, + /* 430 */ 80, 82, 53, 210, 81, 79, 75, 210, 5, 5, + /* 440 */ 5, 210, 214, 134, 214, 134, 5, 5, 125, 210, + /* 450 */ 87, 105, 211, 98, 210, 98, 222, 224, 218, 221, + /* 460 */ 223, 219, 217, 220, 211, 254, 215, 242, 253, 247, + /* 470 */ 250, 252, 251, 249, 248, 212, 106, 102, 97, 1, + /* 480 */ 97, 113, 98, 97, 102, 113, 98, 102, 97, 97, + /* 490 */ 72, 97, 99, 9, 99, 5, 5, 5, 5, 5, + /* 500 */ 76, 15, 72, 16, 128, 128, 102, 5, 5, 98, + /* 510 */ 97, 5, 5, 5, 5, 5, 5, 5, 5, 5, + /* 520 */ 5, 5, 5, 5, 102, 76, 21, 59, 58, 0, + /* 530 */ 269, 269, 269, 269, 269, 269, 269, 269, 269, 269, + /* 540 */ 21, 269, 269, 269, 269, 269, 269, 269, 269, 269, /* 550 */ 269, 269, 269, 269, 269, 269, 269, 269, 269, 269, /* 560 */ 269, 269, 269, 269, 269, 269, 269, 269, 269, 269, /* 570 */ 269, 269, 269, 269, 269, 269, 269, 269, 269, 269, @@ -335,84 +335,83 @@ static const YYCODETYPE yy_lookahead[] = { /* 710 */ 269, 269, 269, 269, 269, 269, 269, 269, 269, 269, /* 720 */ 269, 269, 269, 269, 269, 269, 269, 269, 269, 269, /* 730 */ 269, 269, 269, 269, 269, 269, 269, 269, 269, 269, - /* 740 */ 269, 269, 269, 269, 269, 269, 269, 269, 269, 269, - /* 750 */ 269, 269, + /* 740 */ 269, 269, 269, 269, 269, 269, }; -#define YY_SHIFT_COUNT (246) +#define YY_SHIFT_COUNT (240) #define YY_SHIFT_MIN (0) -#define YY_SHIFT_MAX (534) +#define YY_SHIFT_MAX (529) static const unsigned short int yy_shift_ofst[] = { /* 0 */ 163, 75, 183, 184, 204, 79, 79, 79, 79, 79, /* 10 */ 79, 0, 22, 204, 256, 256, 256, 46, 79, 79, - /* 20 */ 79, 79, 79, 116, 109, 109, 547, 194, 204, 204, + /* 20 */ 79, 79, 79, 157, 136, 136, 541, 194, 204, 204, /* 30 */ 204, 204, 204, 204, 204, 204, 204, 204, 204, 204, /* 40 */ 204, 204, 204, 204, 204, 256, 256, 249, 249, 249, - /* 50 */ 249, 249, 249, 30, 249, 100, 79, 79, 127, 127, - /* 60 */ 128, 79, 79, 79, 79, 79, 79, 79, 79, 79, + /* 50 */ 249, 249, 249, 30, 249, 111, 79, 79, 127, 127, + /* 60 */ 178, 79, 79, 79, 79, 79, 79, 79, 79, 79, /* 70 */ 79, 79, 79, 79, 79, 79, 79, 79, 79, 79, /* 80 */ 79, 79, 79, 79, 79, 79, 79, 79, 79, 79, - /* 90 */ 79, 79, 79, 79, 79, 79, 79, 79, 266, 328, - /* 100 */ 328, 285, 285, 328, 295, 296, 300, 308, 305, 311, - /* 110 */ 313, 315, 303, 266, 328, 328, 351, 351, 328, 343, - /* 120 */ 345, 380, 350, 349, 381, 352, 357, 328, 362, 328, - /* 130 */ 362, 547, 547, 27, 69, 69, 69, 95, 120, 218, + /* 90 */ 79, 79, 79, 79, 79, 79, 79, 79, 266, 333, + /* 100 */ 333, 285, 285, 333, 295, 296, 298, 305, 306, 310, + /* 110 */ 313, 315, 303, 266, 333, 333, 351, 351, 333, 343, + /* 120 */ 345, 380, 350, 349, 379, 353, 356, 333, 361, 333, + /* 130 */ 361, 541, 541, 27, 69, 69, 69, 95, 120, 218, /* 140 */ 218, 218, 232, 186, 186, 186, 186, 257, 264, 26, - /* 150 */ 154, 61, 61, 212, 161, 151, 178, 206, 207, 288, - /* 160 */ 306, 283, 244, 213, 145, 216, 235, 237, 215, 217, - /* 170 */ 331, 342, 278, 433, 307, 438, 312, 386, 441, 314, - /* 180 */ 443, 316, 392, 463, 344, 417, 389, 353, 374, 382, - /* 190 */ 375, 383, 384, 387, 482, 390, 388, 391, 393, 376, - /* 200 */ 394, 377, 395, 397, 400, 399, 402, 401, 419, 483, - /* 210 */ 496, 497, 498, 499, 500, 430, 492, 436, 493, 385, - /* 220 */ 396, 408, 506, 507, 416, 418, 408, 511, 512, 513, - /* 230 */ 514, 515, 516, 517, 518, 520, 521, 522, 523, 524, - /* 240 */ 428, 455, 519, 525, 473, 475, 534, + /* 150 */ 65, 61, 61, 212, 161, 151, 206, 207, 213, 279, + /* 160 */ 288, 302, 229, 214, 85, 216, 235, 236, 192, 217, + /* 170 */ 335, 336, 274, 433, 309, 434, 435, 311, 441, 442, + /* 180 */ 363, 323, 346, 355, 370, 375, 357, 381, 478, 383, + /* 190 */ 384, 386, 382, 368, 385, 372, 388, 391, 392, 393, + /* 200 */ 394, 395, 418, 484, 490, 491, 492, 493, 494, 424, + /* 210 */ 486, 430, 487, 376, 377, 404, 502, 503, 411, 413, + /* 220 */ 404, 506, 507, 508, 509, 510, 511, 512, 513, 514, + /* 230 */ 515, 516, 517, 518, 422, 449, 505, 519, 468, 470, + /* 240 */ 529, }; #define YY_REDUCE_COUNT (132) #define YY_REDUCE_MIN (-255) -#define YY_REDUCE_MAX (262) +#define YY_REDUCE_MAX (263) static const short yy_reduce_ofst[] = { /* 0 */ -200, -53, -225, -238, -222, -151, -122, -194, -117, -92, - /* 10 */ 7, -197, -190, -236, -163, -36, -34, -138, -150, -159, - /* 20 */ -125, -9, -152, -78, -1, 23, -43, -255, -253, -223, - /* 30 */ -144, -85, -60, 4, 12, 52, 58, 68, 82, 88, - /* 40 */ 90, 94, 96, 97, 98, -22, 41, 80, 114, 118, + /* 10 */ 7, -197, -190, -236, -163, -34, -32, -138, -150, -159, + /* 20 */ -125, -36, -152, -78, 23, 63, -57, -255, -253, -223, + /* 30 */ -144, -28, -11, 4, 6, 12, 52, 55, 78, 90, + /* 40 */ 91, 92, 93, 94, 96, -22, 109, 116, 117, 118, /* 50 */ 119, 121, 122, 123, 125, 124, 153, 156, 102, 103, /* 60 */ 126, 160, 162, 164, 165, 166, 167, 168, 169, 170, /* 70 */ 171, 172, 173, 174, 175, 176, 177, 179, 180, 181, - /* 80 */ 182, 185, 187, 188, 189, 190, 191, 192, 193, 195, - /* 90 */ 196, 197, 198, 199, 200, 201, 202, 203, 150, 205, - /* 100 */ 208, 134, 142, 209, 159, 210, 214, 211, 219, 221, - /* 110 */ 220, 224, 226, 223, 222, 225, 227, 228, 229, 230, - /* 120 */ 233, 231, 234, 236, 239, 240, 238, 241, 248, 251, - /* 130 */ 255, 247, 262, + /* 80 */ 182, 185, 187, 188, 189, 190, 191, 193, 195, 196, + /* 90 */ 197, 198, 199, 200, 201, 202, 203, 205, 145, 208, + /* 100 */ 209, 134, 140, 210, 148, 211, 215, 219, 221, 220, + /* 110 */ 224, 226, 222, 225, 223, 227, 228, 230, 231, 233, + /* 120 */ 237, 234, 240, 238, 242, 243, 245, 239, 241, 244, + /* 130 */ 253, 251, 263, }; static const YYACTIONTYPE yy_default[] = { - /* 0 */ 623, 675, 664, 835, 835, 623, 623, 623, 623, 623, - /* 10 */ 623, 765, 641, 835, 623, 623, 623, 623, 623, 623, - /* 20 */ 623, 623, 623, 677, 677, 677, 760, 623, 623, 623, - /* 30 */ 623, 623, 623, 623, 623, 623, 623, 623, 623, 623, - /* 40 */ 623, 623, 623, 623, 623, 623, 623, 623, 623, 623, - /* 50 */ 623, 623, 623, 623, 623, 623, 623, 623, 784, 784, - /* 60 */ 758, 623, 623, 623, 623, 623, 623, 623, 623, 623, - /* 70 */ 623, 623, 623, 623, 623, 623, 623, 623, 623, 662, - /* 80 */ 623, 660, 623, 623, 623, 623, 623, 623, 623, 623, - /* 90 */ 623, 623, 623, 623, 623, 649, 623, 623, 623, 643, - /* 100 */ 643, 623, 623, 643, 791, 795, 789, 777, 785, 776, - /* 110 */ 772, 771, 799, 623, 643, 643, 672, 672, 643, 693, - /* 120 */ 691, 689, 681, 687, 683, 685, 679, 643, 670, 643, - /* 130 */ 670, 708, 721, 623, 800, 834, 790, 818, 817, 830, - /* 140 */ 824, 823, 623, 822, 821, 820, 819, 623, 623, 623, - /* 150 */ 623, 826, 825, 623, 623, 623, 623, 623, 623, 623, - /* 160 */ 623, 623, 802, 796, 792, 623, 623, 623, 623, 623, - /* 170 */ 623, 623, 623, 623, 623, 623, 623, 623, 623, 623, - /* 180 */ 623, 623, 623, 623, 623, 623, 623, 623, 757, 623, - /* 190 */ 623, 766, 623, 623, 623, 623, 623, 623, 786, 623, - /* 200 */ 778, 623, 623, 623, 623, 623, 623, 734, 623, 623, - /* 210 */ 623, 623, 623, 623, 623, 623, 623, 623, 623, 623, - /* 220 */ 623, 839, 623, 623, 623, 728, 837, 623, 623, 623, - /* 230 */ 623, 623, 623, 623, 623, 623, 623, 623, 623, 623, - /* 240 */ 696, 623, 647, 645, 623, 639, 623, + /* 0 */ 617, 669, 658, 829, 829, 617, 617, 617, 617, 617, + /* 10 */ 617, 759, 635, 829, 617, 617, 617, 617, 617, 617, + /* 20 */ 617, 617, 617, 671, 671, 671, 754, 617, 617, 617, + /* 30 */ 617, 617, 617, 617, 617, 617, 617, 617, 617, 617, + /* 40 */ 617, 617, 617, 617, 617, 617, 617, 617, 617, 617, + /* 50 */ 617, 617, 617, 617, 617, 617, 617, 617, 778, 778, + /* 60 */ 752, 617, 617, 617, 617, 617, 617, 617, 617, 617, + /* 70 */ 617, 617, 617, 617, 617, 617, 617, 617, 617, 656, + /* 80 */ 617, 654, 617, 617, 617, 617, 617, 617, 617, 617, + /* 90 */ 617, 617, 617, 617, 617, 643, 617, 617, 617, 637, + /* 100 */ 637, 617, 617, 637, 785, 789, 783, 771, 779, 770, + /* 110 */ 766, 765, 793, 617, 637, 637, 666, 666, 637, 687, + /* 120 */ 685, 683, 675, 681, 677, 679, 673, 637, 664, 637, + /* 130 */ 664, 702, 715, 617, 794, 828, 784, 812, 811, 824, + /* 140 */ 818, 817, 617, 816, 815, 814, 813, 617, 617, 617, + /* 150 */ 617, 820, 819, 617, 617, 617, 617, 617, 617, 617, + /* 160 */ 617, 617, 796, 790, 786, 617, 617, 617, 617, 617, + /* 170 */ 617, 617, 617, 617, 617, 617, 617, 617, 617, 617, + /* 180 */ 617, 617, 751, 617, 617, 760, 617, 617, 617, 617, + /* 190 */ 617, 617, 780, 617, 772, 617, 617, 617, 617, 617, + /* 200 */ 617, 728, 617, 617, 617, 617, 617, 617, 617, 617, + /* 210 */ 617, 617, 617, 617, 617, 833, 617, 617, 617, 722, + /* 220 */ 831, 617, 617, 617, 617, 617, 617, 617, 617, 617, + /* 230 */ 617, 617, 617, 617, 690, 617, 641, 639, 617, 633, + /* 240 */ 617, }; /********** End of lemon-generated parsing tables *****************************/ @@ -565,8 +564,8 @@ static const YYCODETYPE yyFallback[] = { 0, /* SET => nothing */ 0, /* KILL => nothing */ 0, /* CONNECTION => nothing */ - 0, /* COLON => nothing */ 0, /* STREAM => nothing */ + 0, /* COLON => nothing */ 1, /* ABORT => ID */ 1, /* AFTER => ID */ 1, /* ATTACH => ID */ @@ -856,8 +855,8 @@ static const char *const yyTokenName[] = { /* 130 */ "SET", /* 131 */ "KILL", /* 132 */ "CONNECTION", - /* 133 */ "COLON", - /* 134 */ "STREAM", + /* 133 */ "STREAM", + /* 134 */ "COLON", /* 135 */ "ABORT", /* 136 */ "AFTER", /* 137 */ "ATTACH", @@ -1216,9 +1215,9 @@ static const char *const yyRuleName[] = { /* 214 */ "cmd ::= ALTER TABLE ids cpxName DROP TAG ids", /* 215 */ "cmd ::= ALTER TABLE ids cpxName CHANGE TAG ids ids", /* 216 */ "cmd ::= ALTER TABLE ids cpxName SET TAG ids EQ tagitem", - /* 217 */ "cmd ::= KILL CONNECTION IPTOKEN COLON INTEGER", - /* 218 */ "cmd ::= KILL STREAM IPTOKEN COLON INTEGER COLON INTEGER", - /* 219 */ "cmd ::= KILL QUERY IPTOKEN COLON INTEGER COLON INTEGER", + /* 217 */ "cmd ::= KILL CONNECTION INTEGER", + /* 218 */ "cmd ::= KILL STREAM INTEGER COLON INTEGER", + /* 219 */ "cmd ::= KILL QUERY INTEGER COLON INTEGER", }; #endif /* NDEBUG */ @@ -1893,9 +1892,9 @@ static const struct { { 207, -7 }, /* (214) cmd ::= ALTER TABLE ids cpxName DROP TAG ids */ { 207, -8 }, /* (215) cmd ::= ALTER TABLE ids cpxName CHANGE TAG ids ids */ { 207, -9 }, /* (216) cmd ::= ALTER TABLE ids cpxName SET TAG ids EQ tagitem */ - { 207, -5 }, /* (217) cmd ::= KILL CONNECTION IPTOKEN COLON INTEGER */ - { 207, -7 }, /* (218) cmd ::= KILL STREAM IPTOKEN COLON INTEGER COLON INTEGER */ - { 207, -7 }, /* (219) cmd ::= KILL QUERY IPTOKEN COLON INTEGER COLON INTEGER */ + { 207, -3 }, /* (217) cmd ::= KILL CONNECTION INTEGER */ + { 207, -5 }, /* (218) cmd ::= KILL STREAM INTEGER COLON INTEGER */ + { 207, -5 }, /* (219) cmd ::= KILL QUERY INTEGER COLON INTEGER */ }; static void yy_accept(yyParser*); /* Forward Declaration */ @@ -2734,14 +2733,14 @@ static void yy_reduce( setSQLInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); } break; - case 217: /* cmd ::= KILL CONNECTION IPTOKEN COLON INTEGER */ -{yymsp[-2].minor.yy0.n += (yymsp[-1].minor.yy0.n + yymsp[0].minor.yy0.n); setKillSQL(pInfo, TSDB_SQL_KILL_CONNECTION, &yymsp[-2].minor.yy0);} + case 217: /* cmd ::= KILL CONNECTION INTEGER */ +{setKillSQL(pInfo, TSDB_SQL_KILL_CONNECTION, &yymsp[0].minor.yy0);} break; - case 218: /* cmd ::= KILL STREAM IPTOKEN COLON INTEGER COLON INTEGER */ -{yymsp[-4].minor.yy0.n += (yymsp[-3].minor.yy0.n + yymsp[-2].minor.yy0.n + yymsp[-1].minor.yy0.n + yymsp[0].minor.yy0.n); setKillSQL(pInfo, TSDB_SQL_KILL_STREAM, &yymsp[-4].minor.yy0);} + case 218: /* cmd ::= KILL STREAM INTEGER COLON INTEGER */ +{yymsp[-2].minor.yy0.n += (yymsp[-1].minor.yy0.n + yymsp[0].minor.yy0.n); setKillSQL(pInfo, TSDB_SQL_KILL_STREAM, &yymsp[-2].minor.yy0);} break; - case 219: /* cmd ::= KILL QUERY IPTOKEN COLON INTEGER COLON INTEGER */ -{yymsp[-4].minor.yy0.n += (yymsp[-3].minor.yy0.n + yymsp[-2].minor.yy0.n + yymsp[-1].minor.yy0.n + yymsp[0].minor.yy0.n); setKillSQL(pInfo, TSDB_SQL_KILL_QUERY, &yymsp[-4].minor.yy0);} + case 219: /* cmd ::= KILL QUERY INTEGER COLON INTEGER */ +{yymsp[-2].minor.yy0.n += (yymsp[-1].minor.yy0.n + yymsp[0].minor.yy0.n); setKillSQL(pInfo, TSDB_SQL_KILL_QUERY, &yymsp[-2].minor.yy0);} break; default: break; diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index fd317e9d0cebf016652952d9eb370d5fbad95143..eab70b591349cacfc382c206e393a4846283a559 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -977,7 +977,8 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa // tdResetDataCols(pHelper->pDataCols[1]); while (true) { if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break; - tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5); + tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows, + pDataCols, &iter2, rowsWritten, pHelper->config.maxRowsPerFileBlock * 4 / 5); ASSERT(pHelper->pDataCols[1]->numOfRows > 0); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) @@ -989,54 +990,6 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa } round++; blkIdx++; - // TODO: the blkIdx here is not correct - - // if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) { - // if (pHelper->pDataCols[1]->numOfRows > 0) { - // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], - // pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) - // goto _err; - // // TODO: the blkIdx here is not correct - // tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfRows); - // } - // } - - // TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfRows - // ? INT64_MAX - // : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1]; - // TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2]; - - // if (key1 < key2) { - // for (int i = 0; i < pDataCols->numOfCols; i++) { - // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; - // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows), - // ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1), - // TYPE_BYTES[pDataCol->type]); - // } - // pHelper->pDataCols[1]->numOfRows++; - // iter1++; - // } else if (key1 == key2) { - // // TODO: think about duplicate key cases - // ASSERT(false); - // } else { - // for (int i = 0; i < pDataCols->numOfCols; i++) { - // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; - // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows), - // ((char *)pDataCols->cols[i].pData + - // TYPE_BYTES[pDataCol->type] * iter2), - // TYPE_BYTES[pDataCol->type]); - // } - // pHelper->pDataCols[1]->numOfRows++; - // iter2++; - // } - - // if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.maxRowsPerFileBlock * 4 / 5) { - // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) goto _err; - // // TODO: blkIdx here is not correct, fix it - // tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); - - // tdResetDataCols(pHelper->pDataCols[1]); - // } } } } diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index 6a22bdea6a4abd5423de0495894633661eec7057..2369f63f16fefa3bfd49d0b7837eab011e17c798 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -111,6 +111,15 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz */ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key); +/** + * update the expire time of data in cache + * @param pCacheObj cache object + * @param key key + * @param expireTime new expire time of data + * @return + */ +void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime); + /** * Add one reference count for the exist data, and assign this data for a new owner. * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore. diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 82873f9906affc26fc995a4dc326521911e7f008..b0291b5cc0e9e3bf99ec2825582e7786629f2c6f 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -488,6 +488,35 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) { return (ptNode != NULL) ? (*ptNode)->data : NULL; } +void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime) { + if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { + return NULL; + } + + uint32_t keyLen = (uint32_t)strlen(key); + + __cache_rd_lock(pCacheObj); + + SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); + if (ptNode != NULL) { + T_REF_INC(*ptNode); + (*ptNode)->expiredTime = expireTime; + } + + __cache_unlock(pCacheObj); + + if (ptNode != NULL) { + atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); + uTrace("key:%s expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); + } else { + atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); + uTrace("key:%s not in cache, retrieved failed", key); + } + + atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); + return (ptNode != NULL) ? (*ptNode)->data : NULL; +} + void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { if (pCacheObj == NULL || data == NULL) return NULL; diff --git a/tests/pytest/query/filter.py b/tests/pytest/query/filter.py index 43c8e5bf0aa612079b26fdf9edc9d303023a1405..f107985f150a05cdd38a7fa1453d59d35c8a903a 100644 --- a/tests/pytest/query/filter.py +++ b/tests/pytest/query/filter.py @@ -87,7 +87,7 @@ class TDTestCase: # <> for timestamp type tdSql.query("select * from db.st where ts <> '2020-05-13 10:00:00.002'") - #tdSql.checkRows(4) + # tdSql.checkRows(4) # <> for numeric type tdSql.query("select * from db.st where tagtype <> 2") @@ -96,7 +96,7 @@ class TDTestCase: # <> for nchar type tdSql.query("select * from db.st where name <> 'first'") tdSql.checkRows(4) - + # % for nchar type tdSql.query("select * from db.st where name like 'fi%'") tdSql.checkRows(2) diff --git a/tests/pytest/query/filterCombo.py b/tests/pytest/query/filterCombo.py index c9c7ade73eece9a33ae99177dc680463e22d2701..f72b913c92fa1d967aac1e15e050bda23fc7342d 100644 --- a/tests/pytest/query/filterCombo.py +++ b/tests/pytest/query/filterCombo.py @@ -42,14 +42,17 @@ class TDTestCase: ('2020-05-13 10:00:00.005', 3, 'third')""") # query with filter condition A OR condition B - tdSql.query("select * from db.st where ts > '2020-05-13 10:00:00.002' AND tagtype < 2") + tdSql.query( + "select * from db.st where ts > '2020-05-13 10:00:00.002' AND tagtype < 2") tdSql.checkRows(1) # query with filter condition A OR condition B, error expected - tdSql.error("select * from db.st where ts > '2020-05-13 10:00:00.002' OR tagtype < 2") + tdSql.error( + "select * from db.st where ts > '2020-05-13 10:00:00.002' OR tagtype < 2") # illegal condition - tdSql.error("select * from db.st where ts != '2020-05-13 10:00:00.002' OR tagtype < 2") + tdSql.error( + "select * from db.st where ts != '2020-05-13 10:00:00.002' OR tagtype < 2") tdSql.error("select * from db.st where tagtype <> 1 OR tagtype < 2") def stop(self): diff --git a/tests/pytest/query/queryError.py b/tests/pytest/query/queryError.py index 95924f48cc062b3756ab5e80fb72b45f707f8939..f1fd9c0dec480d8d4e0b49635fd23b275eeeaa24 100644 --- a/tests/pytest/query/queryError.py +++ b/tests/pytest/query/queryError.py @@ -41,18 +41,17 @@ class TDTestCase: ('2020-05-13 10:00:00.002', 3, 'third') dev_002 VALUES('2020-05-13 10:00:00.003', 1, 'first'), ('2020-05-13 10:00:00.004', 2, 'second'), ('2020-05-13 10:00:00.005', 3, 'third')""") - # query first .. as .. - tdSql.error("select first(*) as one from st") + tdSql.error("select first(*) as one from st") # query last .. as .. - tdSql.error("select last(*) as latest from st") + tdSql.error("select last(*) as latest from st") # query last row .. as .. - tdSql.error("select last_row as latest from st") + tdSql.error("select last_row as latest from st") # query distinct on normal colnum - tdSql.error("select distinct tagtype from st") + tdSql.error("select distinct tagtype from st") # query .. order by non-time field tdSql.error("select * from st order by name") diff --git a/tests/pytest/query/queryNormal.py b/tests/pytest/query/queryNormal.py index 442661f72a4510e9d311ee38bbc16a204bf71670..814c627d89790dd730d1e86d9101a612095678be 100644 --- a/tests/pytest/query/queryNormal.py +++ b/tests/pytest/query/queryNormal.py @@ -28,17 +28,20 @@ class TDTestCase: print("==============step1") - tdSql.execute("create table stb1 (ts timestamp, c1 int, c2 float) tags(t1 int, t2 binary(10), t3 nchar(10))") - tdSql.execute("insert into tb1 using stb1 tags(1,'tb1', '表1') values ('2020-04-18 15:00:00.000', 1, 0.1), ('2020-04-18 15:00:01.000', 2, 0.1)") - tdSql.execute("insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)") - - # inner join --- bug + tdSql.execute( + "create table stb1 (ts timestamp, c1 int, c2 float) tags(t1 int, t2 binary(10), t3 nchar(10))") + tdSql.execute( + "insert into tb1 using stb1 tags(1,'tb1', '表1') values ('2020-04-18 15:00:00.000', 1, 0.1), ('2020-04-18 15:00:01.000', 2, 0.1)") + tdSql.execute( + "insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)") + + # inner join --- bug tdSql.query("select * from tb1 a, tb2 b where a.ts = b.ts") tdSql.checkRows(1) # join 3 tables -- bug exists tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_p.id, stb_p.dscrption, stb_p.pressure,stb_v.velocity from stb_p, stb_t, stb_v where stb_p.ts=stb_t.ts and stb_p.ts=stb_v.ts and stb_p.id = stb_t.id") - + # query show stable tdSql.query("show stables") tdSql.checkRows(1) @@ -51,15 +54,15 @@ class TDTestCase: tdSql.query("select count(*) from stb1") tdSql.checkData(0, 0, 4) - # query first + # query first tdSql.query("select first(*) from stb1") tdSql.checkData(0, 1, 1) - - # query last + + # query last tdSql.query("select last(*) from stb1") tdSql.checkData(0, 1, 4) - # query last_row + # query last_row tdSql.query("select last_row(*) from stb1") tdSql.checkData(0, 1, 4) @@ -70,7 +73,7 @@ class TDTestCase: # query first ... as tdSql.query("select first(*) as begin from stb1") tdSql.checkData(0, 1, 1) - + # query last ... as tdSql.query("select last(*) as end from stb1") tdSql.checkData(0, 1, 4) @@ -93,7 +96,7 @@ class TDTestCase: # query ... alias for table ---- bug tdSql.query("select t.ts from tb1 t") - tdSql.checkRows(2) + tdSql.checkRows(2) # query ... tbname tdSql.query("select tbname from stb1") diff --git a/tests/pytest/random-test/random-test-multi-threading-3.py b/tests/pytest/random-test/random-test-multi-threading-3.py index c7b39ca1a13eb5b288f29c333307d1b5f12e671b..4d1ef3b11d90cb8cbb72094b8f6a7d9ae091bc59 100644 --- a/tests/pytest/random-test/random-test-multi-threading-3.py +++ b/tests/pytest/random-test/random-test-multi-threading-3.py @@ -111,7 +111,6 @@ class Test (threading.Thread): last_tb) written = written + 1 - def drop_stable(self): tdLog.info("drop_stable") global last_stb @@ -152,7 +151,6 @@ class Test (threading.Thread): last_tb = "" written = 0 - def query_data_from_stable(self): tdLog.info("query_data_from_stable") global last_stb @@ -164,7 +162,6 @@ class Test (threading.Thread): tdLog.info("will query data from super table") tdSql.execute('select * from %s' % last_stb) - def reset_query_cache(self): tdLog.info("reset_query_cache") global last_tb @@ -232,7 +229,7 @@ class Test (threading.Thread): self.threadLock.acquire() tdLog.notice("first thread") randDataOp = random.randint(1, 3) - dataOp.get(randDataOp , lambda: "ERROR")() + dataOp.get(randDataOp, lambda: "ERROR")() self.threadLock.release() elif (self.threadId == 2): diff --git a/tests/pytest/random-test/random-test-multi-threading.py b/tests/pytest/random-test/random-test-multi-threading.py index d887b714ba193a23daf9c3a8ca41edb780dfd7e5..1c06f3a1ddd5bb527bd79fb8e15d003e576a466e 100644 --- a/tests/pytest/random-test/random-test-multi-threading.py +++ b/tests/pytest/random-test/random-test-multi-threading.py @@ -111,7 +111,6 @@ class Test (threading.Thread): last_tb) written = written + 1 - def drop_stable(self): tdLog.info("drop_stable") global last_stb @@ -154,7 +153,6 @@ class Test (threading.Thread): last_tb = "" written = 0 - def query_data_from_stable(self): tdLog.info("query_data_from_stable") global last_stb @@ -166,7 +164,6 @@ class Test (threading.Thread): tdLog.info("will query data from super table") tdSql.execute('select * from %s' % last_stb) - def reset_query_cache(self): tdLog.info("reset_query_cache") global last_tb @@ -230,7 +227,7 @@ class Test (threading.Thread): self.threadLock.acquire() tdLog.notice("first thread") randDataOp = random.randint(1, 3) - dataOp.get(randDataOp , lambda: "ERROR")() + dataOp.get(randDataOp, lambda: "ERROR")() self.threadLock.release() elif (self.threadId == 2): diff --git a/tests/pytest/random-test/random-test.py b/tests/pytest/random-test/random-test.py index a8ff4d81d121c7fad298357b74dd1658894becbf..855cabdeddf5c437a658f145ce2fa66415b001a3 100644 --- a/tests/pytest/random-test/random-test.py +++ b/tests/pytest/random-test/random-test.py @@ -112,7 +112,6 @@ class Test: tdSql.execute('drop table %s' % self.last_stb) self.last_stb = "" - def query_data_from_stable(self): tdLog.info("query_data_from_stable") if (self.last_stb == ""): @@ -122,20 +121,21 @@ class Test: tdLog.info("will query data from super table") tdSql.execute('select * from %s' % self.last_stb) - def restart_database(self): tdLog.info("restart_databae") tdDnodes.stop(1) tdDnodes.start(1) tdLog.sleep(5) - def force_restart_database(self): tdLog.info("force_restart_database") tdDnodes.forcestop(1) tdDnodes.start(1) tdLog.sleep(5) tdSql.prepare() + self.last_tb = "" + self.last_stb = "" + self.written = 0 def drop_table(self): tdLog.info("drop_table") @@ -159,6 +159,9 @@ class Test: tdDnodes.start(1) tdLog.sleep(5) tdSql.prepare() + self.last_tb = "" + self.last_stb = "" + self.written = 0 def delete_datafiles(self): tdLog.info("delete_datafiles") @@ -173,6 +176,9 @@ class Test: tdDnodes.start(1) tdLog.sleep(10) tdSql.prepare() + self.last_tb = "" + self.last_stb = "" + self.written = 0 class TDTestCase: diff --git a/tests/script/unique/arbitrator/dn3_mn1_replica_change.sim b/tests/script/unique/arbitrator/dn3_mn1_replica_change.sim index f34322a25561ab54e942c000157a9d90664b8197..8b75bba29b8817223c6ef4ba959f8a2359a3e8c5 100644 --- a/tests/script/unique/arbitrator/dn3_mn1_replica_change.sim +++ b/tests/script/unique/arbitrator/dn3_mn1_replica_change.sim @@ -4,9 +4,9 @@ system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode4 -i 4 -system sh/cfg.sh -n dnode1 -c numOfMPeers -v 1 -system sh/cfg.sh -n dnode2 -c numOfMPeers -v 1 -system sh/cfg.sh -n dnode3 -c numOfMPeers -v 1 +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1 system sh/cfg.sh -n dnode1 -c walLevel -v 2 system sh/cfg.sh -n dnode2 -c walLevel -v 2 @@ -28,11 +28,6 @@ system sh/cfg.sh -n dnode2 -c alternativeRole -v 2 system sh/cfg.sh -n dnode3 -c alternativeRole -v 2 system sh/cfg.sh -n dnode4 -c alternativeRole -v 2 -system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4 -system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 -system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 -system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 - system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4 system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4 system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4 @@ -94,6 +89,72 @@ if $data00 != $totalRows then return -1 endi + + + +print ============== step2-1: start dnode2 for falling disc, then restart dnode2, and check rows +system sh/exec.sh -n dnode2 -s stop +sleep $sleepTimer +wait_dnode2_offline_0: +sql show dnodes +if $rows != 2 then + sleep 2000 + goto wait_dnode2_offline_0 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +#print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5 +#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6 +#$dnode1Status = $data4_1 +$dnode2Status = $data4_2 +#$dnode3Status = $data4_3 +#$dnode4Status = $data4_4 +#$dnode5Status = $data4_5 + +if $dnode2Status != offline then + sleep 2000 + goto wait_dnode2_offline_0 +endi + +system sh/exec.sh -n dnode2 -s start +sleep $sleepTimer +wait_dnode2_reready: +sql show dnodes +if $rows != 2 then + sleep 2000 + goto wait_dnode2_reready +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +#print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +#print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5 +#print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6 +#$dnode1Status = $data4_1 +$dnode2Status = $data4_2 +#$dnode3Status = $data4_3 +#$dnode4Status = $data4_4 +#$dnode5Status = $data4_5 + +if $dnode2Status != ready then + sleep 2000 + goto wait_dnode2_reready +endi + +sql select count(*) from $stb +sleep 1000 +print data00 $data00 +if $data00 != $totalRows then + return -1 +endi + + + + + + print ============== step3: start dnode3 and add into cluster , then alter replica from 1 to 2, and waiting sync system sh/exec.sh -n dnode3 -s start sql create dnode $hostname3