提交 947aecb7 编写于 作者: D dapan1121

show queries/connection; kill connection

上级 f15d17c5
...@@ -52,6 +52,7 @@ extern "C" { ...@@ -52,6 +52,7 @@ extern "C" {
#define TSDB_PERFS_TABLE_OFFSETS "offsets" #define TSDB_PERFS_TABLE_OFFSETS "offsets"
#define TSDB_PERFS_TABLE_TRANS "trans" #define TSDB_PERFS_TABLE_TRANS "trans"
#define TSDB_PERFS_TABLE_STREAMS "streams" #define TSDB_PERFS_TABLE_STREAMS "streams"
#define TSDB_PERFS_TABLE_APPS "apps"
typedef struct SSysDbTableSchema { typedef struct SSysDbTableSchema {
const char* name; const char* name;
......
...@@ -1982,16 +1982,17 @@ typedef struct { ...@@ -1982,16 +1982,17 @@ typedef struct {
typedef struct { typedef struct {
int64_t tid; int64_t tid;
int32_t status; char status[TSDB_JOB_STATUS_LEN];
} SQuerySubDesc; } SQuerySubDesc;
typedef struct { typedef struct {
char sql[TSDB_SHOW_SQL_LEN]; char sql[TSDB_SHOW_SQL_LEN];
uint64_t queryId; uint64_t queryId;
int64_t useconds; int64_t useconds;
int64_t stime; int64_t stime; // timestamp precision ms
int64_t reqRid; int64_t reqRid;
int32_t pid; int32_t pid;
bool stableQuery;
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
int32_t subPlanNum; int32_t subPlanNum;
SArray* subDesc; // SArray<SQuerySubDesc> SArray* subDesc; // SArray<SQuerySubDesc>
......
...@@ -352,6 +352,7 @@ typedef struct SQuery { ...@@ -352,6 +352,7 @@ typedef struct SQuery {
SArray* pPlaceholderValues; SArray* pPlaceholderValues;
SNode* pPrepareRoot; SNode* pPrepareRoot;
struct SParseMetaCache* pMetaCache; struct SParseMetaCache* pMetaCache;
bool stableQuery;
} SQuery; } SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
......
...@@ -241,6 +241,7 @@ typedef enum ELogicConditionType { ...@@ -241,6 +241,7 @@ typedef enum ELogicConditionType {
#define TSDB_USET_PASSWORD_LEN 129 #define TSDB_USET_PASSWORD_LEN 129
#define TSDB_VERSION_LEN 12 #define TSDB_VERSION_LEN 12
#define TSDB_LABEL_LEN 8 #define TSDB_LABEL_LEN 8
#define TSDB_JOB_STATUS_LEN 32
#define TSDB_CLUSTER_ID_LEN 40 #define TSDB_CLUSTER_ID_LEN 40
#define TSDB_FQDN_LEN 128 #define TSDB_FQDN_LEN 128
......
...@@ -45,7 +45,7 @@ extern "C" { ...@@ -45,7 +45,7 @@ extern "C" {
#define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms #define HEARTBEAT_INTERVAL 1500 // ms
#define SYNC_ON_TOP_OF_ASYNC 0 #define SYNC_ON_TOP_OF_ASYNC 1
enum { enum {
RES_TYPE__QUERY = 1, RES_TYPE__QUERY = 1,
...@@ -213,6 +213,7 @@ typedef struct SRequestObj { ...@@ -213,6 +213,7 @@ typedef struct SRequestObj {
SArray* tableList; SArray* tableList;
SQueryExecMetric metric; SQueryExecMetric metric;
SRequestSendRecvBody body; SRequestSendRecvBody body;
bool stableQuery;
uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
...@@ -294,7 +295,7 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); ...@@ -294,7 +295,7 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType); bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType); uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen); SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
...@@ -305,6 +306,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra ...@@ -305,6 +306,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
void taos_close_internal(void *taos);
// --- heartbeat // --- heartbeat
// global, called by mgmt // global, called by mgmt
int hbMgrInit(); int hbMgrInit();
......
...@@ -107,7 +107,7 @@ typedef struct STscStmt { ...@@ -107,7 +107,7 @@ typedef struct STscStmt {
#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
TAOS_STMT *stmtInit(TAOS *taos); TAOS_STMT *stmtInit(STscObj* taos);
int stmtClose(TAOS_STMT *stmt); int stmtClose(TAOS_STMT *stmt);
int stmtExec(TAOS_STMT *stmt); int stmtExec(TAOS_STMT *stmt);
const char *stmtErrstr(TAOS_STMT *stmt); const char *stmtErrstr(TAOS_STMT *stmt);
......
...@@ -174,7 +174,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { ...@@ -174,7 +174,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
} }
if (pRsp->query->killConnection) { if (pRsp->query->killConnection) {
taos_close(pTscObj); taos_close_internal(pTscObj);
} }
if (pRsp->query->pQnodeList) { if (pRsp->query->pQnodeList) {
...@@ -310,11 +310,12 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { ...@@ -310,11 +310,12 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
} }
tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql)); tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
desc.stime = pRequest->metric.start; desc.stime = pRequest->metric.start / 1000;
desc.queryId = pRequest->requestId; desc.queryId = pRequest->requestId;
desc.useconds = now - pRequest->metric.start; desc.useconds = now - pRequest->metric.start;
desc.reqRid = pRequest->self; desc.reqRid = pRequest->self;
desc.pid = hbBasic->pid; desc.pid = hbBasic->pid;
desc.stableQuery = pRequest->stableQuery;
taosGetFqdn(desc.fqdn); taosGetFqdn(desc.fqdn);
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0; desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
...@@ -329,6 +330,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { ...@@ -329,6 +330,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
if (code) { if (code) {
taosArrayDestroy(desc.subDesc); taosArrayDestroy(desc.subDesc);
desc.subDesc = NULL; desc.subDesc = NULL;
desc.subPlanNum = 0;
} }
} else { } else {
desc.subDesc = NULL; desc.subDesc = NULL;
...@@ -350,19 +352,24 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -350,19 +352,24 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
if (numOfQueries <= 0) {
releaseTscObj(connKey->tscRid);
tscDebug("no queries on connection");
return TSDB_CODE_QRY_APP_ERROR;
}
SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic)); SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
if (NULL == hbBasic) { if (NULL == hbBasic) {
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic)); tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
releaseTscObj(connKey->tscRid); releaseTscObj(connKey->tscRid);
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
hbBasic->connId = pTscObj->connId;
hbBasic->pid = taosGetPId();
taosGetAppName(hbBasic->app, NULL);
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
if (numOfQueries <= 0) {
req->query = hbBasic;
releaseTscObj(connKey->tscRid);
tscDebug("no queries on connection");
return TSDB_CODE_SUCCESS;
}
hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc)); hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
if (NULL == hbBasic->queryDesc) { if (NULL == hbBasic->queryDesc) {
...@@ -372,9 +379,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -372,9 +379,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
hbBasic->connId = pTscObj->connId;
hbBasic->pid = taosGetPId();
taosGetAppName(hbBasic->app, NULL);
int32_t code = hbBuildQueryDesc(hbBasic, pTscObj); int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
if (code) { if (code) {
......
...@@ -58,7 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i ...@@ -58,7 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo, int connType); SAppInstInfo* pAppInfo, int connType);
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType) { uint16_t port, int connType) {
if (taos_init() != TSDB_CODE_SUCCESS) { if (taos_init() != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
...@@ -697,6 +697,8 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { ...@@ -697,6 +697,8 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
return pRequest; return pRequest;
} }
pRequest->stableQuery = pQuery->stableQuery;
return launchQueryImpl(pRequest, pQuery, false, NULL); return launchQueryImpl(pRequest, pQuery, false, NULL);
} }
...@@ -923,7 +925,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -923,7 +925,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
terrno = pRequest->code; terrno = pRequest->code;
destroyRequest(pRequest); destroyRequest(pRequest);
taos_close(pTscObj); taos_close_internal(pTscObj);
pTscObj = NULL; pTscObj = NULL;
} else { } else {
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
...@@ -958,8 +960,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { ...@@ -958,8 +960,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
taosMemoryFreeClear(db); taosMemoryFreeClear(db);
connectReq.connType = pObj->connType; connectReq.connType = pObj->connType;
connectReq.pid = htonl(appInfo.pid); connectReq.pid = appInfo.pid;
connectReq.startTime = htobe64(appInfo.startTime); connectReq.startTime = appInfo.startTime;
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user)); tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
...@@ -1088,7 +1090,12 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons ...@@ -1088,7 +1090,12 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
return NULL; return NULL;
} }
return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY); STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
if (pObj) {
return (TAOS*)pObj->id;
}
return (TAOS*)0;
} }
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen, TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
......
...@@ -93,10 +93,15 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha ...@@ -93,10 +93,15 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
pass = TSDB_DEFAULT_PASS; pass = TSDB_DEFAULT_PASS;
} }
return taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
if (pObj) {
return (TAOS*)pObj->id;
}
return (TAOS*)0;
} }
void taos_close(TAOS *taos) { void taos_close_internal(void *taos) {
if (taos == NULL) { if (taos == NULL) {
return; return;
} }
...@@ -107,6 +112,17 @@ void taos_close(TAOS *taos) { ...@@ -107,6 +112,17 @@ void taos_close(TAOS *taos) {
taosRemoveRef(clientConnRefPool, pTscObj->id); taosRemoveRef(clientConnRefPool, pTscObj->id);
} }
void taos_close(TAOS *taos) {
STscObj* pObj = acquireTscObj((int64_t)taos);
if (NULL == pObj) {
return;
}
taos_close_internal(pObj);
releaseTscObj((int64_t)taos);
}
int taos_errno(TAOS_RES *tres) { int taos_errno(TAOS_RES *tres) {
if (tres == NULL) { if (tres == NULL) {
return terrno; return terrno;
...@@ -186,29 +202,36 @@ static void syncQueryFn(void* param, void* res, int32_t code) { ...@@ -186,29 +202,36 @@ static void syncQueryFn(void* param, void* res, int32_t code) {
} }
TAOS_RES *taos_query(TAOS *taos, const char *sql) { TAOS_RES *taos_query(TAOS *taos, const char *sql) {
if (taos == NULL || sql == NULL) { STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL; return NULL;
} }
STscObj* pTscObj = (STscObj*)taos;
#if SYNC_ON_TOP_OF_ASYNC #if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0); tsem_init(&param->sem, 0, 0);
taos_query_a(pTscObj, sql, syncQueryFn, param); taos_query_a(taos, sql, syncQueryFn, param);
tsem_wait(&param->sem); tsem_wait(&param->sem);
releaseTscObj((int64_t)taos);
return param->pRequest; return param->pRequest;
#else #else
size_t sqlLen = strlen(sql); size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj((int64_t)taos);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL; return NULL;
} }
return execQuery(pTscObj, sql, sqlLen); TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen);
releaseTscObj((int64_t)taos);
return pRes;
#endif #endif
} }
...@@ -425,13 +448,15 @@ int taos_result_precision(TAOS_RES *res) { ...@@ -425,13 +448,15 @@ int taos_result_precision(TAOS_RES *res) {
} }
int taos_select_db(TAOS *taos, const char *db) { int taos_select_db(TAOS *taos, const char *db) {
STscObj *pObj = (STscObj *)taos; STscObj* pObj = acquireTscObj((int64_t)taos);
if (pObj == NULL) { if (pObj == NULL) {
releaseTscObj((int64_t)taos);
terrno = TSDB_CODE_TSC_DISCONNECTED; terrno = TSDB_CODE_TSC_DISCONNECTED;
return TSDB_CODE_TSC_DISCONNECTED; return TSDB_CODE_TSC_DISCONNECTED;
} }
if (db == NULL || strlen(db) == 0) { if (db == NULL || strlen(db) == 0) {
releaseTscObj((int64_t)taos);
terrno = TSDB_CODE_TSC_INVALID_INPUT; terrno = TSDB_CODE_TSC_INVALID_INPUT;
return terrno; return terrno;
} }
...@@ -443,6 +468,7 @@ int taos_select_db(TAOS *taos, const char *db) { ...@@ -443,6 +468,7 @@ int taos_select_db(TAOS *taos, const char *db) {
int32_t code = taos_errno(pRequest); int32_t code = taos_errno(pRequest);
taos_free_result(pRequest); taos_free_result(pRequest);
releaseTscObj((int64_t)taos);
return code; return code;
} }
...@@ -589,19 +615,26 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { ...@@ -589,19 +615,26 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
int taos_validate_sql(TAOS *taos, const char *sql) { return true; } int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
void taos_reset_current_db(TAOS *taos) { void taos_reset_current_db(TAOS *taos) {
if (taos == NULL) { STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return; return;
} }
resetConnectDB(taos); resetConnectDB(pTscObj);
releaseTscObj((int64_t)taos);
} }
const char *taos_get_server_info(TAOS *taos) { const char *taos_get_server_info(TAOS *taos) {
if (taos == NULL) { STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL; return NULL;
} }
STscObj *pTscObj = (STscObj *)taos; releaseTscObj((int64_t)taos);
return pTscObj->ver; return pTscObj->ver;
} }
...@@ -619,6 +652,7 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { ...@@ -619,6 +652,7 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery;
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
...@@ -652,10 +686,14 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { ...@@ -652,10 +686,14 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) {
} }
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
ASSERT(fp != NULL); STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (pTscObj == NULL || sql == NULL || NULL == fp) {
if (taos == NULL || sql == NULL) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) {
releaseTscObj((int64_t)taos);
} else {
terrno = TSDB_CODE_TSC_DISCONNECTED;
}
fp(param, NULL, terrno); fp(param, NULL, terrno);
return; return;
} }
...@@ -670,7 +708,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param ...@@ -670,7 +708,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
} }
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
int32_t code = buildRequest(taos, sql, sqlLen, &pRequest); int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
fp(param, NULL, terrno); fp(param, NULL, terrno);
...@@ -866,13 +904,18 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -866,13 +904,18 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
} }
TAOS_STMT *taos_stmt_init(TAOS *taos) { TAOS_STMT *taos_stmt_init(TAOS *taos) {
if (taos == NULL) { STscObj* pObj = acquireTscObj((int64_t)taos);
tscError("NULL parameter for %s", __FUNCTION__); if (NULL == pObj) {
terrno = TSDB_CODE_INVALID_PARA; tscError("invalid parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL; return NULL;
} }
return stmtInit(taos); TAOS_STMT* pStmt = stmtInit(pObj);
releaseTscObj((int64_t)taos);
return pStmt;
} }
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
......
...@@ -1460,7 +1460,7 @@ static void smlDestroyInfo(SSmlHandle* info){ ...@@ -1460,7 +1460,7 @@ static void smlDestroyInfo(SSmlHandle* info){
taosMemoryFreeClear(info); taosMemoryFreeClear(info);
} }
static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision){ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLProtocolType protocol, int8_t precision){
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle)); SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle));
if (NULL == info) { if (NULL == info) {
...@@ -1483,7 +1483,7 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol ...@@ -1483,7 +1483,7 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
} }
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV; ((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->taos = (STscObj *)taos; info->taos = pTscObj;
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code); uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
...@@ -2447,13 +2447,21 @@ static void smlInsertCallback(void* param, void* res, int32_t code) { ...@@ -2447,13 +2447,21 @@ static void smlInsertCallback(void* param, void* res, int32_t code) {
*/ */
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT); STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
uError("SML:taos_schemaless_insert invalid taos");
return NULL;
}
SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
if(!request){ if(!request){
releaseTscObj((int64_t)taos);
uError("SML:taos_schemaless_insert error request is null"); uError("SML:taos_schemaless_insert error request is null");
return NULL; return NULL;
} }
((STscObj *)taos)->schemalessType = 1; pTscObj->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
int cnt = ceil(((double)numLines)/LINE_BATCH); int cnt = ceil(((double)numLines)/LINE_BATCH);
...@@ -2468,7 +2476,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2468,7 +2476,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
goto end; goto end;
} }
if(isSchemalessDb(((STscObj *)taos), request) != TSDB_CODE_SUCCESS){ if(isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS){
request->code = TSDB_CODE_SML_INVALID_DB_CONF; request->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL); smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
goto end; goto end;
...@@ -2493,13 +2501,13 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2493,13 +2501,13 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
} }
for (int i = 0; i < cnt; ++i) { for (int i = 0; i < cnt; ++i) {
SRequestObj* req = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT); SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
if(!req){ if(!req){
request->code = TSDB_CODE_OUT_OF_MEMORY; request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null"); uError("SML:taos_schemaless_insert error request is null");
goto end; goto end;
} }
SSmlHandle* info = smlBuildSmlInfo(taos, req, (SMLProtocolType)protocol, precision); SSmlHandle* info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
if(!info){ if(!info){
request->code = TSDB_CODE_OUT_OF_MEMORY; request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error SSmlHandle is null"); uError("SML:taos_schemaless_insert error SSmlHandle is null");
...@@ -2533,8 +2541,9 @@ end: ...@@ -2533,8 +2541,9 @@ end:
taosThreadSpinDestroy(&params.lock); taosThreadSpinDestroy(&params.lock);
tsem_destroy(&params.sem); tsem_destroy(&params.sem);
// ((STscObj *)taos)->schemalessType = 0; // ((STscObj *)taos)->schemalessType = 0;
((STscObj *)taos)->schemalessType = 1; pTscObj->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf); uDebug("resultend:%s", request->msgBuf);
releaseTscObj((int64_t)taos);
return (TAOS_RES*)request; return (TAOS_RES*)request;
} }
...@@ -478,7 +478,7 @@ int32_t stmtResetStmt(STscStmt* pStmt) { ...@@ -478,7 +478,7 @@ int32_t stmtResetStmt(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
TAOS_STMT* stmtInit(TAOS* taos) { TAOS_STMT* stmtInit(STscObj* taos) {
STscObj* pObj = (STscObj*)taos; STscObj* pObj = (STscObj*)taos;
STscStmt* pStmt = NULL; STscStmt* pStmt = NULL;
......
...@@ -253,8 +253,8 @@ static const SSysTableMeta infosMeta[] = { ...@@ -253,8 +253,8 @@ static const SSysTableMeta infosMeta[] = {
static const SSysDbTableSchema connectionsSchema[] = { static const SSysDbTableSchema connectionsSchema[] = {
{.name = "conn_id", .bytes = 4, .type = TSDB_DATA_TYPE_UINT}, {.name = "conn_id", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "program", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "login_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "login_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
...@@ -298,19 +298,18 @@ static const SSysDbTableSchema offsetSchema[] = { ...@@ -298,19 +298,18 @@ static const SSysDbTableSchema offsetSchema[] = {
}; };
static const SSysDbTableSchema querySchema[] = { static const SSysDbTableSchema querySchema[] = {
{.name = "query_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "query_id", .bytes = 26 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT}, {.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "qid", .bytes = 22 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "time", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "sql_obj_id", .bytes = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "ep", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "sub_queries", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "sub_query_info", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
}; };
......
...@@ -210,6 +210,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR ...@@ -210,6 +210,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; if (tEncodeI64(pEncoder, desc->stime) < 0) return -1;
if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1;
if (tEncodeI32(pEncoder, desc->pid) < 0) return -1; if (tEncodeI32(pEncoder, desc->pid) < 0) return -1;
if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1;
if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1;
if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1;
...@@ -218,7 +219,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR ...@@ -218,7 +219,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
for (int32_t m = 0; m < snum; ++m) { for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m); SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m);
if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1; if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1;
if (tEncodeI32(pEncoder, sDesc->status) < 0) return -1; if (tEncodeCStr(pEncoder, sDesc->status) < 0) return -1;
} }
} }
} else { } else {
...@@ -265,6 +266,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) ...@@ -265,6 +266,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1;
if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.pid) < 0) return -1; if (tDecodeI32(pDecoder, &desc.pid) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&desc.stableQuery) < 0) return -1;
if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1;
...@@ -277,7 +279,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) ...@@ -277,7 +279,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
for (int32_t m = 0; m < snum; ++m) { for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc sDesc = {0}; SQuerySubDesc sDesc = {0};
if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1; if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1;
if (tDecodeI32(pDecoder, &sDesc.status) < 0) return -1; if (tDecodeCStrTo(pDecoder, sDesc.status) < 0) return -1;
taosArrayPush(desc.subDesc, &sDesc); taosArrayPush(desc.subDesc, &sDesc);
} }
} }
......
...@@ -39,6 +39,7 @@ typedef struct { ...@@ -39,6 +39,7 @@ typedef struct {
int64_t lastAccessTimeMs; int64_t lastAccessTimeMs;
uint64_t killId; uint64_t killId;
int32_t numOfQueries; int32_t numOfQueries;
SRWLatch queryLock;
SArray *pQueries; // SArray<SQueryDesc> SArray *pQueries; // SArray<SQueryDesc>
} SConnObj; } SConnObj;
...@@ -53,8 +54,8 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq); ...@@ -53,8 +54,8 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t mndProcessConnectReq(SRpcMsg *pReq); static int32_t mndProcessConnectReq(SRpcMsg *pReq);
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq); static int32_t mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t mndProcessKillConnReq(SRpcMsg *pReq); static int32_t mndProcessKillConnReq(SRpcMsg *pReq);
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
int32_t mndInitProfile(SMnode *pMnode) { int32_t mndInitProfile(SMnode *pMnode) {
...@@ -74,7 +75,7 @@ int32_t mndInitProfile(SMnode *pMnode) { ...@@ -74,7 +75,7 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
...@@ -129,7 +130,9 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType ...@@ -129,7 +130,9 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
} }
static void mndFreeConn(SConnObj *pConn) { static void mndFreeConn(SConnObj *pConn) {
taosWLockLatch(&pConn->queryLock);
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc); taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
taosWUnLockLatch(&pConn->queryLock);
mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn); mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
} }
...@@ -222,8 +225,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { ...@@ -222,8 +225,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
goto CONN_OVER; goto CONN_OVER;
} }
mndAcquireConn(pMnode, pConn->id);
SConnectRsp connectRsp = {0}; SConnectRsp connectRsp = {0};
connectRsp.acctId = pUser->acctId; connectRsp.acctId = pUser->acctId;
connectRsp.superUser = pUser->superUser; connectRsp.superUser = pUser->superUser;
...@@ -259,12 +260,17 @@ CONN_OVER: ...@@ -259,12 +260,17 @@ CONN_OVER:
} }
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
taosWLockLatch(&pConn->queryLock);
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc); taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
pConn->pQueries = pBasic->queryDesc; pConn->pQueries = pBasic->queryDesc;
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
pBasic->queryDesc = NULL; pBasic->queryDesc = NULL;
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries);
taosWUnLockLatch(&pConn->queryLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -353,14 +359,10 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -353,14 +359,10 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
return -1; return -1;
} else { } else {
mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id); mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
pConn = mndAcquireConn(pMnode, pBasic->connId);
} }
} else if (pConn->killed) {
mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1;
} }
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic)); SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
if (rspBasic == NULL) { if (rspBasic == NULL) {
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
...@@ -389,6 +391,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -389,6 +391,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
hbRsp.query = rspBasic; hbRsp.query = rspBasic;
} else {
mDebug("no query info in hb msg");
} }
int32_t kvNum = taosHashGetSize(pHbReq->info); int32_t kvNum = taosHashGetSize(pHbReq->info);
...@@ -559,14 +563,13 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { ...@@ -559,14 +563,13 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
} }
} }
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t numOfRows = 0; SSdb *pSdb = pMnode->pSdb;
SConnObj *pConn = NULL; int32_t numOfRows = 0;
int32_t cols = 0; int32_t cols = 0;
char *pWrite; SConnObj *pConn = NULL;
char ipStr[TSDB_IPv4ADDR_LEN + 6];
if (pShow->pIter == NULL) { if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache); pShow->pIter = taosCacheCreateIter(pMgmt->cache);
...@@ -574,48 +577,45 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int3 ...@@ -574,48 +577,45 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int3
while (numOfRows < rows) { while (numOfRows < rows) {
pConn = mndGetNextConn(pMnode, pShow->pIter); pConn = mndGetNextConn(pMnode, pShow->pIter);
if (pConn == NULL) break; if (pConn == NULL) {
pShow->pIter = NULL;
break;
}
cols = 0; cols = 0;
#if 0
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
*(uint32_t *)pWrite = pConn->id; colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
cols++;
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; STR_TO_VARSTR(user, pConn->user);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
cols++; colDataAppend(pColInfo, numOfRows, (const char *)user, false);
// app name char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; STR_TO_VARSTR(app, pConn->app);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
cols++; colDataAppend(pColInfo, numOfRows, (const char *)app, false);
// app pid pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);
*(int32_t *)pWrite = pConn->pid;
cols++; char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
taosIpPort2String(pConn->ip, pConn->port, ipStr); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->pMeta->pSchemas[cols].bytes); colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows; colDataAppend(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);
*(int64_t *)pWrite = pConn->loginTimeMs;
cols++; pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
*(int64_t *)pWrite = pConn->lastAccessTimeMs;
cols++;
#endif
numOfRows++; numOfRows++;
} }
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
return numOfRows; return numOfRows;
} }
...@@ -633,9 +633,14 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -633,9 +633,14 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
while (numOfRows < rows) { while (numOfRows < rows) {
pConn = mndGetNextConn(pMnode, pShow->pIter); pConn = mndGetNextConn(pMnode, pShow->pIter);
if (pConn == NULL) break; if (pConn == NULL) {
pShow->pIter = NULL;
break;
}
taosRLockLatch(&pConn->queryLock);
if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) { if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) {
taosRUnLockLatch(&pConn->queryLock);
continue; continue;
} }
...@@ -644,14 +649,28 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -644,14 +649,28 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i); SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i);
cols = 0; cols = 0;
char queryId[26 + VARSTR_HEADER_SIZE] = {0};
sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid);
varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)queryId, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false); colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false); colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
STR_TO_VARSTR(app, pConn->app);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)app, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->pid, false);
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(b1, pConn->user); STR_TO_VARSTR(user, pConn->user);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)user, false); colDataAppend(pColInfo, numOfRows, (const char *)user, false);
...@@ -661,23 +680,41 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -661,23 +680,41 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false); colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
char b1[9] = {0};
STR_TO_VARSTR(b1, online ? "ready" : "offline");
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b1, false); colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false); colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->useconds, false);
char b[tListLen(offlineReason) + VARSTR_HEADER_SIZE] = {0}; pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]); colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b, false); colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t strSize = sizeof(subStatus);
int32_t offset = VARSTR_HEADER_SIZE;
for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
if (i) {
offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
}
SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i);
offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
}
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, subStatus, false);
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(sql, pQuery->sql);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pDnode);
} }
taosRUnLockLatch(&pConn->queryLock);
} }
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
......
...@@ -468,8 +468,8 @@ int32_t udfdConnectToMnode() { ...@@ -468,8 +468,8 @@ int32_t udfdConnectToMnode() {
char pass[TSDB_PASSWORD_LEN + 1] = {0}; char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = htonl(taosGetPId()); connReq.pid = taosGetPId();
connReq.startTime = htobe64(taosGetTimestampMs()); connReq.startTime = taosGetTimestampMs();
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void * pReq = rpcMallocCont(contLen); void * pReq = rpcMallocCont(contLen);
......
...@@ -199,6 +199,7 @@ SNodeptr nodesMakeNode(ENodeType type) { ...@@ -199,6 +199,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
case QUERY_NODE_SHOW_SUBSCRIBES_STMT: case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_SMAS_STMT: case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT: case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_VNODES_STMT: case QUERY_NODE_SHOW_VNODES_STMT:
case QUERY_NODE_SHOW_APPS_STMT: case QUERY_NODE_SHOW_APPS_STMT:
......
...@@ -383,6 +383,16 @@ static int32_t collectMetaKeyFromShowTopics(SCollectMetaKeyCxt* pCxt, SShowStmt* ...@@ -383,6 +383,16 @@ static int32_t collectMetaKeyFromShowTopics(SCollectMetaKeyCxt* pCxt, SShowStmt*
pCxt->pMetaCache); pCxt->pMetaCache);
} }
static int32_t collectMetaKeyFromShowConnections(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_CONNECTIONS,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowQueries(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_QUERIES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowTransactions(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { static int32_t collectMetaKeyFromShowTransactions(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_TRANS, return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_TRANS,
pCxt->pMetaCache); pCxt->pMetaCache);
...@@ -453,6 +463,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { ...@@ -453,6 +463,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TOPICS_STMT:
return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
return collectMetaKeyFromShowConnections(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_QUERIES_STMT:
return collectMetaKeyFromShowQueries(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
......
...@@ -42,6 +42,7 @@ typedef struct STranslateContext { ...@@ -42,6 +42,7 @@ typedef struct STranslateContext {
SExplainOptions* pExplainOpt; SExplainOptions* pExplainOpt;
SParseMetaCache* pMetaCache; SParseMetaCache* pMetaCache;
bool createStream; bool createStream;
bool stableQuery;
} STranslateContext; } STranslateContext;
typedef struct SFullDatabaseName { typedef struct SFullDatabaseName {
...@@ -1478,6 +1479,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { ...@@ -1478,6 +1479,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = addNamespace(pCxt, pRealTable); code = addNamespace(pCxt, pRealTable);
} }
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
pCxt->stableQuery = true;
}
break; break;
} }
case QUERY_NODE_TEMP_TABLE: { case QUERY_NODE_TEMP_TABLE: {
...@@ -4054,12 +4058,14 @@ static const char* getSysDbName(ENodeType type) { ...@@ -4054,12 +4058,14 @@ static const char* getSysDbName(ENodeType type) {
case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_LICENCE_STMT: case QUERY_NODE_SHOW_LICENCE_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT: case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
return TSDB_INFORMATION_SCHEMA_DB; return TSDB_INFORMATION_SCHEMA_DB;
case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_STREAMS_STMT: case QUERY_NODE_SHOW_STREAMS_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
return TSDB_PERFORMANCE_SCHEMA_DB; return TSDB_PERFORMANCE_SCHEMA_DB;
default: default:
break; break;
...@@ -4109,6 +4115,10 @@ static const char* getSysTableName(ENodeType type) { ...@@ -4109,6 +4115,10 @@ static const char* getSysTableName(ENodeType type) {
return TSDB_PERFS_TABLE_TOPICS; return TSDB_PERFS_TABLE_TOPICS;
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
return TSDB_PERFS_TABLE_TRANS; return TSDB_PERFS_TABLE_TRANS;
case QUERY_NODE_SHOW_VARIABLE_STMT:
return TSDB_INS_TABLE_CONFIGS;
case QUERY_NODE_SHOW_APPS_STMT:
return TSDB_PERFS_TABLE_APPS;
default: default:
break; break;
} }
...@@ -4714,6 +4724,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl ...@@ -4714,6 +4724,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && pClause->ignoreNotExists) { if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && pClause->ignoreNotExists) {
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
goto over;
} }
*pIsSuperTable = false; *pIsSuperTable = false;
...@@ -4807,7 +4818,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -4807,7 +4818,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
} }
} }
if (isSuperTable) { if (isSuperTable || 0 == taosHashGetSize(pVgroupHashmap)) {
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5152,6 +5163,8 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -5152,6 +5163,8 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_CLUSTER_STMT: case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
code = rewriteShow(pCxt, pQuery); code = rewriteShow(pCxt, pQuery);
break; break;
case QUERY_NODE_CREATE_TABLE_STMT: case QUERY_NODE_CREATE_TABLE_STMT:
...@@ -5248,6 +5261,8 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -5248,6 +5261,8 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
break; break;
} }
pQuery->stableQuery = pCxt->stableQuery;
if (pQuery->haveResultSet) { if (pQuery->haveResultSet) {
if (TSDB_CODE_SUCCESS != extractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) { if (TSDB_CODE_SUCCESS != extractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -151,7 +151,9 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { ...@@ -151,7 +151,9 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
for (int32_t m = 0; m < pLevel->taskNum; ++m) { for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status}; SQuerySubDesc subDesc = {0};
subDesc.tid = pTask->taskId;
strcpy(subDesc.status, jobTaskStatusStr(pTask->status));
taosArrayPush(pSub, &subDesc); taosArrayPush(pSub, &subDesc);
} }
......
...@@ -931,15 +931,15 @@ bool taosCacheIterNext(SCacheIter *pIter) { ...@@ -931,15 +931,15 @@ bool taosCacheIterNext(SCacheIter *pIter) {
SCacheObj *pCacheObj = pIter->pCacheObj; SCacheObj *pCacheObj = pIter->pCacheObj;
if (pIter->index + 1 >= pIter->numOfObj) { if (pIter->index + 1 >= pIter->numOfObj) {
if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
return false;
}
// release the reference for all objects in the snapshot // release the reference for all objects in the snapshot
for (int32_t i = 0; i < pIter->numOfObj; ++i) { for (int32_t i = 0; i < pIter->numOfObj; ++i) {
char *p = pIter->pCurrent[i]->data; char *p = pIter->pCurrent[i]->data;
taosCacheRelease(pCacheObj, (void **)&p, false); taosCacheRelease(pCacheObj, (void **)&p, false);
pIter->pCurrent[i] = NULL; pIter->pCurrent[i] = NULL;
}
if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
return false;
} }
while (1) { while (1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册