提交 8d5f5901 编写于 作者: S Shengliang Guan

TD-10431 profile test

上级 c0fc32f6
......@@ -418,10 +418,10 @@ typedef struct {
} SDropSTableMsg;
typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
uint16_t flag; // denote if it is a tag or a normal column
char name[TSDB_COL_NAME_LEN + TSDB_DB_NAME_LEN + 1];
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
int16_t flag; // denote if it is a tag or a normal column
char name[TSDB_COL_NAME_LEN + TSDB_DB_NAME_LEN + 1];
} SColIndex;
typedef struct SColumnFilterInfo {
......@@ -515,8 +515,8 @@ typedef struct {
int16_t numOfCols; // the number of columns will be load from vnode
SInterval interval;
// SSessionWindow sw; // session window
uint16_t tagCondLen; // tag length in current query
uint16_t colCondLen; // column length in current query
int16_t tagCondLen; // tag length in current query
int16_t colCondLen; // column length in current query
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
......@@ -524,10 +524,10 @@ typedef struct {
int16_t prjOrder; // global order in super table projection query.
int64_t limit;
int64_t offset;
uint32_t queryType; // denote another query process
int32_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers
int16_t fillType; // interpolate type
uint64_t fillVal; // default value array list
int64_t fillVal; // default value array list
int32_t secondStageOutput;
STsBufInfo tsBuf; // tsBuf info
int32_t numOfTags; // number of tags columns involved
......@@ -542,8 +542,11 @@ typedef struct {
} SQueryTableMsg;
typedef struct {
int32_t code;
union{uint64_t qhandle; uint64_t qId;}; // query handle
int32_t code;
union {
uint64_t qhandle;
uint64_t qId;
}; // query handle
} SQueryTableRsp;
// todo: the show handle should be replaced with id
......@@ -706,7 +709,7 @@ typedef struct {
typedef struct {
char db[TSDB_FULL_DB_NAME_LEN];
uint32_t vgId;
int32_t vgId;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile;
......@@ -784,14 +787,14 @@ typedef struct STableMetaMsg {
} STableMetaMsg;
typedef struct SMultiTableMeta {
int32_t numOfTables;
int32_t numOfVgroup;
int32_t numOfUdf;
int32_t contLen;
uint8_t compressed; // denote if compressed or not
uint32_t rawLen; // size before compress
uint8_t metaClone; // make meta clone after retrieve meta from mnode
char meta[];
int32_t numOfTables;
int32_t numOfVgroup;
int32_t numOfUdf;
int32_t contLen;
int8_t compressed; // denote if compressed or not
int32_t rawLen; // size before compress
uint8_t metaClone; // make meta clone after retrieve meta from mnode
char meta[];
} SMultiTableMeta;
typedef struct {
......@@ -841,8 +844,8 @@ typedef struct {
} SConfigTableMsg;
typedef struct {
uint32_t dnodeId;
int32_t vgId;
int32_t dnodeId;
int32_t vgId;
} SConfigVnodeMsg;
typedef struct {
......@@ -851,29 +854,29 @@ typedef struct {
} SCfgDnodeMsg;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
uint32_t queryId;
int64_t useconds;
int64_t stime;
uint64_t qId;
uint64_t sqlObjId;
int32_t pid;
char fqdn[TSDB_FQDN_LEN];
uint8_t stableQuery;
int32_t numOfSub;
char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; //include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
char sql[TSDB_SHOW_SQL_LEN];
int32_t queryId;
int64_t useconds;
int64_t stime;
int64_t qId;
int64_t sqlObjId;
int32_t pid;
char fqdn[TSDB_FQDN_LEN];
int8_t stableQuery;
int32_t numOfSub;
char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
} SQueryDesc;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
char dstTable[TSDB_TABLE_NAME_LEN];
uint32_t streamId;
int64_t num; // number of computing/cycles
int64_t useconds;
int64_t ctime;
int64_t stime;
int64_t slidingTime;
int64_t interval;
char sql[TSDB_SHOW_SQL_LEN];
char dstTable[TSDB_TABLE_NAME_LEN];
int32_t streamId;
int64_t num; // number of computing/cycles
int64_t useconds;
int64_t ctime;
int64_t stime;
int64_t slidingTime;
int64_t interval;
} SStreamDesc;
typedef struct {
......
......@@ -215,7 +215,7 @@ TEST_F(DndTestProfile, SConnectMsg_03) {
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_NE(pMsg->code, 0);
ASSERT_EQ(pMsg->code, 0);
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
......@@ -310,7 +310,7 @@ TEST_F(DndTestProfile, SKillConnMsg_01) {
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_TSC_INVALID_CONNECTION);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_CONNECTION);
ASSERT_EQ(pMsg->contLen, 0);
}
......@@ -338,7 +338,7 @@ TEST_F(DndTestProfile, SKillConnMsg_01) {
EXPECT_EQ(pRsp->acctId, 1);
EXPECT_GT(pRsp->clusterId, 0);
EXPECT_EQ(pRsp->superAuth, 1);
EXPECT_GT(pRsp->connId, connId);
EXPECT_EQ(pRsp->readAuth, 1);
EXPECT_EQ(pRsp->writeAuth, 1);
......@@ -446,6 +446,115 @@ TEST_F(DndTestProfile, SKillQueryMsg_02) {
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_CONN_ID);
}
TEST_F(DndTestProfile, SKillQueryMsg_03) {
ASSERT_NE(pClient, nullptr);
int32_t showId = 0;
{
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_QUERIES;
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
SShowRsp* pRsp = (SShowRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->showId = htonl(pRsp->showId);
STableMetaMsg* pMeta = &pRsp->tableMeta;
pMeta->contLen = htonl(pMeta->contLen);
pMeta->numOfColumns = htons(pMeta->numOfColumns);
pMeta->sversion = htons(pMeta->sversion);
pMeta->tversion = htons(pMeta->tversion);
pMeta->tid = htonl(pMeta->tid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->suid = htobe64(pMeta->suid);
showId = pRsp->showId;
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->numOfColumns, 14);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tid, 0);
EXPECT_EQ(pMeta->uid, 0);
EXPECT_STREQ(pMeta->sTableName, "");
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "queryId");
pSchema = &pMeta->schema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "connId");
pSchema = &pMeta->schema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "user");
pSchema = &pMeta->schema[3];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "ip:port");
}
{
SRetrieveTableMsg* pReq = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pReq->showId = htonl(showId);
pReq->free = 0;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SRetrieveTableMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->offset = htobe64(pRsp->offset);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
EXPECT_EQ(pRsp->numOfRows, 0);
EXPECT_EQ(pRsp->offset, 0);
EXPECT_EQ(pRsp->useconds, 0);
EXPECT_EQ(pRsp->completed, 1);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRsp->compressed, 0);
EXPECT_EQ(pRsp->reserved, 0);
EXPECT_EQ(pRsp->compLen, 0);
}
}
TEST_F(DndTestProfile, SKillStreamMsg_01) {
ASSERT_NE(pClient, nullptr);
......@@ -523,3 +632,105 @@ TEST_F(DndTestProfile, SKillStreamMsg_02) {
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_CONN_ID);
}
TEST_F(DndTestProfile, SKillStreamMsg_03) {
ASSERT_NE(pClient, nullptr);
int32_t showId = 0;
{
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_STREAMS;
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
SShowRsp* pRsp = (SShowRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->showId = htonl(pRsp->showId);
STableMetaMsg* pMeta = &pRsp->tableMeta;
pMeta->contLen = htonl(pMeta->contLen);
pMeta->numOfColumns = htons(pMeta->numOfColumns);
pMeta->sversion = htons(pMeta->sversion);
pMeta->tversion = htons(pMeta->tversion);
pMeta->tid = htonl(pMeta->tid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->suid = htobe64(pMeta->suid);
showId = pRsp->showId;
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->numOfColumns, 10);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tid, 0);
EXPECT_EQ(pMeta->uid, 0);
EXPECT_STREQ(pMeta->sTableName, "");
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->schema[0];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "streamId");
pSchema = &pMeta->schema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "connId");
pSchema = &pMeta->schema[2];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "user");
}
{
SRetrieveTableMsg* pReq = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pReq->showId = htonl(showId);
pReq->free = 0;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SRetrieveTableMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->offset = htobe64(pRsp->offset);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
EXPECT_EQ(pRsp->numOfRows, 0);
EXPECT_EQ(pRsp->offset, 0);
EXPECT_EQ(pRsp->useconds, 0);
EXPECT_EQ(pRsp->completed, 1);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRsp->compressed, 0);
EXPECT_EQ(pRsp->reserved, 0);
EXPECT_EQ(pRsp->compLen, 0);
}
}
......@@ -29,7 +29,7 @@ typedef struct {
char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int32_t pid; // pid of app that invokes taosc
int32_t connId;
int32_t id;
int8_t killed;
int8_t align;
uint16_t port;
......@@ -46,7 +46,7 @@ typedef struct {
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app);
static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *user, uint32_t ip, uint16_t port);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
......@@ -57,11 +57,11 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg);
static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg);
static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow);
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow);
static int32_t mndRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SMnodeMsg *pMsg);
static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveStreams(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
int32_t mndInitProfile(SMnode *pMnode) {
......@@ -109,7 +109,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t
if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1);
SConnObj connObj = {.pid = pid,
.connId = connId,
.id = connId,
.killed = 0,
.port = port,
.ip = ip,
......@@ -130,10 +130,10 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t
SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
if (pConn == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
mError("conn:%d, data:%p failed to put into cache since %s, user:%s", connId, pConn, user, terrstr());
return NULL;
} else {
mDebug("conn:%d, is created, user:%s", connId, user);
mTrace("conn:%d, data:%p created, user:%s", pConn->id, pConn, user);
return pConn;
}
}
......@@ -141,39 +141,29 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t
static void mndFreeConn(SConnObj *pConn) {
tfree(pConn->pQueries);
tfree(pConn->pStreams);
mDebug("conn:%d, is destroyed", pConn->connId);
mTrace("conn:%d, data:%p destroyed", pConn->id, pConn);
}
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *newUser, uint32_t newIp, uint16_t newPort) {
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mDebug("conn:%d, already destroyed, user:%s", connId, newUser);
return NULL;
}
if (pConn->ip != newIp || pConn->port != newPort /* || strcmp(pConn->user, newUser) != 0 */) {
char oldIpStr[30];
char newIpStr[30];
taosIp2String(pConn->ip, oldIpStr);
taosIp2String(newIp, newIpStr);
mDebug("conn:%d, incoming conn user:%s ip:%s:%u, not match exist user:%s ip:%s:%u", connId, newUser, newIpStr,
newPort, pConn->user, oldIpStr, pConn->port);
if (pMgmt->connId < connId) pMgmt->connId = connId + 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
mDebug("conn:%d, already destroyed", connId);
return NULL;
}
int32_t keepTime = pMnode->shellActivityTimer * 3;
pConn->lastAccess = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
mTrace("conn:%d, data:%p acquired from cache", pConn->id, pConn);
return pConn;
}
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
if (pConn == NULL) return;
mTrace("conn:%d, data:%p released from cache", pConn->id, pConn);
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
}
......@@ -250,13 +240,13 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
}
pRsp->clusterId = htonl(pMnode->clusterId);
pRsp->connId = htonl(pConn->connId);
pRsp->connId = htonl(pConn->id);
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
mndReleaseConn(pMnode, pConn);
pMsg->contLen = sizeof(SConnectRsp);
pMsg->pCont = pRsp;
mDebug("user:%s, login from %s, conn:%d", info.user, ip, pConn->connId);
mDebug("user:%s, login from %s, conn:%d", info.user, ip, pConn->id);
return 0;
}
......@@ -296,7 +286,9 @@ static int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) {
}
static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SHeartBeatMsg *pReq = pMsg->rpcMsg.pCont;
pReq->connId = htonl(pReq->connId);
pReq->pid = htonl(pReq->pid);
......@@ -307,20 +299,33 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
return -1;
}
SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId, info.user, info.clientIp, info.clientPort);
SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app);
if (pConn == NULL) {
mError("user:%s, conn:%d is freed and failed to create new conn since %s", pMsg->user, pReq->connId, terrstr());
return -1;
} else {
mDebug("user:%s, conn:%d is freed and create a new conn:%d", pMsg->user, pReq->connId, pConn->connId);
mDebug("user:%s, conn:%d is freed and create a new conn:%d", pMsg->user, pReq->connId, pConn->id);
}
} else if (pConn->killed) {
mDebug("user:%s, conn:%d is already killed", pMsg->user, pReq->connId, pConn->connId);
terrno = TSDB_CODE_TSC_INVALID_CONNECTION;
mError("user:%s, conn:%d is already killed", pMsg->user, pConn->id);
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1;
} else {
if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) {
char oldIpStr[40];
char newIpStr[40];
taosIpPort2String(pConn->ip, pConn->port, oldIpStr);
taosIpPort2String(info.clientIp, info.clientPort, newIpStr);
mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr,
pConn->user, oldIpStr);
if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1;
}
}
SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp));
......@@ -346,7 +351,7 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
pConn->queryId = 0;
}
pRsp->connId = htonl(pConn->connId);
pRsp->connId = htonl(pConn->id);
pRsp->totalDnodes = htonl(1);
pRsp->onlineDnodes = htonl(1);
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
......@@ -525,47 +530,47 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
int32_t numOfRows = 0;
SConnObj *pConnObj = NULL;
SConnObj *pConn = NULL;
int32_t cols = 0;
char *pWrite;
char ipStr[TSDB_IPv4ADDR_LEN + 6];
while (numOfRows < rows) {
pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConnObj);
if (pConnObj == NULL) break;
pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
if (pConn == NULL) break;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pConnObj->connId;
*(int32_t *)pWrite = pConn->id;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
cols++;
// app name
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->app, pShow->bytes[cols]);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->bytes[cols]);
cols++;
// app pid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pConnObj->pid;
*(int32_t *)pWrite = pConn->pid;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
taosIpPort2String(pConnObj->ip, pConnObj->port, ipStr);
taosIpPort2String(pConn->ip, pConn->port, ipStr);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pConnObj->stime;
*(int64_t *)pWrite = pConn->stime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pConnObj->lastAccess < pConnObj->stime) pConnObj->lastAccess = pConnObj->stime;
*(int64_t *)pWrite = pConnObj->lastAccess;
if (pConn->lastAccess < pConn->stime) pConn->lastAccess = pConn->stime;
*(int64_t *)pWrite = pConn->lastAccess;
cols++;
numOfRows++;
......@@ -576,7 +581,7 @@ static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
return numOfRows;
}
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow) {
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......@@ -592,9 +597,15 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "query_id");
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "queryId");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "connId");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -687,40 +698,43 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *
static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
int32_t numOfRows = 0;
SConnObj *pConnObj = NULL;
SConnObj *pConn = NULL;
int32_t cols = 0;
char *pWrite;
void *pIter;
char str[TSDB_IPv4ADDR_LEN + 6] = {0};
while (numOfRows < rows) {
pIter = mndGetNextConn(pMnode, pShow->pIter, &pConnObj);
if (pConnObj == NULL) {
pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
if (pConn == NULL) {
pShow->pIter = pIter;
break;
}
if (numOfRows + pConnObj->numOfQueries >= rows) {
if (numOfRows + pConn->numOfQueries >= rows) {
mndCancelGetNextConn(pMnode, pIter);
break;
}
pShow->pIter = pIter;
for (int32_t i = 0; i < pConnObj->numOfQueries; ++i) {
SQueryDesc *pDesc = pConnObj->pQueries + i;
for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
SQueryDesc *pDesc = pConn->pQueries + i;
cols = 0;
snprintf(str, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId));
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
*(int64_t *)pWrite = htobe64(pDesc->queryId);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = htobe64(pConn->id);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]);
cols++;
......@@ -749,7 +763,7 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
cols++;
char epBuf[TSDB_EP_LEN + 1] = {0};
snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConnObj->port);
snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]);
cols++;
......@@ -783,7 +797,7 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
}
static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow) {
static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......@@ -799,12 +813,18 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "streamId");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "connId");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "user");
......@@ -867,39 +887,42 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj
return 0;
}
static int32_t mndRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SMnodeMsg *pMsg) {
static int32_t mndRetrieveStreams(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
int32_t numOfRows = 0;
SConnObj *pConnObj = NULL;
SConnObj *pConn = NULL;
int32_t cols = 0;
char *pWrite;
void *pIter;
char ipStr[TSDB_IPv4ADDR_LEN + 6];
while (numOfRows < rows) {
pIter = mndGetNextConn(pMnode, pShow->pIter, &pConnObj);
if (pConnObj == NULL) {
pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
if (pConn == NULL) {
pShow->pIter = pIter;
break;
}
if (numOfRows + pConnObj->numOfStreams >= rows) {
if (numOfRows + pConn->numOfStreams >= rows) {
mndCancelGetNextConn(pMnode, pIter);
break;
}
pShow->pIter = pIter;
for (int32_t i = 0; i < pConnObj->numOfStreams; ++i) {
SStreamDesc *pDesc = pConnObj->pStreams + i;
for (int32_t i = 0; i < pConn->numOfStreams; ++i) {
SStreamDesc *pDesc = pConn->pStreams + i;
cols = 0;
snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->streamId));
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
*(int64_t *)pWrite = htobe64(pDesc->streamId);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = htobe64(pConn->id);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -907,7 +930,7 @@ static int32_t mndRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SMn
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConn->ip), pConn->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
cols++;
......
......@@ -16,14 +16,14 @@
#define _DEFAULT_SOURCE
#include "mndShow.h"
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg);
static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMsg);
static bool mndCheckRetrieveFinished(SShowObj *pShow);
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg);
static void mndFreeShowObj(SShowObj *pShow);
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
static char *mndShowStr(int32_t showType);
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg);
static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMsg);
static bool mndCheckRetrieveFinished(SShowObj *pShow);
int32_t mndInitShow(SMnode *pMnode) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
......@@ -116,6 +116,9 @@ static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId) {
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
if (pShow == NULL) return;
mTrace("show:%d, data:%p released from cache, force:%d", pShow->id, pShow, forceRemove);
// A bug in tcache.c
forceRemove = 0;
SMnode *pMnode = pShow->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
......@@ -244,7 +247,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
pMnodeMsg->pCont = pRsp;
pMnodeMsg->contLen = size;
if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
pRsp->completed = 1;
mDebug("show:%d, data:%p retrieve completed", pShow->id, pShow);
mndReleaseShowObj(pShow, true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册