提交 4a9738ef 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/dnode

......@@ -2001,16 +2001,17 @@ typedef struct {
typedef struct {
int64_t tid;
int32_t status;
char status[TSDB_JOB_STATUS_LEN];
} SQuerySubDesc;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
uint64_t queryId;
int64_t useconds;
int64_t stime;
int64_t stime; // timestamp precision ms
int64_t reqRid;
int32_t pid;
bool stableQuery;
char fqdn[TSDB_FQDN_LEN];
int32_t subPlanNum;
SArray* subDesc; // SArray<SQuerySubDesc>
......
......@@ -351,6 +351,7 @@ typedef struct SQuery {
int32_t placeholderNum;
SArray* pPlaceholderValues;
SNode* pPrepareRoot;
bool stableQuery;
} SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
......
......@@ -243,6 +243,7 @@ typedef enum ELogicConditionType {
#define TSDB_USET_PASSWORD_LEN 129
#define TSDB_VERSION_LEN 12
#define TSDB_LABEL_LEN 8
#define TSDB_JOB_STATUS_LEN 32
#define TSDB_CLUSTER_ID_LEN 40
#define TSDB_FQDN_LEN 128
......
......@@ -213,6 +213,7 @@ typedef struct SRequestObj {
SArray* tableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
bool stableQuery;
uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog
uint32_t retry;
......@@ -294,7 +295,7 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
......@@ -305,6 +306,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
void taos_close_internal(void *taos);
// --- heartbeat
// global, called by mgmt
int hbMgrInit();
......
......@@ -107,7 +107,7 @@ typedef struct STscStmt {
#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
TAOS_STMT *stmtInit(TAOS *taos);
TAOS_STMT *stmtInit(STscObj* taos);
int stmtClose(TAOS_STMT *stmt);
int stmtExec(TAOS_STMT *stmt);
const char *stmtErrstr(TAOS_STMT *stmt);
......
......@@ -174,7 +174,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
}
if (pRsp->query->killConnection) {
taos_close(pTscObj);
taos_close_internal(pTscObj);
}
if (pRsp->query->pQnodeList) {
......@@ -310,11 +310,12 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
}
tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
desc.stime = pRequest->metric.start;
desc.stime = pRequest->metric.start / 1000;
desc.queryId = pRequest->requestId;
desc.useconds = now - pRequest->metric.start;
desc.reqRid = pRequest->self;
desc.pid = hbBasic->pid;
desc.stableQuery = pRequest->stableQuery;
taosGetFqdn(desc.fqdn);
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
......@@ -329,6 +330,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
if (code) {
taosArrayDestroy(desc.subDesc);
desc.subDesc = NULL;
desc.subPlanNum = 0;
}
} else {
desc.subDesc = NULL;
......@@ -350,19 +352,24 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_QRY_APP_ERROR;
}
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
if (numOfQueries <= 0) {
releaseTscObj(connKey->tscRid);
tscDebug("no queries on connection");
return TSDB_CODE_QRY_APP_ERROR;
}
SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
if (NULL == hbBasic) {
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
releaseTscObj(connKey->tscRid);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
hbBasic->connId = pTscObj->connId;
hbBasic->pid = taosGetPId();
taosGetAppName(hbBasic->app, NULL);
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
if (numOfQueries <= 0) {
req->query = hbBasic;
releaseTscObj(connKey->tscRid);
tscDebug("no queries on connection");
return TSDB_CODE_SUCCESS;
}
hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
if (NULL == hbBasic->queryDesc) {
......@@ -372,9 +379,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
hbBasic->connId = pTscObj->connId;
hbBasic->pid = taosGetPId();
taosGetAppName(hbBasic->app, NULL);
int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
if (code) {
......
......@@ -58,7 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo, int connType);
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType) {
if (taos_init() != TSDB_CODE_SUCCESS) {
return NULL;
......@@ -692,6 +692,8 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
return pRequest;
}
pRequest->stableQuery = pQuery->stableQuery;
return launchQueryImpl(pRequest, pQuery, false, NULL);
}
......@@ -917,7 +919,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
terrno = pRequest->code;
destroyRequest(pRequest);
taos_close(pTscObj);
taos_close_internal(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
......@@ -952,8 +954,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
taosMemoryFreeClear(db);
connectReq.connType = pObj->connType;
connectReq.pid = htonl(appInfo.pid);
connectReq.startTime = htobe64(appInfo.startTime);
connectReq.pid = appInfo.pid;
connectReq.startTime = appInfo.startTime;
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
......@@ -1081,7 +1083,12 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
return NULL;
}
return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
if (pObj) {
return (TAOS*)pObj->id;
}
return (TAOS*)0;
}
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
......
......@@ -97,10 +97,15 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
pass = TSDB_DEFAULT_PASS;
}
return taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
if (pObj) {
return (TAOS*)pObj->id;
}
return (TAOS*)0;
}
void taos_close(TAOS *taos) {
void taos_close_internal(void *taos) {
if (taos == NULL) {
return;
}
......@@ -111,6 +116,17 @@ void taos_close(TAOS *taos) {
taosRemoveRef(clientConnRefPool, pTscObj->id);
}
void taos_close(TAOS *taos) {
STscObj* pObj = acquireTscObj((int64_t)taos);
if (NULL == pObj) {
return;
}
taos_close_internal(pObj);
releaseTscObj((int64_t)taos);
}
int taos_errno(TAOS_RES *tres) {
if (tres == NULL) {
return terrno;
......@@ -190,29 +206,36 @@ static void syncQueryFn(void *param, void *res, int32_t code) {
}
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
if (taos == NULL || sql == NULL) {
STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
STscObj *pTscObj = (STscObj *)taos;
#if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taos_query_a(pTscObj, sql, syncQueryFn, param);
taos_query_a(taos, sql, syncQueryFn, param);
tsem_wait(&param->sem);
releaseTscObj((int64_t)taos);
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj((int64_t)taos);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
return execQuery(pTscObj, sql, sqlLen);
TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen);
releaseTscObj((int64_t)taos);
return pRes;
#endif
}
......@@ -429,13 +452,15 @@ int taos_result_precision(TAOS_RES *res) {
}
int taos_select_db(TAOS *taos, const char *db) {
STscObj *pObj = (STscObj *)taos;
STscObj* pObj = acquireTscObj((int64_t)taos);
if (pObj == NULL) {
releaseTscObj((int64_t)taos);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return TSDB_CODE_TSC_DISCONNECTED;
}
if (db == NULL || strlen(db) == 0) {
releaseTscObj((int64_t)taos);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return terrno;
}
......@@ -447,6 +472,7 @@ int taos_select_db(TAOS *taos, const char *db) {
int32_t code = taos_errno(pRequest);
taos_free_result(pRequest);
releaseTscObj((int64_t)taos);
return code;
}
......@@ -593,19 +619,26 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
void taos_reset_current_db(TAOS *taos) {
if (taos == NULL) {
STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return;
}
resetConnectDB(taos);
resetConnectDB(pTscObj);
releaseTscObj((int64_t)taos);
}
const char *taos_get_server_info(TAOS *taos) {
if (taos == NULL) {
STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
STscObj *pTscObj = (STscObj *)taos;
releaseTscObj((int64_t)taos);
return pTscObj->ver;
}
......@@ -637,6 +670,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery;
}
if (code == TSDB_CODE_SUCCESS) {
......@@ -670,10 +704,14 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
ASSERT(fp != NULL);
if (taos == NULL || sql == NULL) {
STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (pTscObj == NULL || sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) {
releaseTscObj((int64_t)taos);
} else {
terrno = TSDB_CODE_TSC_DISCONNECTED;
}
fp(param, NULL, terrno);
return;
}
......@@ -688,7 +726,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
}
SRequestObj *pRequest = NULL;
int32_t code = buildRequest(taos, sql, sqlLen, &pRequest);
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
fp(param, NULL, terrno);
......@@ -888,13 +926,18 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
}
TAOS_STMT *taos_stmt_init(TAOS *taos) {
if (taos == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
STscObj* pObj = acquireTscObj((int64_t)taos);
if (NULL == pObj) {
tscError("invalid parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
return stmtInit(taos);
TAOS_STMT* pStmt = stmtInit(pObj);
releaseTscObj((int64_t)taos);
return pStmt;
}
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
......
......@@ -309,7 +309,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_ADD_COLUMN: {
int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query(info->taos, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
const char *errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) {
......@@ -323,7 +323,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_ADD_TAG: {
int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query(info->taos, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
const char *errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) {
......@@ -337,7 +337,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query(info->taos, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query(info->taos, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -405,7 +405,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
pos--;
++freeBytes;
outBytes = snprintf(pos, freeBytes, ")");
TAOS_RES *res = taos_query(info->taos, result);
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result);
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -1453,9 +1453,9 @@ static void smlDestroyInfo(SSmlHandle *info) {
taosMemoryFreeClear(info);
}
static SSmlHandle *smlBuildSmlInfo(TAOS *taos, SRequestObj *request, SMLProtocolType protocol, int8_t precision) {
int32_t code = TSDB_CODE_SUCCESS;
SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLProtocolType protocol, int8_t precision){
int32_t code = TSDB_CODE_SUCCESS;
SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle));
if (NULL == info) {
return NULL;
}
......@@ -1476,7 +1476,7 @@ static SSmlHandle *smlBuildSmlInfo(TAOS *taos, SRequestObj *request, SMLProtocol
}
((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->taos = (STscObj *)taos;
info->taos = pTscObj;
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
......@@ -2433,14 +2433,22 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
*
*/
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
if (!request) {
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
STscObj* pTscObj = acquireTscObj((int64_t)taos);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
uError("SML:taos_schemaless_insert invalid taos");
return NULL;
}
SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
if(!request){
releaseTscObj((int64_t)taos);
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
((STscObj *)taos)->schemalessType = 1;
pTscObj->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
int cnt = ceil(((double)numLines) / LINE_BATCH);
......@@ -2455,7 +2463,7 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
goto end;
}
if (isSchemalessDb(((STscObj *)taos), request) != TSDB_CODE_SUCCESS) {
if(isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS){
request->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
goto end;
......@@ -2481,14 +2489,14 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
}
for (int i = 0; i < cnt; ++i) {
SRequestObj *req = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
if (!req) {
SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
if(!req){
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null");
goto end;
}
SSmlHandle *info = smlBuildSmlInfo(taos, req, (SMLProtocolType)protocol, precision);
if (!info) {
SSmlHandle* info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
if(!info){
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error SSmlHandle is null");
goto end;
......@@ -2520,8 +2528,9 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
end:
taosThreadSpinDestroy(&params.lock);
tsem_destroy(&params.sem);
// ((STscObj *)taos)->schemalessType = 0;
((STscObj *)taos)->schemalessType = 1;
// ((STscObj *)taos)->schemalessType = 0;
pTscObj->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf);
return (TAOS_RES *)request;
releaseTscObj((int64_t)taos);
return (TAOS_RES*)request;
}
......@@ -478,7 +478,7 @@ int32_t stmtResetStmt(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
TAOS_STMT* stmtInit(TAOS* taos) {
TAOS_STMT* stmtInit(STscObj* taos) {
STscObj* pObj = (STscObj*)taos;
STscStmt* pStmt = NULL;
......
......@@ -257,8 +257,8 @@ static const SSysTableMeta infosMeta[] = {
static const SSysDbTableSchema connectionsSchema[] = {
{.name = "conn_id", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "program", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "login_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
......@@ -302,19 +302,18 @@ static const SSysDbTableSchema offsetSchema[] = {
};
static const SSysDbTableSchema querySchema[] = {
{.name = "query_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "query_id", .bytes = 26 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "qid", .bytes = 22 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "time", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "sql_obj_id", .bytes = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "ep", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "sub_queries", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "sub_query_info", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
......
......@@ -210,6 +210,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
if (tEncodeI64(pEncoder, desc->stime) < 0) return -1;
if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1;
if (tEncodeI32(pEncoder, desc->pid) < 0) return -1;
if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1;
if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1;
if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1;
......@@ -218,7 +219,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m);
if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1;
if (tEncodeI32(pEncoder, sDesc->status) < 0) return -1;
if (tEncodeCStr(pEncoder, sDesc->status) < 0) return -1;
}
}
} else {
......@@ -265,6 +266,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1;
if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.pid) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&desc.stableQuery) < 0) return -1;
if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1;
......@@ -277,7 +279,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc sDesc = {0};
if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1;
if (tDecodeI32(pDecoder, &sDesc.status) < 0) return -1;
if (tDecodeCStrTo(pDecoder, sDesc.status) < 0) return -1;
taosArrayPush(desc.subDesc, &sDesc);
}
}
......
......@@ -39,6 +39,7 @@ typedef struct {
int64_t lastAccessTimeMs;
uint64_t killId;
int32_t numOfQueries;
SRWLatch queryLock;
SArray *pQueries; // SArray<SQueryDesc>
} SConnObj;
......@@ -53,8 +54,8 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t mndProcessConnectReq(SRpcMsg *pReq);
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t mndProcessKillConnReq(SRpcMsg *pReq);
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
int32_t mndInitProfile(SMnode *pMnode) {
......@@ -74,9 +75,9 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
return 0;
......@@ -129,7 +130,9 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
}
static void mndFreeConn(SConnObj *pConn) {
taosWLockLatch(&pConn->queryLock);
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
taosWUnLockLatch(&pConn->queryLock);
mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
}
......@@ -222,8 +225,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
goto CONN_OVER;
}
mndAcquireConn(pMnode, pConn->id);
SConnectRsp connectRsp = {0};
connectRsp.acctId = pUser->acctId;
connectRsp.superUser = pUser->superUser;
......@@ -259,12 +260,17 @@ CONN_OVER:
}
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
taosWLockLatch(&pConn->queryLock);
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
pConn->pQueries = pBasic->queryDesc;
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
pBasic->queryDesc = NULL;
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries);
taosWUnLockLatch(&pConn->queryLock);
return TSDB_CODE_SUCCESS;
}
......@@ -354,13 +360,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
} else {
mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
}
} else if (pConn->killed) {
mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1;
}
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
if (rspBasic == NULL) {
mndReleaseConn(pMnode, pConn);
......@@ -389,6 +390,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
mndReleaseConn(pMnode, pConn);
hbRsp.query = rspBasic;
} else {
mDebug("no query info in hb msg");
}
int32_t kvNum = taosHashGetSize(pHbReq->info);
......@@ -559,14 +562,13 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
}
}
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->info.node;
int32_t numOfRows = 0;
SConnObj *pConn = NULL;
int32_t cols = 0;
char *pWrite;
char ipStr[TSDB_IPv4ADDR_LEN + 6];
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SConnObj *pConn = NULL;
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
......@@ -574,61 +576,55 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int3
while (numOfRows < rows) {
pConn = mndGetNextConn(pMnode, pShow->pIter);
if (pConn == NULL) break;
if (pConn == NULL) {
pShow->pIter = NULL;
break;
}
cols = 0;
#if 0
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(uint32_t *)pWrite = pConn->id;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes);
cols++;
// app name
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->pMeta->pSchemas[cols].bytes);
cols++;
// app pid
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(int32_t *)pWrite = pConn->pid;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
taosIpPort2String(pConn->ip, pConn->port, ipStr);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->pMeta->pSchemas[cols].bytes);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(int64_t *)pWrite = pConn->loginTimeMs;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
*(int64_t *)pWrite = pConn->lastAccessTimeMs;
cols++;
#endif
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(user, pConn->user);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)user, false);
char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
STR_TO_VARSTR(app, pConn->app);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)app, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);
char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
numOfRows++;
}
pShow->numOfRows += numOfRows;
return numOfRows;
}
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->info.node;
int32_t numOfRows = 0;
#if 0
SConnObj *pConn = NULL;
int32_t cols = 0;
char *pWrite;
void *pIter;
char str[TSDB_IPv4ADDR_LEN + 6] = {0};
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SConnObj *pConn = NULL;
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
......@@ -641,85 +637,86 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, in
break;
}
if (numOfRows + pConn->numOfQueries >= rows) {
taosCacheDestroyIter(pShow->pIter);
pShow->pIter = NULL;
break;
taosRLockLatch(&pConn->queryLock);
if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) {
taosRUnLockLatch(&pConn->queryLock);
continue;
}
for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
SQueryDesc *pDesc = pConn->pQueries + i;
int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
for (int32_t i = 0; i < numOfQueries; ++i) {
SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i);
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(int64_t *)pWrite = htobe64(pDesc->queryId);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(int64_t *)pWrite = htobe64(pConn->id);
cols++;
char queryId[26 + VARSTR_HEADER_SIZE] = {0};
sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid);
varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)queryId, false);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->pMeta->pSchemas[cols].bytes);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
char handleBuf[24] = {0};
snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
STR_TO_VARSTR(app, pConn->app);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)app, false);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->pMeta->pSchemas[cols].bytes);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->pid, false);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(int64_t *)pWrite = htobe64(pDesc->stime);
cols++;
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(user, pConn->user);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)user, false);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(int64_t *)pWrite = htobe64(pDesc->useconds);
cols++;
char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->pMeta->pSchemas[cols].bytes);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stime, false);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(int32_t *)pWrite = htonl(pDesc->pid);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->useconds, false);
char epBuf[TSDB_EP_LEN + 1] = {0};
snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->pMeta->pSchemas[cols].bytes);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(bool *)pWrite = pDesc->stableQuery;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
*(int32_t *)pWrite = htonl(pDesc->numOfSub);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->pMeta->pSchemas[cols].bytes);
cols++;
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t strSize = sizeof(subStatus);
int32_t offset = VARSTR_HEADER_SIZE;
for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
if (i) {
offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
}
SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i);
offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
}
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, subStatus, false);
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->pMeta->pSchemas[cols].bytes);
cols++;
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(sql, pQuery->sql);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
numOfRows++;
}
taosRUnLockLatch(&pConn->queryLock);
}
pShow->numOfRows += numOfRows;
#endif
return numOfRows;
}
......
......@@ -1284,7 +1284,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_STB;
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
......
......@@ -88,7 +88,7 @@ TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
TEST_F(MndTestProfile, 03_ConnectMsg_Show) {
test.SendShowReq(TSDB_MGMT_TABLE_CONNS, "connections", "");
EXPECT_EQ(test.GetShowRows(), 0);
EXPECT_EQ(test.GetShowRows(), 1);
}
TEST_F(MndTestProfile, 04_HeartBeatMsg) {
......
......@@ -468,8 +468,8 @@ int32_t udfdConnectToMnode() {
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = htonl(taosGetPId());
connReq.startTime = htobe64(taosGetTimestampMs());
connReq.pid = taosGetPId();
connReq.startTime = taosGetTimestampMs();
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void * pReq = rpcMallocCont(contLen);
......
......@@ -199,6 +199,7 @@ SNode* nodesMakeNode(ENodeType type) {
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_VNODES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
......
......@@ -377,6 +377,16 @@ static int32_t collectMetaKeyFromShowTopics(SCollectMetaKeyCxt* pCxt, SShowStmt*
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowConnections(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_CONNECTIONS,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowQueries(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_QUERIES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowTransactions(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_TRANS,
pCxt->pMetaCache);
......@@ -447,6 +457,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TOPICS_STMT:
return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
return collectMetaKeyFromShowConnections(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_QUERIES_STMT:
return collectMetaKeyFromShowQueries(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_DELETE_STMT:
......
......@@ -42,6 +42,7 @@ typedef struct STranslateContext {
SExplainOptions* pExplainOpt;
SParseMetaCache* pMetaCache;
bool createStream;
bool stableQuery;
} STranslateContext;
typedef struct SFullDatabaseName {
......@@ -1508,6 +1509,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
if (TSDB_CODE_SUCCESS == code) {
code = addNamespace(pCxt, pRealTable);
}
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
pCxt->stableQuery = true;
}
break;
}
case QUERY_NODE_TEMP_TABLE: {
......@@ -4805,6 +4809,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && pClause->ignoreNotExists) {
code = TSDB_CODE_SUCCESS;
goto over;
}
*pIsSuperTable = false;
......@@ -4898,7 +4903,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
}
}
if (isSuperTable) {
if (isSuperTable || 0 == taosHashGetSize(pVgroupHashmap)) {
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS;
}
......@@ -5343,6 +5348,8 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
break;
}
pQuery->stableQuery = pCxt->stableQuery;
if (pQuery->haveResultSet) {
if (TSDB_CODE_SUCCESS != extractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -151,7 +151,9 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};
SQuerySubDesc subDesc = {0};
subDesc.tid = pTask->taskId;
strcpy(subDesc.status, jobTaskStatusStr(pTask->status));
taosArrayPush(pSub, &subDesc);
}
......
......@@ -931,15 +931,15 @@ bool taosCacheIterNext(SCacheIter *pIter) {
SCacheObj *pCacheObj = pIter->pCacheObj;
if (pIter->index + 1 >= pIter->numOfObj) {
if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
return false;
}
// release the reference for all objects in the snapshot
for (int32_t i = 0; i < pIter->numOfObj; ++i) {
char *p = pIter->pCurrent[i]->data;
taosCacheRelease(pCacheObj, (void **)&p, false);
pIter->pCurrent[i] = NULL;
}
if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
return false;
}
while (1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册