提交 49737397 编写于 作者: D dapan1121

show apps

上级 96f8a0da
......@@ -61,6 +61,7 @@ extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfCommitThreads;
extern int32_t tsNumOfTaskQueueThreads;
extern int32_t tsNumOfMnodeQueryThreads;
extern int32_t tsNumOfMnodeFetchThreads;
extern int32_t tsNumOfMnodeReadThreads;
extern int32_t tsNumOfVnodeQueryThreads;
extern int32_t tsNumOfVnodeFetchThreads;
......
......@@ -106,6 +106,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_CONNS,
TSDB_MGMT_TABLE_QUERIES,
TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_APPS,
TSDB_MGMT_TABLE_MAX,
} EShowType;
......@@ -2000,7 +2001,6 @@ typedef struct {
int64_t useconds;
int64_t stime; // timestamp precision ms
int64_t reqRid;
int32_t pid;
bool stableQuery;
char fqdn[TSDB_FQDN_LEN];
int32_t subPlanNum;
......@@ -2009,8 +2009,6 @@ typedef struct {
typedef struct {
uint32_t connId;
int32_t pid;
char app[TSDB_APP_NAME_LEN];
SArray* queryDesc; // SArray<SQueryDesc>
} SQueryHbReqBasic;
......@@ -2025,9 +2023,31 @@ typedef struct {
SArray* pQnodeList;
} SQueryHbRspBasic;
typedef struct SAppClusterSummary {
uint64_t numOfInsertsReq;
uint64_t numOfInsertRows;
uint64_t insertElapsedTime;
uint64_t insertBytes; // submit to tsdb since launched.
uint64_t fetchBytes;
uint64_t queryElapsedTime;
uint64_t numOfSlowQueries;
uint64_t totalRequests;
uint64_t currentRequests; // the number of SRequestObj
} SAppClusterSummary;
typedef struct {
int64_t appId;
int32_t pid;
char name[TSDB_APP_NAME_LEN];
int64_t startTime;
SAppClusterSummary summary;
} SAppHbReq;
typedef struct {
SClientHbKey connKey;
int64_t clusterId;
SAppHbReq app;
SQueryHbReqBasic* query;
SHashObj* info; // hash<Skv.key, Skv>
} SClientHbReq;
......
......@@ -76,10 +76,12 @@ typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq
typedef struct {
int8_t inited;
int64_t appId;
// ctl
int8_t threadStop;
TdThread thread;
TdThreadMutex lock; // used when app init and cleanup
SHashObj *appSummary;
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbReqHandle reqHandle[CONN_TYPE__MAX];
FHbRspHandle rspHandle[CONN_TYPE__MAX];
......@@ -92,33 +94,20 @@ typedef struct SQueryExecMetric {
int64_t rsp; // receive response from server, us
} SQueryExecMetric;
typedef struct SInstanceSummary {
uint64_t numOfInsertsReq;
uint64_t numOfInsertRows;
uint64_t insertElapsedTime;
uint64_t insertBytes; // submit to tsdb since launched.
uint64_t fetchBytes;
uint64_t queryElapsedTime;
uint64_t numOfSlowQueries;
uint64_t totalRequests;
uint64_t currentRequests; // the number of SRequestObj
} SInstanceSummary;
typedef struct SHeartBeatInfo {
void* pTimer; // timer, used to send request msg to mnode
} SHeartBeatInfo;
struct SAppInstInfo {
int64_t numOfConns;
SCorEpSet mgmtEp;
TdThreadMutex qnodeMutex;
SArray* pQnodeList;
SInstanceSummary summary;
SList* pConnList; // STscObj linked list
uint64_t clusterId;
void* pTransporter;
SAppHbMgr* pAppHbMgr;
int64_t numOfConns;
SCorEpSet mgmtEp;
TdThreadMutex qnodeMutex;
SArray* pQnodeList;
SAppClusterSummary summary;
SList* pConnList; // STscObj linked list
uint64_t clusterId;
void* pTransporter;
SAppHbMgr* pAppHbMgr;
};
typedef struct SAppInfo {
......
......@@ -48,7 +48,7 @@ static void registerRequest(SRequestObj *pRequest) {
int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
if (pTscObj->pAppInfo) {
SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary;
SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;
int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
......@@ -62,7 +62,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL);
STscObj *pTscObj = pRequest->pTscObj;
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
......
......@@ -314,7 +314,6 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
desc.queryId = pRequest->requestId;
desc.useconds = now - pRequest->metric.start;
desc.reqRid = pRequest->self;
desc.pid = hbBasic->pid;
desc.stableQuery = pRequest->stableQuery;
taosGetFqdn(desc.fqdn);
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
......@@ -360,8 +359,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
}
hbBasic->connId = pTscObj->connId;
hbBasic->pid = taosGetPId();
taosGetAppName(hbBasic->app, NULL);
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
if (numOfQueries <= 0) {
......@@ -507,6 +504,21 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
return TSDB_CODE_SUCCESS;
}
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL != pApp) {
memcpy(&req->app, pApp, sizeof(*pApp));
} else {
memset(&req->app.summary, 0, sizeof(req->app.summary));
req->app.pid = taosGetPId();
req->app.appId = clientHbMgr.appId;
taosGetAppName(req->app.name, NULL);
}
return TSDB_CODE_SUCCESS;
}
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL;
......@@ -517,6 +529,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return code;
}
hbGetAppInfo(*clusterId, req);
hbGetQueryBasicInfo(connKey, req);
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
......@@ -589,6 +603,47 @@ void hbThreadFuncUnexpectedStopped(void) {
atomic_store_8(&clientHbMgr.threadStop, 2);
}
void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) {
dst->numOfInsertsReq += src->numOfInsertsReq;
dst->numOfInsertRows += src->numOfInsertRows;
dst->insertElapsedTime += src->insertElapsedTime;
dst->insertBytes += src->insertBytes;
dst->fetchBytes += src->fetchBytes;
dst->queryElapsedTime += src->queryElapsedTime;
dst->numOfSlowQueries += src->numOfSlowQueries;
dst->totalRequests += src->totalRequests;
dst->currentRequests += src->currentRequests;
}
int32_t hbGatherAppInfo(void) {
SAppHbReq req = {0};
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
if (sz > 0) {
req.pid = taosGetPId();
req.appId = clientHbMgr.appId;
taosGetAppName(req.name, NULL);
}
for (int32_t i = 0; i < sz; ++i) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL == pApp) {
memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req));
} else {
if (pAppHbMgr->startTime < pApp->startTime) {
pApp->startTime = pAppHbMgr->startTime;
}
hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
}
}
return TSDB_CODE_SUCCESS;
}
static void *hbThreadFunc(void *param) {
setThreadName("hb");
#ifdef WINDOWS
......@@ -603,6 +658,10 @@ static void *hbThreadFunc(void *param) {
taosThreadMutexLock(&clientHbMgr.lock);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
if (sz > 0) {
hbGatherAppInfo();
}
for (int i = 0; i < sz; i++) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
......@@ -746,6 +805,10 @@ int hbMgrInit() {
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0;
clientHbMgr.appId = tGenIdPI64();
tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId);
clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
taosThreadMutexInit(&clientHbMgr.lock, NULL);
......
......@@ -315,6 +315,24 @@ static const SSysDbTableSchema querySchema[] = {
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SSysDbTableSchema appSchema[] = {
{.name = "app_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "ip", .bytes = TSDB_IPv4ADDR_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "name", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "start_time", .bytes = 8 , .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "insert_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "insert_row", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "insert_time", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "insert_bytes", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "fetch_bytes", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "query_time", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "show_query", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "total_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "current_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
};
static const SSysTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)},
{TSDB_PERFS_TABLE_QUERIES, querySchema, tListLen(querySchema)},
......@@ -325,6 +343,7 @@ static const SSysTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_TRANS, transSchema, tListLen(transSchema)},
{TSDB_PERFS_TABLE_SMAS, smaSchema, tListLen(smaSchema)},
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
{TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)}
};
void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size) {
......
......@@ -51,10 +51,11 @@ int32_t tsNumOfShmThreads = 1;
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 1;
int32_t tsNumOfMnodeQueryThreads = 1;
int32_t tsNumOfMnodeQueryThreads = 2;
int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 2;
int32_t tsNumOfVnodeFetchThreads = 2;
int32_t tsNumOfVnodeFetchThreads = 1;
int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfVnodeMergeThreads = 2;
......
......@@ -191,13 +191,25 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1;
if (pReq->connKey.connType == CONN_TYPE__QUERY) {
if (tEncodeI64(pEncoder, pReq->app.appId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->app.pid) < 0) return -1;
if (tEncodeCStr(pEncoder, pReq->app.name) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->app.startTime) < 0) return -1;
if (tEncodeU64(pEncoder, pReq->app.summary.numOfInsertsReq) < 0) return -1;
if (tEncodeU64(pEncoder, pReq->app.summary.numOfInsertRows) < 0) return -1;
if (tEncodeU64(pEncoder, pReq->app.summary.insertElapsedTime) < 0) return -1;
if (tEncodeU64(pEncoder, pReq->app.summary.insertBytes) < 0) return -1;
if (tEncodeU64(pEncoder, pReq->app.summary.fetchBytes) < 0) return -1;
if (tEncodeU64(pEncoder, pReq->app.summary.queryElapsedTime) < 0) return -1;
if (tEncodeU64(pEncoder, pReq->app.summary.numOfSlowQueries) < 0) return -1;
if (tEncodeU64(pEncoder, pReq->app.summary.totalRequests) < 0) return -1;
if (tEncodeU64(pEncoder, pReq->app.summary.currentRequests) < 0) return -1;
int32_t queryNum = 0;
if (pReq->query) {
queryNum = 1;
if (tEncodeI32(pEncoder, queryNum) < 0) return -1;
if (tEncodeU32(pEncoder, pReq->query->connId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->query->pid) < 0) return -1;
if (tEncodeCStr(pEncoder, pReq->query->app) < 0) return -1;
int32_t num = taosArrayGetSize(pReq->query->queryDesc);
if (tEncodeI32(pEncoder, num) < 0) return -1;
......@@ -209,7 +221,6 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
if (tEncodeI64(pEncoder, desc->useconds) < 0) return -1;
if (tEncodeI64(pEncoder, desc->stime) < 0) return -1;
if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1;
if (tEncodeI32(pEncoder, desc->pid) < 0) return -1;
if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1;
if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1;
if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1;
......@@ -243,14 +254,26 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
if (tDecodeSClientHbKey(pDecoder, &pReq->connKey) < 0) return -1;
if (pReq->connKey.connType == CONN_TYPE__QUERY) {
if (tDecodeI64(pDecoder, &pReq->app.appId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->app.pid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pReq->app.name) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->app.startTime) < 0) return -1;
if (tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertsReq) < 0) return -1;
if (tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertRows) < 0) return -1;
if (tDecodeU64(pDecoder, &pReq->app.summary.insertElapsedTime) < 0) return -1;
if (tDecodeU64(pDecoder, &pReq->app.summary.insertBytes) < 0) return -1;
if (tDecodeU64(pDecoder, &pReq->app.summary.fetchBytes) < 0) return -1;
if (tDecodeU64(pDecoder, &pReq->app.summary.queryElapsedTime) < 0) return -1;
if (tDecodeU64(pDecoder, &pReq->app.summary.numOfSlowQueries) < 0) return -1;
if (tDecodeU64(pDecoder, &pReq->app.summary.totalRequests) < 0) return -1;
if (tDecodeU64(pDecoder, &pReq->app.summary.currentRequests) < 0) return -1;
int32_t queryNum = 0;
if (tDecodeI32(pDecoder, &queryNum) < 0) return -1;
if (queryNum) {
pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query));
if (NULL == pReq->query) return -1;
if (tDecodeU32(pDecoder, &pReq->query->connId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->query->pid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pReq->query->app) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(pDecoder, &num) < 0) return -1;
......@@ -265,7 +288,6 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
if (tDecodeI64(pDecoder, &desc.useconds) < 0) return -1;
if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1;
if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.pid) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&desc.stableQuery) < 0) return -1;
if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1;
......
......@@ -30,6 +30,7 @@ typedef struct SMnodeMgmt {
const char *path;
const char *name;
SSingleWorker queryWorker;
SSingleWorker fetchWorker;
SSingleWorker readWorker;
SSingleWorker writeWorker;
SSingleWorker syncWorker;
......@@ -57,6 +58,7 @@ int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
......
......@@ -122,6 +122,13 @@ int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pMsg->info.node = pMgmt->pMnode;
return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg);
}
int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg);
}
......@@ -135,6 +142,9 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
case QUERY_QUEUE:
pWorker = &pMgmt->queryWorker;
break;
case FETCH_QUEUE:
pWorker = &pMgmt->fetchWorker;
break;
case READ_QUEUE:
pWorker = &pMgmt->readWorker;
break;
......@@ -167,6 +177,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
return -1;
}
SSingleWorkerCfg fCfg = {
.min = tsNumOfMnodeFetchThreads,
.max = tsNumOfMnodeFetchThreads,
.name = "mnode-fetch",
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg) != 0) {
dError("failed to start mnode-fetch worker since %s", terrstr());
return -1;
}
SSingleWorkerCfg rCfg = {
.min = tsNumOfMnodeReadThreads,
.max = tsNumOfMnodeReadThreads,
......@@ -227,6 +249,7 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->fetchWorker);
tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker);
......
......@@ -67,7 +67,8 @@ typedef struct {
} SShowMgmt;
typedef struct {
SCacheObj *cache;
SCacheObj *connCache;
SCacheObj *appCache;
} SProfileMgmt;
typedef struct {
......
......@@ -43,6 +43,16 @@ typedef struct {
SArray *pQueries; // SArray<SQueryDesc>
} SConnObj;
typedef struct {
int64_t appId;
uint32_t ip;
int32_t pid;
char name[TSDB_APP_NAME_LEN];
int64_t startTime;
SAppClusterSummary summary;
int64_t lastAccessTimeMs;
} SAppObj;
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
int32_t pid, const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn);
......@@ -57,14 +67,24 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq);
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
static void mndFreeApp(SAppObj *pApp);
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextApp(SMnode *pMnode, void *pIter);
int32_t mndInitProfile(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
// in ms
int32_t connCheckTime = tsShellActivityTimer * 2 * 1000;
pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
if (pMgmt->cache == NULL) {
int32_t checkTime = tsShellActivityTimer * 2 * 1000;
pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_INT, checkTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
if (pMgmt->connCache == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to alloc profile cache since %s", terrstr());
return -1;
}
pMgmt->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkTime, true, (__cache_free_fn_t)mndFreeApp, "app");
if (pMgmt->appCache == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to alloc profile cache since %s", terrstr());
return -1;
......@@ -79,15 +99,22 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);
return 0;
}
void mndCleanupProfile(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
if (pMgmt->cache != NULL) {
taosCacheCleanup(pMgmt->cache);
pMgmt->cache = NULL;
if (pMgmt->connCache != NULL) {
taosCacheCleanup(pMgmt->connCache);
pMgmt->connCache = NULL;
}
if (pMgmt->appCache != NULL) {
taosCacheCleanup(pMgmt->appCache);
pMgmt->appCache = NULL;
}
}
......@@ -118,7 +145,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 3;
SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
SConnObj *pConn = taosCachePut(pMgmt->connCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
if (pConn == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
......@@ -140,14 +167,13 @@ static void mndFreeConn(SConnObj *pConn) {
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(connId));
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(connId));
if (pConn == NULL) {
mDebug("conn:%u, already destroyed", connId);
return NULL;
}
int32_t keepTime = tsShellActivityTimer * 3;
pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
pConn->lastAccessTimeMs = taosGetTimestampMs();
mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
return pConn;
......@@ -158,7 +184,7 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
}
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
......@@ -275,6 +301,77 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
return TSDB_CODE_SUCCESS;
}
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq* pReq) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SAppObj app;
app.appId = pReq->appId;
app.ip = clientIp;
app.pid = pReq->pid;
strcpy(app.name, pReq->name);
app.startTime = pReq->startTime;
memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
app.lastAccessTimeMs = taosGetTimestampMs();
int32_t keepTime = tsShellActivityTimer * 3;
SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000);
if (pApp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
return NULL;
}
mTrace("app %" PRIx64 " is put into cache", pReq->appId);
return pApp;
}
static void mndFreeApp(SAppObj *pApp) {
mTrace("app %" PRIx64 " is destroyed", pApp->appId);
}
static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SAppObj *pApp = taosCacheAcquireByKey(pMgmt->appCache, &appId, sizeof(appId));
if (pApp == NULL) {
mDebug("app %" PRIx64 " not in cache", appId);
return NULL;
}
pApp->lastAccessTimeMs = (uint64_t)taosGetTimestampMs();
mTrace("app %" PRIx64 " acquired from cache", appId);
return pApp;
}
static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
if (pApp == NULL) return;
mTrace("release app %" PRIx64 " to cache", pApp->appId);
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosCacheRelease(pMgmt->appCache, (void **)&pApp, false);
}
void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
SAppObj *pApp = NULL;
bool hasNext = taosCacheIterNext(pIter);
if (hasNext) {
size_t dataLen = 0;
pApp = taosCacheIterGetData(pIter, &dataLen);
} else {
taosCacheDestroyIter(pIter);
}
return pApp;
}
static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
if (pIter != NULL) {
taosCacheDestroyIter(pIter);
}
}
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
#if 0
SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
......@@ -340,26 +437,47 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
return NULL;
}
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
SAppHbReq* pReq = &pHbReq->app;
SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId);
if (pApp == NULL) {
pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
if (pApp == NULL) {
mError("failed to create new app %" PRIx64 " since %s", pReq->appId, terrstr());
return -1;
} else {
mDebug("a new app %" PRIx64 "created", pReq->appId);
return TSDB_CODE_SUCCESS;
}
}
memcpy(&pApp->summary, &pReq->summary, sizeof(pReq->summary));
mndReleaseApp(pMnode, pApp);
return TSDB_CODE_SUCCESS;
}
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
SClientHbBatchRsp *pBatchRsp) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
SRpcConnInfo connInfo = pMsg->conn;
mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query;
SRpcConnInfo connInfo = pMsg->conn;
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
pBasic->pid, pBasic->app, 0);
pHbReq->app.pid, pHbReq->app.name, 0);
if (pConn == NULL) {
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
return -1;
} else {
mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
pConn = mndAcquireConn(pMnode, pBasic->connId);
mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
}
}
......@@ -518,7 +636,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
mInfo("kill query msg is received, queryId:%d", killReq.queryId);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
......@@ -526,7 +644,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
} else {
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
pConn->killId = killReq.queryId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
return 0;
}
}
......@@ -550,7 +668,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
return -1;
}
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
......@@ -558,7 +676,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
} else {
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
pConn->killed = 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS;
}
}
......@@ -572,7 +690,7 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
}
while (numOfRows < rows) {
......@@ -628,7 +746,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
pShow->pIter = taosCacheCreateIter(pMgmt->connCache);
}
while (numOfRows < rows) {
......@@ -667,7 +785,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
colDataAppend(pColInfo, numOfRows, (const char *)app, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->pid, false);
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(user, pConn->user);
......@@ -721,6 +839,80 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
return numOfRows;
}
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SAppObj *pApp = NULL;
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
}
while (numOfRows < rows) {
pApp = mndGetNextApp(pMnode, pShow->pIter);
if (pApp == NULL) {
pShow->pIter = NULL;
break;
}
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pApp->appId, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pApp->ip, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pApp->pid, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->name, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->startTime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->summary.numOfInsertsReq, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->summary.numOfInsertRows, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->summary.insertElapsedTime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->summary.insertBytes, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->summary.fetchBytes, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->summary.queryElapsedTime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->summary.numOfSlowQueries, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->summary.totalRequests, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->summary.currentRequests, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)pApp->lastAccessTimeMs, false);
numOfRows++;
}
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
if (pIter != NULL) {
taosCacheDestroyIter(pIter);
......@@ -729,5 +921,5 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
int32_t mndGetNumOfConnections(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
return taosCacheGetNumOfObj(pMgmt->cache);
return taosCacheGetNumOfObj(pMgmt->connCache);
}
......@@ -103,6 +103,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_TOPICS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_STREAMS, len) == 0) {
type = TSDB_MGMT_TABLE_STREAMS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_APPS, len) == 0) {
type = TSDB_MGMT_TABLE_APPS;
} else {
// ASSERT(0);
}
......
......@@ -1287,7 +1287,8 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_STB;
//terrno = TSDB_CODE_MND_INVALID_STB;
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
......
......@@ -1972,8 +1972,6 @@ void* ctgUpdateThreadFunc(void* param) {
ctgdShowClusterCache(pCtg);
}
if (CTG_IS_LOCKED(&gCtgMgmt.lock)) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
qInfo("catalog update thread stopped");
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册