提交 f3250e62 编写于 作者: H Haojun Liao

[td-1660]

上级 0060a73e
...@@ -1494,6 +1494,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1494,6 +1494,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload; SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
// TODO refactor full_name
char *db; // ugly code to move the space char *db; // ugly code to move the space
db = strstr(pObj->db, TS_PATH_DELIMITER); db = strstr(pObj->db, TS_PATH_DELIMITER);
db = (db == NULL) ? pObj->db : db + 1; db = (db == NULL) ? pObj->db : db + 1;
...@@ -1501,6 +1502,9 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1501,6 +1502,9 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion)); tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion));
tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion)); tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion));
pConnect->pid = htonl(taosGetPId());
taosGetCurrentAPPName(pConnect->appName, NULL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1653,6 +1657,10 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1653,6 +1657,10 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
pHeartbeat->numOfQueries = numOfQueries; pHeartbeat->numOfQueries = numOfQueries;
pHeartbeat->numOfStreams = numOfStreams; pHeartbeat->numOfStreams = numOfStreams;
pHeartbeat->pid = htonl(taosGetPId());
taosGetCurrentAPPName(pHeartbeat->appName, NULL);
int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj); int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
......
...@@ -253,8 +253,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf ...@@ -253,8 +253,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_COL_NAME_LEN 65 #define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 256 #define TSDB_MAX_SQL_SHOW_LEN 512
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 8mb #define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb
#define TSDB_APPNAME_LEN TSDB_UNI_LEN
#define TSDB_MAX_BYTES_PER_ROW 16384 #define TSDB_MAX_BYTES_PER_ROW 16384
#define TSDB_MAX_TAGS_LEN 16384 #define TSDB_MAX_TAGS_LEN 16384
......
...@@ -305,6 +305,8 @@ typedef struct { ...@@ -305,6 +305,8 @@ typedef struct {
char clientVersion[TSDB_VERSION_LEN]; char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
char appName[TSDB_APPNAME_LEN];
int32_t pid;
} SCMConnectMsg; } SCMConnectMsg;
typedef struct { typedef struct {
...@@ -756,8 +758,10 @@ typedef struct { ...@@ -756,8 +758,10 @@ typedef struct {
typedef struct { typedef struct {
uint32_t connId; uint32_t connId;
int32_t pid;
int32_t numOfQueries; int32_t numOfQueries;
int32_t numOfStreams; int32_t numOfStreams;
char appName[TSDB_APPNAME_LEN];
char pData[]; char pData[];
} SCMHeartBeatMsg; } SCMHeartBeatMsg;
......
...@@ -23,6 +23,8 @@ extern "C" { ...@@ -23,6 +23,8 @@ extern "C" {
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char appName[TSDB_APPNAME_LEN]; // app name that invokes taosc
uint32_t pid; // pid of app that invokes taosc
int8_t killed; int8_t killed;
uint16_t port; uint16_t port;
uint32_t ip; uint32_t ip;
...@@ -40,7 +42,7 @@ typedef struct { ...@@ -40,7 +42,7 @@ typedef struct {
int32_t mnodeInitProfile(); int32_t mnodeInitProfile();
void mnodeCleanupProfile(); void mnodeCleanupProfile();
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port); SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app);
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port); SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn); void mnodeReleaseConn(SConnObj *pConn);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg); int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg);
......
...@@ -72,7 +72,7 @@ void mnodeCleanupProfile() { ...@@ -72,7 +72,7 @@ void mnodeCleanupProfile() {
} }
} }
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app) {
#if 0 #if 0
int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable); int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable);
if (connSize > tsMaxShellConns) { if (connSize > tsMaxShellConns) {
...@@ -90,10 +90,13 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { ...@@ -90,10 +90,13 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
.ip = ip, .ip = ip,
.port = port, .port = port,
.connId = connId, .connId = connId,
.stime = taosGetTimestampMs() .stime = taosGetTimestampMs(),
.pid = pid,
}; };
tstrncpy(connObj.user, user, sizeof(connObj.user)); tstrncpy(connObj.user, user, tListLen(connObj.user));
tstrncpy(connObj.appName, app, tListLen(connObj.appName));
connObj.lastAccess = connObj.stime; connObj.lastAccess = connObj.stime;
SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000); SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000);
...@@ -177,6 +180,20 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -177,6 +180,20 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
// app name
pShow->bytes[cols] = TSDB_APPNAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "app_name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
// app pid
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "pid");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port"); strcpy(pSchema[cols].name, "ip:port");
...@@ -185,13 +202,13 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -185,13 +202,13 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "login time"); strcpy(pSchema[cols].name, "login_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "last access"); strcpy(pSchema[cols].name, "last_access");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -230,6 +247,16 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -230,6 +247,16 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]);
cols++; cols++;
// app name
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->appName, pShow->bytes[cols]);
cols++;
// app pid
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t*)pWrite = pConnObj->pid;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
...@@ -248,8 +275,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -248,8 +275,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 5; mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
return numOfRows; return numOfRows;
} }
......
...@@ -186,7 +186,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { ...@@ -186,7 +186,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
rowsToRead = pShow->numOfRows - pShow->numOfReads; rowsToRead = pShow->numOfRows - pShow->numOfReads;
} }
/* return no more than 100 meters in one round trip */ /* return no more than 100 tables in one round trip */
if (rowsToRead > 100) rowsToRead = 100; if (rowsToRead > 100) rowsToRead = 100;
/* /*
...@@ -244,7 +244,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -244,7 +244,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
int32_t connId = htonl(pHBMsg->connId); int32_t connId = htonl(pHBMsg->connId);
SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (pConn == NULL) { if (pConn == NULL) {
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); pHBMsg->pid = htonl(pHBMsg->pid);
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName);
} }
if (pConn == NULL) { if (pConn == NULL) {
...@@ -325,7 +326,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -325,7 +326,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
goto connect_over; goto connect_over;
} }
SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); pConnectMsg->pid = htonl(pConnectMsg->pid);
SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pConnectMsg->pid, pConnectMsg->appName);
if (pConn == NULL) { if (pConn == NULL) {
code = terrno; code = terrno;
} else { } else {
......
...@@ -33,6 +33,8 @@ bool taosCheckPthreadValid(pthread_t thread); ...@@ -33,6 +33,8 @@ bool taosCheckPthreadValid(pthread_t thread);
int64_t taosGetPthreadId(); int64_t taosGetPthreadId();
void taosResetPthread(pthread_t *thread); void taosResetPthread(pthread_t *thread);
bool taosComparePthread(pthread_t first, pthread_t second); bool taosComparePthread(pthread_t first, pthread_t second);
int32_t taosGetPId();
int32_t taosGetCurrentAPPName(char *name, int32_t* len);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -34,5 +34,31 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } ...@@ -34,5 +34,31 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }
int64_t taosGetPthreadId() { return (int64_t)pthread_self(); } int64_t taosGetPthreadId() { return (int64_t)pthread_self(); }
void taosResetPthread(pthread_t *thread) { *thread = 0; } void taosResetPthread(pthread_t *thread) { *thread = 0; }
bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; } bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; }
int32_t taosGetPId() { return getpid(); }
int32_t taosGetCurrentAPPName(char *name, int32_t* len) {
const char* self = "/proc/self/exe";
char path[PATH_MAX] = {0};
if (readlink(self, path, PATH_MAX) <= 0) {
return -1;
}
path[PATH_MAX - 1] = 0;
char* end = strrchr(path, '/');
if (end == NULL) {
return -1;
}
++end;
strcpy(name, end);
if (len != NULL) {
*len = strlen(name);
}
return 0;
}
#endif #endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册