diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d3c02011690fd20e0cb7d7aaf7b98877581bbb42..de0c8d127db0b4415223b0e4c085ce4c831afbd5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1494,6 +1494,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload; + // TODO refactor full_name char *db; // ugly code to move the space db = strstr(pObj->db, TS_PATH_DELIMITER); db = (db == NULL) ? pObj->db : db + 1; @@ -1501,6 +1502,9 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion)); tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion)); + pConnect->pid = htonl(taosGetPId()); + taosGetCurrentAPPName(pConnect->appName, NULL); + return TSDB_CODE_SUCCESS; } @@ -1653,6 +1657,10 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; pHeartbeat->numOfQueries = numOfQueries; pHeartbeat->numOfStreams = numOfStreams; + + pHeartbeat->pid = htonl(taosGetPId()); + taosGetCurrentAPPName(pHeartbeat->appName, NULL); + int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj); pthread_mutex_unlock(&pObj->mutex); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 0ece9c986274b420ef9ef7ce0d0392ddf276cee3..01a4ed32f11e2badd6bda7ffdf862195c408dcbd 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -253,8 +253,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE -#define TSDB_MAX_SQL_SHOW_LEN 256 -#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 8mb +#define TSDB_MAX_SQL_SHOW_LEN 512 +#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb + +#define TSDB_APPNAME_LEN TSDB_UNI_LEN #define TSDB_MAX_BYTES_PER_ROW 16384 #define TSDB_MAX_TAGS_LEN 16384 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 91271584738aff1ba0d68188ef202eb4f9aaed8e..1f1f9690e2b46219dea348e6ce41d5a003cd130e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -305,6 +305,8 @@ typedef struct { char clientVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN]; char db[TSDB_TABLE_FNAME_LEN]; + char appName[TSDB_APPNAME_LEN]; + int32_t pid; } SCMConnectMsg; typedef struct { @@ -756,8 +758,10 @@ typedef struct { typedef struct { uint32_t connId; + int32_t pid; int32_t numOfQueries; int32_t numOfStreams; + char appName[TSDB_APPNAME_LEN]; char pData[]; } SCMHeartBeatMsg; diff --git a/src/mnode/inc/mnodeProfile.h b/src/mnode/inc/mnodeProfile.h index e39496ec9cacf431e2fefd737018d54a56e27f88..1e5b1c0f9c7c1c220a88339f00d46538c78cf938 100644 --- a/src/mnode/inc/mnodeProfile.h +++ b/src/mnode/inc/mnodeProfile.h @@ -23,6 +23,8 @@ extern "C" { typedef struct { char user[TSDB_USER_LEN]; + char appName[TSDB_APPNAME_LEN]; // app name that invokes taosc + uint32_t pid; // pid of app that invokes taosc int8_t killed; uint16_t port; uint32_t ip; @@ -40,7 +42,7 @@ typedef struct { int32_t mnodeInitProfile(); void mnodeCleanupProfile(); -SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port); +SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app); SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port); void mnodeReleaseConn(SConnObj *pConn); int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg); diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 8e0dc2f1821e78f845ecd7d5ef92fc6709cbc123..25ca526d7e018131dbc44cdba510f9acc5a6eb76 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -72,7 +72,7 @@ void mnodeCleanupProfile() { } } -SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { +SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app) { #if 0 int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable); if (connSize > tsMaxShellConns) { @@ -90,10 +90,13 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { .ip = ip, .port = port, .connId = connId, - .stime = taosGetTimestampMs() + .stime = taosGetTimestampMs(), + .pid = pid, }; - tstrncpy(connObj.user, user, sizeof(connObj.user)); + tstrncpy(connObj.user, user, tListLen(connObj.user)); + tstrncpy(connObj.appName, app, tListLen(connObj.appName)); + connObj.lastAccess = connObj.stime; SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000); @@ -177,6 +180,20 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + // app name + pShow->bytes[cols] = TSDB_APPNAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "app_name"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + // app pid + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "pid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip:port"); @@ -185,13 +202,13 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "login time"); + strcpy(pSchema[cols].name, "login_time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "last access"); + strcpy(pSchema[cols].name, "last_access"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -230,6 +247,16 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); cols++; + // app name + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->appName, pShow->bytes[cols]); + cols++; + + // app pid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t*)pWrite = pConnObj->pid; + cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); @@ -248,8 +275,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 5; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, cols, numOfRows, rows, pShow); return numOfRows; } diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index e587758e465d7a138275862ad411af884c0b78ec..80909e99aec6d752d35042ca2d761a6e8b923441 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -186,7 +186,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { rowsToRead = pShow->numOfRows - pShow->numOfReads; } - /* return no more than 100 meters in one round trip */ + /* return no more than 100 tables in one round trip */ if (rowsToRead > 100) rowsToRead = 100; /* @@ -244,7 +244,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { int32_t connId = htonl(pHBMsg->connId); SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); if (pConn == NULL) { - pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + pHBMsg->pid = htonl(pHBMsg->pid); + pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName); } if (pConn == NULL) { @@ -325,7 +326,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { goto connect_over; } - SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + pConnectMsg->pid = htonl(pConnectMsg->pid); + SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pConnectMsg->pid, pConnectMsg->appName); if (pConn == NULL) { code = terrno; } else { diff --git a/src/os/inc/osSemphone.h b/src/os/inc/osSemphone.h index 4280b458a65b23b3aa680c6ed46f61beb5c48948..a71e74e97f4d9910414a5e801b89a1968d1df050 100644 --- a/src/os/inc/osSemphone.h +++ b/src/os/inc/osSemphone.h @@ -33,6 +33,8 @@ bool taosCheckPthreadValid(pthread_t thread); int64_t taosGetPthreadId(); void taosResetPthread(pthread_t *thread); bool taosComparePthread(pthread_t first, pthread_t second); +int32_t taosGetPId(); +int32_t taosGetCurrentAPPName(char *name, int32_t* len); #ifdef __cplusplus } diff --git a/src/os/src/detail/osSemphone.c b/src/os/src/detail/osSemphone.c index b91888845edd9e98315994eaada9eca245aacb24..9eb8c18a40a11fac9ada037163011cdcf92201fb 100644 --- a/src/os/src/detail/osSemphone.c +++ b/src/os/src/detail/osSemphone.c @@ -34,5 +34,31 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } int64_t taosGetPthreadId() { return (int64_t)pthread_self(); } void taosResetPthread(pthread_t *thread) { *thread = 0; } bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; } +int32_t taosGetPId() { return getpid(); } + +int32_t taosGetCurrentAPPName(char *name, int32_t* len) { + const char* self = "/proc/self/exe"; + char path[PATH_MAX] = {0}; + + if (readlink(self, path, PATH_MAX) <= 0) { + return -1; + } + + path[PATH_MAX - 1] = 0; + char* end = strrchr(path, '/'); + if (end == NULL) { + return -1; + } + + ++end; + + strcpy(name, end); + + if (len != NULL) { + *len = strlen(name); + } + + return 0; +} #endif \ No newline at end of file