diff --git a/include/common/systable.h b/include/common/systable.h index 8b0bb4a3fba107e1d74bee6885c39ae06d425a19..d2c28941c77ae92c7a52c6e9df89de1650ad4360 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -52,6 +52,7 @@ extern "C" { #define TSDB_PERFS_TABLE_OFFSETS "offsets" #define TSDB_PERFS_TABLE_TRANS "trans" #define TSDB_PERFS_TABLE_STREAMS "streams" +#define TSDB_PERFS_TABLE_APPS "apps" typedef struct SSysDbTableSchema { const char* name; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 80e8d291970cac19eb74b51a507d937848305d61..81123ea3d977112cd1fb7c12bda77e8dbd2ddd2f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1982,16 +1982,17 @@ typedef struct { typedef struct { int64_t tid; - int32_t status; + char status[TSDB_JOB_STATUS_LEN]; } SQuerySubDesc; typedef struct { char sql[TSDB_SHOW_SQL_LEN]; uint64_t queryId; int64_t useconds; - int64_t stime; + int64_t stime; // timestamp precision ms int64_t reqRid; int32_t pid; + bool stableQuery; char fqdn[TSDB_FQDN_LEN]; int32_t subPlanNum; SArray* subDesc; // SArray diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 33cc4a9c865241d39ca2b94317f93ffff4694581..42c7c2795131eff4c5df174ee8f88676bd6c0d52 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -352,6 +352,7 @@ typedef struct SQuery { SArray* pPlaceholderValues; SNode* pPrepareRoot; struct SParseMetaCache* pMetaCache; + bool stableQuery; } SQuery; void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); diff --git a/include/util/tdef.h b/include/util/tdef.h index 1fa8ad19126f2fe69b4500eac2d9dfdd2665ba17..73b40c71d85827f6634943b2d8d397c066a6961f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -241,6 +241,7 @@ typedef enum ELogicConditionType { #define TSDB_USET_PASSWORD_LEN 129 #define TSDB_VERSION_LEN 12 #define TSDB_LABEL_LEN 8 +#define TSDB_JOB_STATUS_LEN 32 #define TSDB_CLUSTER_ID_LEN 40 #define TSDB_FQDN_LEN 128 diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index d72aa39e4c76d1f6397df2d98f2a02bb8938456b..69a24e20cd7e4a6d20efc9c12f1766f06af050ae 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -45,7 +45,7 @@ extern "C" { #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms -#define SYNC_ON_TOP_OF_ASYNC 0 +#define SYNC_ON_TOP_OF_ASYNC 1 enum { RES_TYPE__QUERY = 1, @@ -213,6 +213,7 @@ typedef struct SRequestObj { SArray* tableList; SQueryExecMetric metric; SRequestSendRecvBody body; + bool stableQuery; uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog uint32_t retry; @@ -294,7 +295,7 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); -TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, +STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port, int connType); SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen); @@ -305,6 +306,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); +void taos_close_internal(void *taos); + // --- heartbeat // global, called by mgmt int hbMgrInit(); diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 936fb92fc4019842485e7051abf161aee8a7d858..c2b5d1de6f9427c1c1aba506ceb9d446a883ce81 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -107,7 +107,7 @@ typedef struct STscStmt { #define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -TAOS_STMT *stmtInit(TAOS *taos); +TAOS_STMT *stmtInit(STscObj* taos); int stmtClose(TAOS_STMT *stmt); int stmtExec(TAOS_STMT *stmt); const char *stmtErrstr(TAOS_STMT *stmt); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index e1118833816bb405bdc123d15ed53b6b7eb9b29d..3ea40627a185d4a3ae92179f3ec1854c6c1ee6ee 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -174,7 +174,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { } if (pRsp->query->killConnection) { - taos_close(pTscObj); + taos_close_internal(pTscObj); } if (pRsp->query->pQnodeList) { @@ -310,11 +310,12 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { } tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql)); - desc.stime = pRequest->metric.start; + desc.stime = pRequest->metric.start / 1000; desc.queryId = pRequest->requestId; desc.useconds = now - pRequest->metric.start; desc.reqRid = pRequest->self; desc.pid = hbBasic->pid; + desc.stableQuery = pRequest->stableQuery; taosGetFqdn(desc.fqdn); desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0; @@ -329,6 +330,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { if (code) { taosArrayDestroy(desc.subDesc); desc.subDesc = NULL; + desc.subPlanNum = 0; } } else { desc.subDesc = NULL; @@ -350,19 +352,24 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_QRY_APP_ERROR; } - int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0; - if (numOfQueries <= 0) { - releaseTscObj(connKey->tscRid); - tscDebug("no queries on connection"); - return TSDB_CODE_QRY_APP_ERROR; - } - SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic)); if (NULL == hbBasic) { tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic)); releaseTscObj(connKey->tscRid); return TSDB_CODE_QRY_OUT_OF_MEMORY; } + + hbBasic->connId = pTscObj->connId; + hbBasic->pid = taosGetPId(); + taosGetAppName(hbBasic->app, NULL); + + int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0; + if (numOfQueries <= 0) { + req->query = hbBasic; + releaseTscObj(connKey->tscRid); + tscDebug("no queries on connection"); + return TSDB_CODE_SUCCESS; + } hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc)); if (NULL == hbBasic->queryDesc) { @@ -372,9 +379,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - hbBasic->connId = pTscObj->connId; - hbBasic->pid = taosGetPId(); - taosGetAppName(hbBasic->app, NULL); int32_t code = hbBuildQueryDesc(hbBasic, pTscObj); if (code) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 216aa5fbd01f517f4e1f0ec9f38440827e4be43c..949aa22cd1d0d56c2cd4a971385a5e88ac024393 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -58,7 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType); -TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, +STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port, int connType) { if (taos_init() != TSDB_CODE_SUCCESS) { return NULL; @@ -697,6 +697,8 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { return pRequest; } + pRequest->stableQuery = pQuery->stableQuery; + return launchQueryImpl(pRequest, pQuery, false, NULL); } @@ -923,7 +925,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t terrno = pRequest->code; destroyRequest(pRequest); - taos_close(pTscObj); + taos_close_internal(pTscObj); pTscObj = NULL; } else { tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, @@ -958,8 +960,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { taosMemoryFreeClear(db); connectReq.connType = pObj->connType; - connectReq.pid = htonl(appInfo.pid); - connectReq.startTime = htobe64(appInfo.startTime); + connectReq.pid = appInfo.pid; + connectReq.startTime = appInfo.startTime; tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user)); @@ -1088,7 +1090,12 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons return NULL; } - return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY); + STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY); + if (pObj) { + return (TAOS*)pObj->id; + } + + return (TAOS*)0; } TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e3fafa0a8216db542ca3bc3d329f68e40af5206a..5f0dbdd41556d3347d963e29e57bf83d30ce8bee 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -93,10 +93,15 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha pass = TSDB_DEFAULT_PASS; } - return taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); + STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); + if (pObj) { + return (TAOS*)pObj->id; + } + + return (TAOS*)0; } -void taos_close(TAOS *taos) { +void taos_close_internal(void *taos) { if (taos == NULL) { return; } @@ -107,6 +112,17 @@ void taos_close(TAOS *taos) { taosRemoveRef(clientConnRefPool, pTscObj->id); } +void taos_close(TAOS *taos) { + STscObj* pObj = acquireTscObj((int64_t)taos); + if (NULL == pObj) { + return; + } + + taos_close_internal(pObj); + releaseTscObj((int64_t)taos); +} + + int taos_errno(TAOS_RES *tres) { if (tres == NULL) { return terrno; @@ -186,29 +202,36 @@ static void syncQueryFn(void* param, void* res, int32_t code) { } TAOS_RES *taos_query(TAOS *taos, const char *sql) { - if (taos == NULL || sql == NULL) { + STscObj* pTscObj = acquireTscObj((int64_t)taos); + if (pTscObj == NULL || sql == NULL) { + terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - STscObj* pTscObj = (STscObj*)taos; - #if SYNC_ON_TOP_OF_ASYNC SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); tsem_init(¶m->sem, 0, 0); - taos_query_a(pTscObj, sql, syncQueryFn, param); + taos_query_a(taos, sql, syncQueryFn, param); tsem_wait(¶m->sem); + releaseTscObj((int64_t)taos); + return param->pRequest; #else size_t sqlLen = strlen(sql); if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { + releaseTscObj((int64_t)taos); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; return NULL; } - return execQuery(pTscObj, sql, sqlLen); + TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen); + + releaseTscObj((int64_t)taos); + + return pRes; #endif } @@ -425,13 +448,15 @@ int taos_result_precision(TAOS_RES *res) { } int taos_select_db(TAOS *taos, const char *db) { - STscObj *pObj = (STscObj *)taos; + STscObj* pObj = acquireTscObj((int64_t)taos); if (pObj == NULL) { + releaseTscObj((int64_t)taos); terrno = TSDB_CODE_TSC_DISCONNECTED; return TSDB_CODE_TSC_DISCONNECTED; } if (db == NULL || strlen(db) == 0) { + releaseTscObj((int64_t)taos); terrno = TSDB_CODE_TSC_INVALID_INPUT; return terrno; } @@ -443,6 +468,7 @@ int taos_select_db(TAOS *taos, const char *db) { int32_t code = taos_errno(pRequest); taos_free_result(pRequest); + releaseTscObj((int64_t)taos); return code; } @@ -589,19 +615,26 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { int taos_validate_sql(TAOS *taos, const char *sql) { return true; } void taos_reset_current_db(TAOS *taos) { - if (taos == NULL) { + STscObj* pTscObj = acquireTscObj((int64_t)taos); + if (pTscObj == NULL) { + terrno = TSDB_CODE_TSC_DISCONNECTED; return; } - resetConnectDB(taos); + resetConnectDB(pTscObj); + + releaseTscObj((int64_t)taos); } const char *taos_get_server_info(TAOS *taos) { - if (taos == NULL) { + STscObj* pTscObj = acquireTscObj((int64_t)taos); + if (pTscObj == NULL) { + terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - STscObj *pTscObj = (STscObj *)taos; + releaseTscObj((int64_t)taos); + return pTscObj->ver; } @@ -619,6 +652,7 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); + pRequest->stableQuery = pQuery->stableQuery; } if (code == TSDB_CODE_SUCCESS) { @@ -652,10 +686,14 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { } void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { - ASSERT(fp != NULL); - - if (taos == NULL || sql == NULL) { + STscObj* pTscObj = acquireTscObj((int64_t)taos); + if (pTscObj == NULL || sql == NULL || NULL == fp) { terrno = TSDB_CODE_INVALID_PARA; + if (pTscObj) { + releaseTscObj((int64_t)taos); + } else { + terrno = TSDB_CODE_TSC_DISCONNECTED; + } fp(param, NULL, terrno); return; } @@ -670,7 +708,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param } SRequestObj *pRequest = NULL; - int32_t code = buildRequest(taos, sql, sqlLen, &pRequest); + int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); if (code != TSDB_CODE_SUCCESS) { terrno = code; fp(param, NULL, terrno); @@ -866,13 +904,18 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { } TAOS_STMT *taos_stmt_init(TAOS *taos) { - if (taos == NULL) { - tscError("NULL parameter for %s", __FUNCTION__); - terrno = TSDB_CODE_INVALID_PARA; + STscObj* pObj = acquireTscObj((int64_t)taos); + if (NULL == pObj) { + tscError("invalid parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - return stmtInit(taos); + TAOS_STMT* pStmt = stmtInit(pObj); + + releaseTscObj((int64_t)taos); + + return pStmt; } int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 8c5645e5b605495c733e656c625a95fab02c27ad..274ae9984d60484c645be19ae2ee05f741aae3ca 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1460,7 +1460,7 @@ static void smlDestroyInfo(SSmlHandle* info){ taosMemoryFreeClear(info); } -static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision){ +static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLProtocolType protocol, int8_t precision){ int32_t code = TSDB_CODE_SUCCESS; SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle)); if (NULL == info) { @@ -1483,7 +1483,7 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol } ((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV; - info->taos = (STscObj *)taos; + info->taos = pTscObj; code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); if(code != TSDB_CODE_SUCCESS){ uError("SML:0x%"PRIx64" get catalog error %d", info->id, code); @@ -2447,13 +2447,21 @@ static void smlInsertCallback(void* param, void* res, int32_t code) { */ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { - SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT); + STscObj* pTscObj = acquireTscObj((int64_t)taos); + if (NULL == pTscObj) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + uError("SML:taos_schemaless_insert invalid taos"); + return NULL; + } + + SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT); if(!request){ + releaseTscObj((int64_t)taos); uError("SML:taos_schemaless_insert error request is null"); return NULL; } - ((STscObj *)taos)->schemalessType = 1; + pTscObj->schemalessType = 1; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; int cnt = ceil(((double)numLines)/LINE_BATCH); @@ -2468,7 +2476,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr goto end; } - if(isSchemalessDb(((STscObj *)taos), request) != TSDB_CODE_SUCCESS){ + if(isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS){ request->code = TSDB_CODE_SML_INVALID_DB_CONF; smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL); goto end; @@ -2493,13 +2501,13 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr } for (int i = 0; i < cnt; ++i) { - SRequestObj* req = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT); + SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT); if(!req){ request->code = TSDB_CODE_OUT_OF_MEMORY; uError("SML:taos_schemaless_insert error request is null"); goto end; } - SSmlHandle* info = smlBuildSmlInfo(taos, req, (SMLProtocolType)protocol, precision); + SSmlHandle* info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision); if(!info){ request->code = TSDB_CODE_OUT_OF_MEMORY; uError("SML:taos_schemaless_insert error SSmlHandle is null"); @@ -2533,8 +2541,9 @@ end: taosThreadSpinDestroy(¶ms.lock); tsem_destroy(¶ms.sem); // ((STscObj *)taos)->schemalessType = 0; - ((STscObj *)taos)->schemalessType = 1; + pTscObj->schemalessType = 1; uDebug("resultend:%s", request->msgBuf); + releaseTscObj((int64_t)taos); return (TAOS_RES*)request; } diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 7dfb6505e9f27eb02d9dcf1eb16d85e004c0769d..1c0caec8102b5b0311c4cfd7240ea78270163628 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -478,7 +478,7 @@ int32_t stmtResetStmt(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -TAOS_STMT* stmtInit(TAOS* taos) { +TAOS_STMT* stmtInit(STscObj* taos) { STscObj* pObj = (STscObj*)taos; STscStmt* pStmt = NULL; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 0c35f5691e080c6c62840ed0d340df75f3c7b89e..98e16a05a6b2ea1bcbbcaf5259141f8fe5f41e4a 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -253,8 +253,8 @@ static const SSysTableMeta infosMeta[] = { static const SSysDbTableSchema connectionsSchema[] = { {.name = "conn_id", .bytes = 4, .type = TSDB_DATA_TYPE_UINT}, {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "program", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_UINT}, {.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "login_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, @@ -298,19 +298,18 @@ static const SSysDbTableSchema offsetSchema[] = { }; static const SSysDbTableSchema querySchema[] = { - {.name = "query_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "query_id", .bytes = 26 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT}, + {.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "qid", .bytes = 22 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, - {.name = "time", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, - {.name = "sql_obj_id", .bytes = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "ep", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, - {.name = "sub_queries", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "sub_query_info", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 31b8eb71bf26184bd351ca6d9b1a704f4bfc36c2..73fd0e7e71aa4291cd93efcd39d177eac6873ccf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -210,6 +210,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; if (tEncodeI32(pEncoder, desc->pid) < 0) return -1; + if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1; if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; @@ -218,7 +219,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR for (int32_t m = 0; m < snum; ++m) { SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m); if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1; - if (tEncodeI32(pEncoder, sDesc->status) < 0) return -1; + if (tEncodeCStr(pEncoder, sDesc->status) < 0) return -1; } } } else { @@ -265,6 +266,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; if (tDecodeI32(pDecoder, &desc.pid) < 0) return -1; + if (tDecodeI8(pDecoder, (int8_t*)&desc.stableQuery) < 0) return -1; if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; @@ -277,7 +279,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) for (int32_t m = 0; m < snum; ++m) { SQuerySubDesc sDesc = {0}; if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1; - if (tDecodeI32(pDecoder, &sDesc.status) < 0) return -1; + if (tDecodeCStrTo(pDecoder, sDesc.status) < 0) return -1; taosArrayPush(desc.subDesc, &sDesc); } } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 28364737938587012902ca837b2c401d90cbdc83..ba09a182bb355ab12cdcdc3fb35d4a9b5621882b 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -39,6 +39,7 @@ typedef struct { int64_t lastAccessTimeMs; uint64_t killId; int32_t numOfQueries; + SRWLatch queryLock; SArray *pQueries; // SArray } SConnObj; @@ -53,8 +54,8 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq); static int32_t mndProcessConnectReq(SRpcMsg *pReq); static int32_t mndProcessKillQueryReq(SRpcMsg *pReq); static int32_t mndProcessKillConnReq(SRpcMsg *pReq); -static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows); -static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); int32_t mndInitProfile(SMnode *pMnode) { @@ -74,7 +75,7 @@ int32_t mndInitProfile(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq); - // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery); @@ -129,7 +130,9 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType } static void mndFreeConn(SConnObj *pConn) { + taosWLockLatch(&pConn->queryLock); taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc); + taosWUnLockLatch(&pConn->queryLock); mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn); } @@ -222,8 +225,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { goto CONN_OVER; } - mndAcquireConn(pMnode, pConn->id); - SConnectRsp connectRsp = {0}; connectRsp.acctId = pUser->acctId; connectRsp.superUser = pUser->superUser; @@ -259,12 +260,17 @@ CONN_OVER: } static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { + taosWLockLatch(&pConn->queryLock); + taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc); pConn->pQueries = pBasic->queryDesc; + pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; pBasic->queryDesc = NULL; - pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; + mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries); + + taosWUnLockLatch(&pConn->queryLock); return TSDB_CODE_SUCCESS; } @@ -353,14 +359,10 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb return -1; } else { mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id); + pConn = mndAcquireConn(pMnode, pBasic->connId); } - } else if (pConn->killed) { - mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id); - mndReleaseConn(pMnode, pConn); - terrno = TSDB_CODE_MND_INVALID_CONNECTION; - return -1; } - + SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic)); if (rspBasic == NULL) { mndReleaseConn(pMnode, pConn); @@ -389,6 +391,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb mndReleaseConn(pMnode, pConn); hbRsp.query = rspBasic; + } else { + mDebug("no query info in hb msg"); } int32_t kvNum = taosHashGetSize(pHbReq->info); @@ -559,14 +563,13 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { } } -static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->info.node; - int32_t numOfRows = 0; - SConnObj *pConn = NULL; - int32_t cols = 0; - char *pWrite; - char ipStr[TSDB_IPv4ADDR_LEN + 6]; - +static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SConnObj *pConn = NULL; + if (pShow->pIter == NULL) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; pShow->pIter = taosCacheCreateIter(pMgmt->cache); @@ -574,48 +577,45 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int3 while (numOfRows < rows) { pConn = mndGetNextConn(pMnode, pShow->pIter); - if (pConn == NULL) break; + if (pConn == NULL) { + pShow->pIter = NULL; + break; + } cols = 0; -#if 0 - pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; - *(uint32_t *)pWrite = pConn->id; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes); - cols++; - - // app name - pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->pMeta->pSchemas[cols].bytes); - cols++; - - // app pid - pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; - *(int32_t *)pWrite = pConn->pid; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; - taosIpPort2String(pConn->ip, pConn->port, ipStr); - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->pMeta->pSchemas[cols].bytes); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; - *(int64_t *)pWrite = pConn->loginTimeMs; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; - if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs; - *(int64_t *)pWrite = pConn->lastAccessTimeMs; - cols++; -#endif + + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false); + + char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(user, pConn->user); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)user, false); + + char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE]; + STR_TO_VARSTR(app, pConn->app); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)app, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false); + + char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; + sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port); + varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false); numOfRows++; } pShow->numOfRows += numOfRows; - return numOfRows; } @@ -633,9 +633,14 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p while (numOfRows < rows) { pConn = mndGetNextConn(pMnode, pShow->pIter); - if (pConn == NULL) break; + if (pConn == NULL) { + pShow->pIter = NULL; + break; + } + taosRLockLatch(&pConn->queryLock); if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) { + taosRUnLockLatch(&pConn->queryLock); continue; } @@ -644,14 +649,28 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i); cols = 0; + char queryId[26 + VARSTR_HEADER_SIZE] = {0}; + sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid); + varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)queryId, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false); + char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE]; + STR_TO_VARSTR(app, pConn->app); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)app, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->pid, false); + char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(b1, pConn->user); + STR_TO_VARSTR(user, pConn->user); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)user, false); @@ -661,23 +680,41 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false); - char b1[9] = {0}; - STR_TO_VARSTR(b1, online ? "ready" : "offline"); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, b1, false); + colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stime, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false); + colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->useconds, false); - char b[tListLen(offlineReason) + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, b, false); + colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false); + + char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; + int32_t strSize = sizeof(subStatus); + int32_t offset = VARSTR_HEADER_SIZE; + for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) { + if (i) { + offset += snprintf(subStatus + offset, strSize - offset - 1, ","); + } + SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i); + offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); + } + varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, subStatus, false); + + char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(sql, pQuery->sql); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)sql, false); numOfRows++; - sdbRelease(pSdb, pDnode); } + + taosRUnLockLatch(&pConn->queryLock); } pShow->numOfRows += numOfRows; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 02c485fa837bf4b487c4fdbd099171ead448c7f0..7fffa84e0b4ef85b846259a5173d9d2a263b143c 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -468,8 +468,8 @@ int32_t udfdConnectToMnode() { char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); - connReq.pid = htonl(taosGetPId()); - connReq.startTime = htobe64(taosGetTimestampMs()); + connReq.pid = taosGetPId(); + connReq.startTime = taosGetTimestampMs(); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); void * pReq = rpcMallocCont(contLen); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 1ea9124839195904073312df2fd8b0161f2bfed0..6db824e44586e186efa0f153045081e53b771ce1 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -199,6 +199,7 @@ SNodeptr nodesMakeNode(ENodeType type) { case QUERY_NODE_SHOW_SUBSCRIBES_STMT: case QUERY_NODE_SHOW_SMAS_STMT: case QUERY_NODE_SHOW_CONFIGS_STMT: + case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_VNODES_STMT: case QUERY_NODE_SHOW_APPS_STMT: diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index aca7d9c9d332bc6318a39d457a406180ed1ee609..af22d8f62923534af95973b4425cbbdde726194a 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -383,6 +383,16 @@ static int32_t collectMetaKeyFromShowTopics(SCollectMetaKeyCxt* pCxt, SShowStmt* pCxt->pMetaCache); } +static int32_t collectMetaKeyFromShowConnections(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { + return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_CONNECTIONS, + pCxt->pMetaCache); +} + +static int32_t collectMetaKeyFromShowQueries(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { + return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_QUERIES, + pCxt->pMetaCache); +} + static int32_t collectMetaKeyFromShowTransactions(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_TRANS, pCxt->pMetaCache); @@ -453,6 +463,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_TOPICS_STMT: return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt); + case QUERY_NODE_SHOW_CONNECTIONS_STMT: + return collectMetaKeyFromShowConnections(pCxt, (SShowStmt*)pStmt); + case QUERY_NODE_SHOW_QUERIES_STMT: + return collectMetaKeyFromShowQueries(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_TRANSACTIONS_STMT: return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_DELETE_STMT: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d623b1a9a9c8ccf3cd42ab7c27abcd8563ddad7b..9e34fe6af7f9efbe8ead5d0595281e7379477327 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -42,6 +42,7 @@ typedef struct STranslateContext { SExplainOptions* pExplainOpt; SParseMetaCache* pMetaCache; bool createStream; + bool stableQuery; } STranslateContext; typedef struct SFullDatabaseName { @@ -1478,6 +1479,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { if (TSDB_CODE_SUCCESS == code) { code = addNamespace(pCxt, pRealTable); } + if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) { + pCxt->stableQuery = true; + } break; } case QUERY_NODE_TEMP_TABLE: { @@ -4054,12 +4058,14 @@ static const char* getSysDbName(ENodeType type) { case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_LICENCE_STMT: case QUERY_NODE_SHOW_CLUSTER_STMT: + case QUERY_NODE_SHOW_VARIABLE_STMT: return TSDB_INFORMATION_SCHEMA_DB; case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_STREAMS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT: + case QUERY_NODE_SHOW_APPS_STMT: return TSDB_PERFORMANCE_SCHEMA_DB; default: break; @@ -4109,6 +4115,10 @@ static const char* getSysTableName(ENodeType type) { return TSDB_PERFS_TABLE_TOPICS; case QUERY_NODE_SHOW_TRANSACTIONS_STMT: return TSDB_PERFS_TABLE_TRANS; + case QUERY_NODE_SHOW_VARIABLE_STMT: + return TSDB_INS_TABLE_CONFIGS; + case QUERY_NODE_SHOW_APPS_STMT: + return TSDB_PERFS_TABLE_APPS; default: break; } @@ -4714,6 +4724,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && pClause->ignoreNotExists) { code = TSDB_CODE_SUCCESS; + goto over; } *pIsSuperTable = false; @@ -4807,7 +4818,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { } } - if (isSuperTable) { + if (isSuperTable || 0 == taosHashGetSize(pVgroupHashmap)) { taosHashCleanup(pVgroupHashmap); return TSDB_CODE_SUCCESS; } @@ -5152,6 +5163,8 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { case QUERY_NODE_SHOW_CLUSTER_STMT: case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT: + case QUERY_NODE_SHOW_VARIABLE_STMT: + case QUERY_NODE_SHOW_APPS_STMT: code = rewriteShow(pCxt, pQuery); break; case QUERY_NODE_CREATE_TABLE_STMT: @@ -5248,6 +5261,8 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { break; } + pQuery->stableQuery = pCxt->stableQuery; + if (pQuery->haveResultSet) { if (TSDB_CODE_SUCCESS != extractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 18068870d772f96f89e274e093a33f0801fa9c05..0eaeeae9cb75edbe5c3b974af4acb61eee6e713d 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -151,7 +151,9 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { for (int32_t m = 0; m < pLevel->taskNum; ++m) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); - SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status}; + SQuerySubDesc subDesc = {0}; + subDesc.tid = pTask->taskId; + strcpy(subDesc.status, jobTaskStatusStr(pTask->status)); taosArrayPush(pSub, &subDesc); } diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 10a54755559f60f80b0b3f5b64ddf1060de71c6c..5f9c2485d17a89a8a1046a0b48b1fd6276f5035c 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -931,15 +931,15 @@ bool taosCacheIterNext(SCacheIter *pIter) { SCacheObj *pCacheObj = pIter->pCacheObj; if (pIter->index + 1 >= pIter->numOfObj) { - if (pIter->entryIndex + 1 >= pCacheObj->capacity) { - return false; - } - // release the reference for all objects in the snapshot for (int32_t i = 0; i < pIter->numOfObj; ++i) { char *p = pIter->pCurrent[i]->data; taosCacheRelease(pCacheObj, (void **)&p, false); pIter->pCurrent[i] = NULL; + } + + if (pIter->entryIndex + 1 >= pCacheObj->capacity) { + return false; } while (1) {