提交 5e07893a 编写于 作者: X XYWang

[TD-5234]<feature>: added name for threads

上级 e8b3fde4
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
static SBnThread tsBnThread; static SBnThread tsBnThread;
static void *bnThreadFunc(void *arg) { static void *bnThreadFunc(void *arg) {
setThreadName("bnThreadd");
while (1) { while (1) {
pthread_mutex_lock(&tsBnThread.mutex); pthread_mutex_lock(&tsBnThread.mutex);
if (tsBnThread.stop) { if (tsBnThread.stop) {
......
...@@ -150,6 +150,8 @@ static void *dnodeProcessMPeerQueue(void *param) { ...@@ -150,6 +150,8 @@ static void *dnodeProcessMPeerQueue(void *param) {
SMnodeMsg *pPeerMsg; SMnodeMsg *pPeerMsg;
int32_t type; int32_t type;
void * unUsed; void * unUsed;
setThreadName("dnodeMPeerQ");
while (1) { while (1) {
if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) {
......
...@@ -155,6 +155,8 @@ static void *dnodeProcessMReadQueue(void *param) { ...@@ -155,6 +155,8 @@ static void *dnodeProcessMReadQueue(void *param) {
int32_t type; int32_t type;
void * unUsed; void * unUsed;
setThreadName("dnodeMReadQ");
while (1) { while (1) {
if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pRead, &unUsed) == 0) { if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pRead, &unUsed) == 0) {
dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset); dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset);
......
...@@ -168,7 +168,9 @@ static void *dnodeProcessMWriteQueue(void *param) { ...@@ -168,7 +168,9 @@ static void *dnodeProcessMWriteQueue(void *param) {
SMnodeMsg *pWrite; SMnodeMsg *pWrite;
int32_t type; int32_t type;
void * unUsed; void * unUsed;
setThreadName("dnodeMWriteQ");
while (1) { while (1) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) { if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset); dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset);
......
...@@ -245,6 +245,8 @@ static void* telemetryThread(void* param) { ...@@ -245,6 +245,8 @@ static void* telemetryThread(void* param) {
clock_gettime(CLOCK_REALTIME, &end); clock_gettime(CLOCK_REALTIME, &end);
end.tv_sec += 300; // wait 5 minutes before send first report end.tv_sec += 300; // wait 5 minutes before send first report
setThreadName("telemetryThrd");
while (!tsExit) { while (!tsExit) {
int r = 0; int r = 0;
struct timespec ts = end; struct timespec ts = end;
......
...@@ -103,6 +103,8 @@ static void *dnodeProcessMgmtQueue(void *wparam) { ...@@ -103,6 +103,8 @@ static void *dnodeProcessMgmtQueue(void *wparam) {
int32_t qtype; int32_t qtype;
void * handle; void * handle;
setThreadName("dnodeMgmtQ");
while (1) { while (1) {
if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pMgmt, &handle) == 0) { if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pMgmt, &handle) == 0) {
dDebug("qdnode mgmt got no message from qset:%p, , exit", pPool->qset); dDebug("qdnode mgmt got no message from qset:%p, , exit", pPool->qset);
......
...@@ -118,6 +118,11 @@ static void *dnodeProcessReadQueue(void *wparam) { ...@@ -118,6 +118,11 @@ static void *dnodeProcessReadQueue(void *wparam) {
SVReadMsg * pRead; SVReadMsg * pRead;
int32_t qtype; int32_t qtype;
void * pVnode; void * pVnode;
char name[16];
memset(name, 0, 16);
snprintf(name, 16, "%s-dnReadQ", pPool->name);
setThreadName(name);
while (1) { while (1) {
if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pRead, &pVnode) == 0) { if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pRead, &pVnode) == 0) {
......
...@@ -191,6 +191,8 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -191,6 +191,8 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
dDebug("dnode vwrite worker:%d is running", pWorker->workerId); dDebug("dnode vwrite worker:%d is running", pWorker->workerId);
setThreadName("dnodeWriteQ");
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
......
...@@ -91,6 +91,8 @@ static void *dnodeOpenVnode(void *param) { ...@@ -91,6 +91,8 @@ static void *dnodeOpenVnode(void *param) {
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("dnodeOpenVnode");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) { for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v]; int32_t vgId = pThread->vnodeList[v];
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", vgId, tsOpenVnodes, tsTotalVnodes); snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", vgId, tsOpenVnodes, tsTotalVnodes);
......
...@@ -104,6 +104,8 @@ static void shellFreeTbnames() { ...@@ -104,6 +104,8 @@ static void shellFreeTbnames() {
static void *shellCheckThreadFp(void *arg) { static void *shellCheckThreadFp(void *arg) {
ShellThreadObj *pThread = (ShellThreadObj *)arg; ShellThreadObj *pThread = (ShellThreadObj *)arg;
setThreadName("shellCheckThrd");
int32_t interval = tbNum / pThread->totalThreads + 1; int32_t interval = tbNum / pThread->totalThreads + 1;
int32_t start = pThread->threadIndex * interval; int32_t start = pThread->threadIndex * interval;
int32_t end = (pThread->threadIndex + 1) * interval; int32_t end = (pThread->threadIndex + 1) * interval;
......
...@@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) { ...@@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) {
TAOS *con = (TAOS *)arg; TAOS *con = (TAOS *)arg;
setThreadName("shellLoopQuery");
pthread_cleanup_push(cleanup_handler, NULL); pthread_cleanup_push(cleanup_handler, NULL);
char *command = malloc(MAX_COMMAND_SIZE); char *command = malloc(MAX_COMMAND_SIZE);
......
...@@ -223,6 +223,8 @@ static void shellSourceFile(TAOS *con, char *fptr) { ...@@ -223,6 +223,8 @@ static void shellSourceFile(TAOS *con, char *fptr) {
void* shellImportThreadFp(void *arg) void* shellImportThreadFp(void *arg)
{ {
ShellThreadObj *pThread = (ShellThreadObj*)arg; ShellThreadObj *pThread = (ShellThreadObj*)arg;
setThreadName("shellImportThrd");
for (int f = 0; f < shellSQLFileNum; ++f) { for (int f = 0; f < shellSQLFileNum; ++f) {
if (f % pThread->totalThreads == pThread->threadIndex) { if (f % pThread->totalThreads == pThread->threadIndex) {
char *SQLFileName = shellSQLFiles[f]; char *SQLFileName = shellSQLFiles[f];
......
...@@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) { ...@@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) {
TAOS *con = (TAOS *)arg; TAOS *con = (TAOS *)arg;
setThreadName("shellLoopQuery");
pthread_cleanup_push(cleanup_handler, NULL); pthread_cleanup_push(cleanup_handler, NULL);
char *command = malloc(MAX_COMMAND_SIZE); char *command = malloc(MAX_COMMAND_SIZE);
......
...@@ -26,6 +26,8 @@ void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { ...@@ -26,6 +26,8 @@ void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) {
} }
void *cancelHandler(void *arg) { void *cancelHandler(void *arg) {
setThreadName("cancelHandler");
while(1) { while(1) {
if (tsem_wait(&cancelSem) != 0) { if (tsem_wait(&cancelSem) != 0) {
taosMsleep(10); taosMsleep(10);
......
...@@ -3025,10 +3025,11 @@ static void* createTable(void *sarg) ...@@ -3025,10 +3025,11 @@ static void* createTable(void *sarg)
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
setThreadName("createTable");
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
int buff_len; int buff_len = BUFFER_SIZE;
buff_len = BUFFER_SIZE;
pThreadInfo->buffer = calloc(buff_len, 1); pThreadInfo->buffer = calloc(buff_len, 1);
if (pThreadInfo->buffer == NULL) { if (pThreadInfo->buffer == NULL) {
...@@ -6428,6 +6429,8 @@ static void* syncWrite(void *sarg) { ...@@ -6428,6 +6429,8 @@ static void* syncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
setThreadName("syncWrite");
uint32_t interlaceRows; uint32_t interlaceRows;
if (superTblInfo) { if (superTblInfo) {
...@@ -6513,6 +6516,8 @@ static void *asyncWrite(void *sarg) { ...@@ -6513,6 +6516,8 @@ static void *asyncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
setThreadName("asyncWrite");
pThreadInfo->st = 0; pThreadInfo->st = 0;
pThreadInfo->et = 0; pThreadInfo->et = 0;
pThreadInfo->lastTs = pThreadInfo->start_time; pThreadInfo->lastTs = pThreadInfo->start_time;
...@@ -6911,6 +6916,7 @@ static void *readTable(void *sarg) { ...@@ -6911,6 +6916,7 @@ static void *readTable(void *sarg) {
#if 1 #if 1
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos; TAOS *taos = pThreadInfo->taos;
setThreadName("readTable");
char command[BUFFER_SIZE] = "\0"; char command[BUFFER_SIZE] = "\0";
uint64_t sTime = pThreadInfo->start_time; uint64_t sTime = pThreadInfo->start_time;
char *tb_prefix = pThreadInfo->tb_prefix; char *tb_prefix = pThreadInfo->tb_prefix;
...@@ -6983,6 +6989,7 @@ static void *readMetric(void *sarg) { ...@@ -6983,6 +6989,7 @@ static void *readMetric(void *sarg) {
#if 1 #if 1
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos; TAOS *taos = pThreadInfo->taos;
setThreadName("readMetric");
char command[BUFFER_SIZE] = "\0"; char command[BUFFER_SIZE] = "\0";
FILE *fp = fopen(pThreadInfo->filePath, "a"); FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) { if (NULL == fp) {
...@@ -7159,6 +7166,8 @@ static int insertTestProcess() { ...@@ -7159,6 +7166,8 @@ static int insertTestProcess() {
static void *specifiedTableQuery(void *sarg) { static void *specifiedTableQuery(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
setThreadName("specTableQuery");
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
...@@ -7258,6 +7267,8 @@ static void *superTableQuery(void *sarg) { ...@@ -7258,6 +7267,8 @@ static void *superTableQuery(void *sarg) {
char sqlstr[MAX_QUERY_SQL_LENGTH]; char sqlstr[MAX_QUERY_SQL_LENGTH];
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
setThreadName("superTableQuery");
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
...@@ -7560,6 +7571,8 @@ static void *superSubscribe(void *sarg) { ...@@ -7560,6 +7571,8 @@ static void *superSubscribe(void *sarg) {
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
uint64_t tsubSeq; uint64_t tsubSeq;
setThreadName("superSub");
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); pThreadInfo->ntables, MAX_QUERY_SQL_COUNT);
...@@ -7706,6 +7719,8 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -7706,6 +7719,8 @@ static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
// TAOS_SUB* tsub = NULL; // TAOS_SUB* tsub = NULL;
setThreadName("specSub");
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
pThreadInfo->taos = taos_connect(g_queryInfo.host, pThreadInfo->taos = taos_connect(g_queryInfo.host,
g_queryInfo.user, g_queryInfo.user,
......
...@@ -1474,6 +1474,8 @@ static void* taosDumpOutWorkThreadFp(void *arg) ...@@ -1474,6 +1474,8 @@ static void* taosDumpOutWorkThreadFp(void *arg)
STableRecord tableRecord; STableRecord tableRecord;
int fd; int fd;
setThreadName("dumpOutWorkThrd");
char tmpBuf[4096] = {0}; char tmpBuf[4096] = {0};
sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex); sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex);
fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
...@@ -2571,6 +2573,8 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset, ...@@ -2571,6 +2573,8 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
static void* taosDumpInWorkThreadFp(void *arg) static void* taosDumpInWorkThreadFp(void *arg)
{ {
SThreadParaObj *pThread = (SThreadParaObj*)arg; SThreadParaObj *pThread = (SThreadParaObj*)arg;
setThreadName("dumpInWorkThrd");
for (int32_t f = 0; f < g_tsSqlFileNum; ++f) { for (int32_t f = 0; f < g_tsSqlFileNum; ++f) {
if (f % pThread->totalThreads == pThread->threadIndex) { if (f % pThread->totalThreads == pThread->threadIndex) {
char *SQLFileName = g_tsDumpInSqlFiles[f]; char *SQLFileName = g_tsDumpInSqlFiles[f];
......
...@@ -1113,6 +1113,7 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1113,6 +1113,7 @@ static void *sdbWorkerFp(void *pWorker) {
void * unUsed; void * unUsed;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName("sdbWorker");
while (1) { while (1) {
int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed); int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed);
......
...@@ -210,6 +210,25 @@ extern "C" { ...@@ -210,6 +210,25 @@ extern "C" {
#define PRIzu "zu" #define PRIzu "zu"
#endif #endif
#if defined(_TD_LINUX_64) || defined(_TD_LINUX_32) || defined(_TD_MIPS_64) || defined(_TD_ARM_32) || defined(_TD_ARM_64) || defined(_TD_DARWIN_64)
#if defined(_TD_DARWIN_64)
// MacOS
#if !defined(_GNU_SOURCE)
#define setThreadName(name) do { pthread_setname_np((name)); } while (0)
#else
// pthread_setname_np not defined
#define setThreadName(name)
#endif
#else
// Linux, length of name must <= 16 (the last '\0' included)
#define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0)
#endif
#else
// Windows
#define setThreadName(name)
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -85,6 +85,7 @@ extern "C" { ...@@ -85,6 +85,7 @@ extern "C" {
#include <sys/eventfd.h> #include <sys/eventfd.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <sys/sendfile.h> #include <sys/sendfile.h>
#include <sys/prctl.h>
#if !(defined(_ALPINE)) #if !(defined(_ALPINE))
#include <error.h> #include <error.h>
......
...@@ -41,6 +41,8 @@ static semaphore_t sem_exit; ...@@ -41,6 +41,8 @@ static semaphore_t sem_exit;
static void* sem_thread_routine(void *arg) { static void* sem_thread_routine(void *arg) {
(void)arg; (void)arg;
setThreadName("sem_thrd");
sem_port = mach_task_self(); sem_port = mach_task_self();
kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0); kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0);
if (ret != KERN_SUCCESS) { if (ret != KERN_SUCCESS) {
......
...@@ -32,6 +32,7 @@ static volatile int timer_stop = 0; ...@@ -32,6 +32,7 @@ static volatile int timer_stop = 0;
static void* timer_routine(void *arg) { static void* timer_routine(void *arg) {
(void)arg; (void)arg;
setThreadName("timer");
int r = 0; int r = 0;
struct timespec to = {0}; struct timespec to = {0};
......
...@@ -38,6 +38,8 @@ static void *taosProcessAlarmSignal(void *tharg) { ...@@ -38,6 +38,8 @@ static void *taosProcessAlarmSignal(void *tharg) {
struct sigevent sevent = {{0}}; struct sigevent sevent = {{0}};
setThreadName("alarmSignal");
#ifdef _ALPINE #ifdef _ALPINE
sevent.sigev_notify = SIGEV_THREAD; sevent.sigev_notify = SIGEV_THREAD;
sevent.sigev_value.sival_int = syscall(__NR_gettid); sevent.sigev_value.sival_int = syscall(__NR_gettid);
......
...@@ -70,6 +70,8 @@ static void *httpProcessResultQueue(void *param) { ...@@ -70,6 +70,8 @@ static void *httpProcessResultQueue(void *param) {
int32_t type; int32_t type;
void * unUsed; void * unUsed;
setThreadName("httpResultQ");
while (1) { while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset); httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset);
......
...@@ -117,6 +117,7 @@ static void httpProcessHttpData(void *param) { ...@@ -117,6 +117,7 @@ static void httpProcessHttpData(void *param) {
int32_t fdNum; int32_t fdNum;
taosSetMaskSIGPIPE(); taosSetMaskSIGPIPE();
setThreadName("httpData");
while (1) { while (1) {
struct epoll_event events[HTTP_MAX_EVENTS]; struct epoll_event events[HTTP_MAX_EVENTS];
...@@ -208,6 +209,7 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -208,6 +209,7 @@ static void *httpAcceptHttpConnection(void *arg) {
int32_t totalFds = 0; int32_t totalFds = 0;
taosSetMaskSIGPIPE(); taosSetMaskSIGPIPE();
setThreadName("httpAcceptConn");
pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
......
...@@ -114,6 +114,7 @@ int32_t monStartSystem() { ...@@ -114,6 +114,7 @@ int32_t monStartSystem() {
static void *monThreadFunc(void *param) { static void *monThreadFunc(void *param) {
monDebug("starting to initialize monitor module ..."); monDebug("starting to initialize monitor module ...");
setThreadName("monThrd");
while (1) { while (1) {
static int32_t accessTimes = 0; static int32_t accessTimes = 0;
......
...@@ -100,6 +100,8 @@ void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) ...@@ -100,6 +100,8 @@ void mqttPublishCallback(void** unused, struct mqtt_response_publish* published)
} }
void* mqttClientRefresher(void* client) { void* mqttClientRefresher(void* client) {
setThreadName("mqttCliRefresh");
while (tsMqttIsRuning) { while (tsMqttIsRuning) {
mqtt_sync((struct mqtt_client*)client); mqtt_sync((struct mqtt_client*)client);
taosMsleep(100); taosMsleep(100);
...@@ -141,4 +143,4 @@ void mqttReconnectClient(struct mqtt_client* client, void** unused) { ...@@ -141,4 +143,4 @@ void mqttReconnectClient(struct mqtt_client* client, void** unused) {
mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz);
mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400);
mqtt_subscribe(client, tsMqttTopic, 0); mqtt_subscribe(client, tsMqttTopic, 0);
} }
\ No newline at end of file
...@@ -242,6 +242,7 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -242,6 +242,7 @@ static void *taosAcceptTcpConnection(void *arg) {
pServerObj = (SServerObj *)arg; pServerObj = (SServerObj *)arg;
tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
setThreadName("acceptTcpConn");
while (1) { while (1) {
socklen_t addrlen = sizeof(caddr); socklen_t addrlen = sizeof(caddr);
...@@ -528,6 +529,11 @@ static void *taosProcessTcpData(void *param) { ...@@ -528,6 +529,11 @@ static void *taosProcessTcpData(void *param) {
SFdObj *pFdObj; SFdObj *pFdObj;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
SRecvInfo recvInfo; SRecvInfo recvInfo;
char name[16];
memset(name, 0, sizeof(name));
snprintf(name, 16, "%s-tcpData", pThreadObj->label);
setThreadName(name);
while (1) { while (1) {
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
......
...@@ -195,6 +195,8 @@ static void *taosRecvUdpData(void *param) { ...@@ -195,6 +195,8 @@ static void *taosRecvUdpData(void *param) {
tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index); tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index);
char *msg = pConn->buffer; char *msg = pConn->buffer;
setThreadName("recvUdpData");
while (1) { while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if (dataLen <= 0) { if (dataLen <= 0) {
......
...@@ -47,6 +47,8 @@ static int tcount = 0; ...@@ -47,6 +47,8 @@ static int tcount = 0;
static void *sendRequest(void *param) { static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param; SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
setThreadName("sendCliReq");
tDebug("thread:%d, start to send request", pInfo->index); tDebug("thread:%d, start to send request", pInfo->index);
......
...@@ -39,8 +39,10 @@ static int terror = 0; ...@@ -39,8 +39,10 @@ static int terror = 0;
static void *sendRequest(void *param) { static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param; SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg, rspMsg; SRpcMsg rpcMsg, rspMsg;
setThreadName("sendSrvReq");
tDebug("thread:%d, start to send request", pInfo->index); tDebug("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
......
...@@ -263,6 +263,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -263,6 +263,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
} }
void *syncRestoreData(void *param) { void *syncRestoreData(void *param) {
setThreadName("syncRestoreData");
int64_t rid = (int64_t)param; int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid); SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) { if (pPeer == NULL) {
......
...@@ -415,6 +415,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -415,6 +415,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
} }
void *syncRetrieveData(void *param) { void *syncRetrieveData(void *param) {
setThreadName("syncRetrievData");
int64_t rid = (int64_t)param; int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid); SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) { if (pPeer == NULL) {
......
...@@ -195,6 +195,8 @@ static void *syncProcessTcpData(void *param) { ...@@ -195,6 +195,8 @@ static void *syncProcessTcpData(void *param) {
SConnObj * pConn = NULL; SConnObj * pConn = NULL;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
setThreadName("syncTcpData");
void *buffer = malloc(pInfo->bufferSize); void *buffer = malloc(pInfo->bufferSize);
taosBlockSIGPIPE(); taosBlockSIGPIPE();
...@@ -257,6 +259,7 @@ static void *syncAcceptPeerTcpConnection(void *argv) { ...@@ -257,6 +259,7 @@ static void *syncAcceptPeerTcpConnection(void *argv) {
SPoolInfo *pInfo = &pPool->info; SPoolInfo *pInfo = &pPool->info;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName("acceptTcpConn");
while (1) { while (1) {
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
......
...@@ -48,6 +48,8 @@ void *sendRequest(void *param) { ...@@ -48,6 +48,8 @@ void *sendRequest(void *param) {
SInfo * pInfo = (SInfo *)param; SInfo * pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
setThreadName("sendCliReq");
uDebug("thread:%d, start to send request", pInfo->index); uDebug("thread:%d, start to send request", pInfo->index);
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
......
...@@ -178,6 +178,8 @@ void *processWriteQueue(void *param) { ...@@ -178,6 +178,8 @@ void *processWriteQueue(void *param) {
int type; int type;
void *item; void *item;
setThreadName("writeQ");
while (1) { while (1) {
int ret = taosReadQitem(qhandle, &type, &item); int ret = taosReadQitem(qhandle, &type, &item);
if (ret <= 0) { if (ret <= 0) {
......
...@@ -158,6 +158,8 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -158,6 +158,8 @@ static void *tsdbLoopCommit(void *arg) {
STsdbRepo * pRepo = NULL; STsdbRepo * pRepo = NULL;
TSDB_REQ_T req; TSDB_REQ_T req;
setThreadName("tsdbCommit");
while (true) { while (true) {
pthread_mutex_lock(&(pQueue->lock)); pthread_mutex_lock(&(pQueue->lock));
...@@ -208,4 +210,4 @@ void tsdbDecCommitRef(int vgId) { ...@@ -208,4 +210,4 @@ void tsdbDecCommitRef(int vgId) {
int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1); int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1);
pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty)); pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty));
tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount); tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount);
} }
\ No newline at end of file
...@@ -656,6 +656,8 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -656,6 +656,8 @@ void* taosCacheTimedRefresh(void *handle) {
return NULL; return NULL;
} }
setThreadName("cacheTimedRefre");
const int32_t SLEEP_DURATION = 500; //500 ms const int32_t SLEEP_DURATION = 500; //500 ms
int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION; int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;
......
...@@ -178,6 +178,8 @@ static void *taosThreadToOpenNewFile(void *param) { ...@@ -178,6 +178,8 @@ static void *taosThreadToOpenNewFile(void *param) {
char keepName[LOG_FILE_NAME_LEN + 20]; char keepName[LOG_FILE_NAME_LEN + 20];
sprintf(keepName, "%s.%d", tsLogObj.logName, tsLogObj.flag); sprintf(keepName, "%s.%d", tsLogObj.logName, tsLogObj.flag);
setThreadName("openNewFile");
tsLogObj.flag ^= 1; tsLogObj.flag ^= 1;
tsLogObj.lines = 0; tsLogObj.lines = 0;
char name[LOG_FILE_NAME_LEN + 20]; char name[LOG_FILE_NAME_LEN + 20];
...@@ -687,6 +689,8 @@ static void taosWriteLog(SLogBuff *tLogBuff) { ...@@ -687,6 +689,8 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
static void *taosAsyncOutputLog(void *param) { static void *taosAsyncOutputLog(void *param) {
SLogBuff *tLogBuff = (SLogBuff *)param; SLogBuff *tLogBuff = (SLogBuff *)param;
setThreadName("asyncOutputLog");
while (1) { while (1) {
//tsem_wait(&(tLogBuff->buffNotEmpty)); //tsem_wait(&(tLogBuff->buffNotEmpty));
......
...@@ -50,7 +50,9 @@ static void *taosNetBindUdpPort(void *sarg) { ...@@ -50,7 +50,9 @@ static void *taosNetBindUdpPort(void *sarg) {
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { setThreadName("netBindUdpPort");
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
uError("failed to create UDP socket since %s", strerror(errno)); uError("failed to create UDP socket since %s", strerror(errno));
return NULL; return NULL;
} }
...@@ -106,13 +108,15 @@ static void *taosNetBindTcpPort(void *sarg) { ...@@ -106,13 +108,15 @@ static void *taosNetBindTcpPort(void *sarg) {
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
STestInfo *pinfo = sarg; STestInfo *pinfo = sarg;
int32_t port = pinfo->port; int32_t port = pinfo->port;
SOCKET serverSocket; SOCKET serverSocket;
int32_t addr_len = sizeof(clientAddr); int32_t addr_len = sizeof(clientAddr);
SOCKET client; SOCKET client;
char buffer[BUFFER_SIZE]; char buffer[BUFFER_SIZE];
setThreadName("netBindTcpPort");
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
uError("failed to create TCP socket since %s", strerror(errno)); uError("failed to create TCP socket since %s", strerror(errno));
return NULL; return NULL;
......
...@@ -84,6 +84,8 @@ static void *taosThreadToOpenNewNote(void *param) { ...@@ -84,6 +84,8 @@ static void *taosThreadToOpenNewNote(void *param) {
char name[NOTE_FILE_NAME_LEN * 2]; char name[NOTE_FILE_NAME_LEN * 2];
SNoteObj *pNote = (SNoteObj *)param; SNoteObj *pNote = (SNoteObj *)param;
setThreadName("openNewNote");
pNote->flag ^= 1; pNote->flag ^= 1;
pNote->lines = 0; pNote->lines = 0;
sprintf(name, "%s.%d", pNote->name, pNote->flag); sprintf(name, "%s.%d", pNote->name, pNote->flag);
......
...@@ -122,6 +122,8 @@ void *taosProcessSchedQueue(void *scheduler) { ...@@ -122,6 +122,8 @@ void *taosProcessSchedQueue(void *scheduler) {
SSchedQueue *pSched = (SSchedQueue *)scheduler; SSchedQueue *pSched = (SSchedQueue *)scheduler;
int ret = 0; int ret = 0;
setThreadName("schedQ");
while (1) { while (1) {
if ((ret = tsem_wait(&pSched->fullSem)) != 0) { if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
...@@ -234,4 +236,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) { ...@@ -234,4 +236,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
} }
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer); taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
} }
\ No newline at end of file
...@@ -36,6 +36,8 @@ void *addRef(void *param) { ...@@ -36,6 +36,8 @@ void *addRef(void *param) {
int id; int id;
int64_t rid; int64_t rid;
setThreadName("addRef");
for (int i=0; i < pSpace->steps; ++i) { for (int i=0; i < pSpace->steps; ++i) {
printf("a"); printf("a");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
...@@ -54,6 +56,8 @@ void *removeRef(void *param) { ...@@ -54,6 +56,8 @@ void *removeRef(void *param) {
int id; int id;
int64_t rid; int64_t rid;
setThreadName("removeRef");
for (int i=0; i < pSpace->steps; ++i) { for (int i=0; i < pSpace->steps; ++i) {
printf("d"); printf("d");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
...@@ -73,6 +77,8 @@ void *acquireRelease(void *param) { ...@@ -73,6 +77,8 @@ void *acquireRelease(void *param) {
int id; int id;
int64_t rid; int64_t rid;
setThreadName("acquireRelease");
for (int i=0; i < pSpace->steps; ++i) { for (int i=0; i < pSpace->steps; ++i) {
printf("a"); printf("a");
...@@ -94,6 +100,8 @@ void myfree(void *p) { ...@@ -94,6 +100,8 @@ void myfree(void *p) {
void *openRefSpace(void *param) { void *openRefSpace(void *param) {
SRefSpace *pSpace = (SRefSpace *)param; SRefSpace *pSpace = (SRefSpace *)param;
setThreadName("openRefSpace");
printf("c"); printf("c");
pSpace->rsetId = taosOpenRef(50, myfree); pSpace->rsetId = taosOpenRef(50, myfree);
......
...@@ -61,6 +61,8 @@ static void vnodeProcessBackupMsg(SVBackupMsg *pMsg) { ...@@ -61,6 +61,8 @@ static void vnodeProcessBackupMsg(SVBackupMsg *pMsg) {
} }
static void *vnodeBackupFunc(void *param) { static void *vnodeBackupFunc(void *param) {
setThreadName("vnodeBackup");
while (1) { while (1) {
SVBackupMsg *pMsg = NULL; SVBackupMsg *pMsg = NULL;
if (taosReadQitemFromQset(tsVBackupQset, NULL, (void **)&pMsg, NULL) == 0) { if (taosReadQitemFromQset(tsVBackupQset, NULL, (void **)&pMsg, NULL) == 0) {
......
...@@ -188,6 +188,8 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) { ...@@ -188,6 +188,8 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
} }
static void *vnodeMWorkerFunc(void *param) { static void *vnodeMWorkerFunc(void *param) {
setThreadName("vnodeMWorker");
while (1) { while (1) {
SVMWorkerMsg *pMsg = NULL; SVMWorkerMsg *pMsg = NULL;
if (taosReadQitemFromQset(tsVMWorkerQset, NULL, (void **)&pMsg, NULL) == 0) { if (taosReadQitemFromQset(tsVMWorkerQset, NULL, (void **)&pMsg, NULL) == 0) {
......
...@@ -192,6 +192,7 @@ static void walFsyncAll() { ...@@ -192,6 +192,7 @@ static void walFsyncAll() {
static void *walThreadFunc(void *param) { static void *walThreadFunc(void *param) {
int stop = 0; int stop = 0;
setThreadName("walThrd");
while (1) { while (1) {
walUpdateSeq(); walUpdateSeq();
walFsyncAll(); walFsyncAll();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册