diff --git a/src/balance/src/bnThread.c b/src/balance/src/bnThread.c index 44cb24effa09688db79bb9ae8fa40a381c0c0404..c5dca2da8596dffbbee6417281a739cec7bba016 100644 --- a/src/balance/src/bnThread.c +++ b/src/balance/src/bnThread.c @@ -23,6 +23,8 @@ static SBnThread tsBnThread; static void *bnThreadFunc(void *arg) { + setThreadName("bnThreadd"); + while (1) { pthread_mutex_lock(&tsBnThread.mutex); if (tsBnThread.stop) { diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 9557d3afda8c2883b8637c869402ca0814646525..264070d70f0e1709b8224da360932cc58c308146 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -110,6 +110,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf); int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo); +int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows); void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo); void doRetrieveSubqueryData(SSchedMsg *pMsg); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index eeb7cc68aa7a427a3db6372cef1ddafce067cb56..5b5224e92eed6c18409d063222d9a6220ad6dcdf 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -138,7 +138,8 @@ typedef struct STableDataBlocks { uint32_t size; STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache char *pData; - + bool cloned; + SParsedDataColInfo boundColumnInfo; // for parameter ('?') binding @@ -436,4 +437,4 @@ int32_t getExtendedRowSize(STableComInfo *tinfo); } #endif -#endif \ No newline at end of file +#endif diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index adc24dfbabca1844989a94883e1126a93eeec2ee..d26cbebc34e53287e1184f95c52a8ba35d88b70c 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1056,7 +1056,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3 return TSDB_CODE_SUCCESS; } -static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { +int32_t FORCE_INLINE tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { pBlocks->tid = pTableMeta->id.tid; pBlocks->uid = pTableMeta->id.uid; pBlocks->sversion = pTableMeta->sversion; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index d39f9ebe24acd32f1b6c9ced80e31bde3ec3cc62..01c90680fa559f9b2a0602b33d755140258779ca 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -47,6 +47,7 @@ typedef struct SNormalStmt { typedef struct SMultiTbStmt { bool nameSet; bool tagSet; + bool subSet; uint64_t currentUid; char *sqlstr; uint32_t tbNum; @@ -54,6 +55,7 @@ typedef struct SMultiTbStmt { SStrToken stbname; SStrToken values; SArray *tags; + STableDataBlocks *lastBlock; SHashObj *pTableHash; SHashObj *pTableBlockHashList; // data block for each table } SMultiTbStmt; @@ -347,11 +349,11 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) { //////////////////////////////////////////////////////////////////////////////// // functions for insertion statement preparation -static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { - if (bind->is_null != NULL && *(bind->is_null)) { - setNull(data + param->offset, param->type, param->bytes); - return TSDB_CODE_SUCCESS; - } +static FORCE_INLINE int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { + if (bind->is_null != NULL && *(bind->is_null)) { + setNull(data + param->offset, param->type, param->bytes); + return TSDB_CODE_SUCCESS; + } #if 0 if (0) { @@ -746,25 +748,25 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_UTINYINT: - size = 1; + *(uint8_t *)(data + param->offset) = *(uint8_t *)bind->buffer; break; case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_USMALLINT: - size = 2; + *(uint16_t *)(data + param->offset) = *(uint16_t *)bind->buffer; break; case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_FLOAT: - size = 4; + *(uint32_t *)(data + param->offset) = *(uint32_t *)bind->buffer; break; case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_TIMESTAMP: - size = 8; + *(uint64_t *)(data + param->offset) = *(uint64_t *)bind->buffer; break; case TSDB_DATA_TYPE_BINARY: @@ -790,7 +792,6 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, return TSDB_CODE_TSC_INVALID_VALUE; } - memcpy(data + param->offset, bind->buffer, size); if (param->offset == 0) { if (tsCheckTimestamp(pBlock, data + param->offset) != TSDB_CODE_SUCCESS) { tscError("invalid timestamp"); @@ -801,6 +802,58 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, return TSDB_CODE_SUCCESS; } +static int32_t insertStmtGenLastBlock(STableDataBlocks** lastBlock, STableDataBlocks* pBlock) { + *lastBlock = (STableDataBlocks*)malloc(sizeof(STableDataBlocks)); + memcpy(*lastBlock, pBlock, sizeof(STableDataBlocks)); + (*lastBlock)->cloned = true; + + (*lastBlock)->pData = NULL; + (*lastBlock)->ordered = true; + (*lastBlock)->prevTS = INT64_MIN; + (*lastBlock)->size = sizeof(SSubmitBlk); + (*lastBlock)->tsSource = -1; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t insertStmtGenBlock(STscStmt* pStmt, STableDataBlocks** pBlock, STableMeta* pTableMeta, SName* name) { + int32_t code = 0; + + if (pStmt->mtb.lastBlock == NULL) { + tscError("no previous data block"); + return TSDB_CODE_TSC_APP_ERROR; + } + + int32_t msize = tscGetTableMetaSize(pTableMeta); + int32_t tsize = sizeof(STableDataBlocks) + msize; + + void *t = malloc(tsize); + *pBlock = t; + + memcpy(*pBlock, pStmt->mtb.lastBlock, sizeof(STableDataBlocks)); + + t = (char *)t + sizeof(STableDataBlocks); + (*pBlock)->pTableMeta = t; + memcpy((*pBlock)->pTableMeta, pTableMeta, msize); + + (*pBlock)->pData = malloc((*pBlock)->nAllocSize); + + (*pBlock)->vgId = (*pBlock)->pTableMeta->vgId; + + tNameAssign(&(*pBlock)->tableName, name); + + SSubmitBlk* blk = (SSubmitBlk*)(*pBlock)->pData; + memset(blk, 0, sizeof(*blk)); + + code = tsSetBlockInfo(blk, pTableMeta, 0); + if (code != TSDB_CODE_SUCCESS) { + STMT_RET(code); + } + + return TSDB_CODE_SUCCESS; +} + static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MULTI_BIND* bind, int32_t rowNum) { if (bind->buffer_type != param->type || !isValidDataType(param->type)) { @@ -1172,7 +1225,7 @@ static void insertBatchClean(STscStmt* pStmt) { static int insertBatchStmtExecute(STscStmt* pStmt) { int32_t code = 0; - + if(pStmt->mtb.nameSet == false) { tscError("0x%"PRIx64" no table name set", pStmt->pSql->self); return invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "no table name set"); @@ -1227,11 +1280,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { pStmt->mtb.tbname = sToken; pStmt->mtb.nameSet = false; if (pStmt->mtb.pTableHash == NULL) { - pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + pStmt->mtb.pTableHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); } if (pStmt->mtb.pTableBlockHashList == NULL) { - pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + pStmt->mtb.pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); } pStmt->mtb.tagSet = true; @@ -1522,6 +1575,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags) { STscStmt* pStmt = (STscStmt*)stmt; + int32_t code = 0; if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) { STMT_RET(TSDB_CODE_TSC_DISCONNECTED); @@ -1559,6 +1613,9 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData; pCmd->batchSize = pBlk->numOfRows; + if (pBlk->numOfRows == 0) { + (*t1)->prevTS = INT64_MIN; + } taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES); @@ -1566,6 +1623,51 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags STMT_RET(TSDB_CODE_SUCCESS); } + if (pStmt->mtb.subSet && taosHashGetSize(pStmt->mtb.pTableHash) > 0) { + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); + STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; + char sTableName[TSDB_TABLE_FNAME_LEN]; + strncpy(sTableName, pTableMeta->sTableName, sizeof(sTableName)); + + SStrToken tname = {0}; + tname.type = TK_STRING; + tname.z = (char *)name; + tname.n = (uint32_t)strlen(name); + SName fullname = {0}; + tscSetTableFullName(&fullname, &tname, pSql); + + memcpy(&pTableMetaInfo->name, &fullname, sizeof(fullname)); + + code = tscGetTableMeta(pSql, pTableMetaInfo); + if (code != TSDB_CODE_SUCCESS) { + STMT_RET(code); + } + + pTableMeta = pTableMetaInfo->pTableMeta; + + if (strcmp(sTableName, pTableMeta->sTableName)) { + tscError("0x%"PRIx64" only tables belongs to one stable is allowed", pSql->self); + STMT_RET(TSDB_CODE_TSC_APP_ERROR); + } + + STableDataBlocks* pBlock = NULL; + + insertStmtGenBlock(pStmt, &pBlock, pTableMeta, &pTableMetaInfo->name); + + pCmd->batchSize = 0; + + pStmt->mtb.currentUid = pTableMeta->id.uid; + pStmt->mtb.tbNum++; + + taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES); + taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES); + taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid)); + + tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid); + + STMT_RET(TSDB_CODE_SUCCESS); + } + if (pStmt->mtb.tagSet) { pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name); } else { @@ -1594,7 +1696,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags pCmd->insertParam.pTableBlockHashList = hashList; } - int32_t code = tsParseSql(pStmt->pSql, true); + code = tsParseSql(pStmt->pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { // wait for the callback function to post the semaphore tsem_wait(&pStmt->pSql->rspSem); @@ -1622,6 +1724,10 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES); taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid)); + if (pStmt->mtb.lastBlock == NULL) { + insertStmtGenLastBlock(&pStmt->mtb.lastBlock, pBlock); + } + tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid); } @@ -1629,7 +1735,17 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags } +int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) { + STscStmt* pStmt = (STscStmt*)stmt; + pStmt->mtb.subSet = true; + return taos_stmt_set_tbname_tags(stmt, name, NULL); +} + + + int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { + STscStmt* pStmt = (STscStmt*)stmt; + pStmt->mtb.subSet = false; return taos_stmt_set_tbname_tags(stmt, name, NULL); } @@ -1653,6 +1769,7 @@ int taos_stmt_close(TAOS_STMT* stmt) { if (pStmt->pSql && pStmt->pSql->res.code != 0) { rmMeta = true; } + tscDestroyDataBlock(pStmt->mtb.lastBlock, rmMeta); pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta); taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList); pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL; @@ -1687,6 +1804,8 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) { pStmt->last = STMT_BIND; + tscDebug("tableId:%" PRIu64 ", try to bind one row", pStmt->mtb.currentUid); + STMT_RET(insertStmtBindParam(pStmt, bind)); } else { STMT_RET(normalStmtBindParam(pStmt, bind)); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c9a1a2492c711b85a7f9316ee4ee6c5507467e59..a78e5fa5f2e117c73f58d5f3f31857712d7230b4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1517,12 +1517,6 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { } tfree(pDataBlock->pData); - tfree(pDataBlock->params); - - // free the refcount for metermeta - if (pDataBlock->pTableMeta != NULL) { - tfree(pDataBlock->pTableMeta); - } if (removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; @@ -1531,7 +1525,17 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } - tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo); + if (!pDataBlock->cloned) { + tfree(pDataBlock->params); + + // free the refcount for metermeta + if (pDataBlock->pTableMeta != NULL) { + tfree(pDataBlock->pTableMeta); + } + + tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo); + } + tfree(pDataBlock); } @@ -1710,12 +1714,14 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff dataBuf->nAllocSize = dataBuf->headerSize * 2; } - dataBuf->pData = calloc(1, dataBuf->nAllocSize); + //dataBuf->pData = calloc(1, dataBuf->nAllocSize); + dataBuf->pData = malloc(dataBuf->nAllocSize); if (dataBuf->pData == NULL) { tscError("failed to allocated memory, reason:%s", strerror(errno)); tfree(dataBuf); return TSDB_CODE_TSC_OUT_OF_MEMORY; } + memset(dataBuf->pData, 0, sizeof(SSubmitBlk)); //Here we keep the tableMeta to avoid it to be remove by other threads. dataBuf->pTableMeta = tscTableMetaDup(pTableMeta); @@ -1956,16 +1962,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) { pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); if (pInsertParam->pTableNameList == NULL) { - pInsertParam->pTableNameList = calloc(pInsertParam->numOfTables, POINTER_BYTES); - } else { - memset(pInsertParam->pTableNameList, 0, pInsertParam->numOfTables * POINTER_BYTES); + pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); } STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); int32_t i = 0; while(p1) { STableDataBlocks* pBlocks = *p1; - tfree(pInsertParam->pTableNameList[i]); + //tfree(pInsertParam->pTableNameList[i]); pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName); p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1); @@ -2009,14 +2013,12 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); if (dataBuf->nAllocSize < destSize) { - while (dataBuf->nAllocSize < destSize) { - dataBuf->nAllocSize = (uint32_t)(dataBuf->nAllocSize * 1.5); - } + dataBuf->nAllocSize = (uint32_t)(destSize * 1.5); char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize); if (tmp != NULL) { dataBuf->pData = tmp; - memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size); + //memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size); } else { // failed to allocate memory, free already allocated memory and return error code tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize); @@ -4384,7 +4386,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) { assert(pTableMeta != NULL); size_t size = tscGetTableMetaSize(pTableMeta); - STableMeta* p = calloc(1, size); + STableMeta* p = malloc(size); memcpy(p, pTableMeta, size); return p; } diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 72e2d42ff9bb8141d6bfc11dcc13ec470f9b09e1..26502c5d9cd032afd20d89ba8ea2da72b82a62c1 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -306,7 +306,7 @@ bool tIsValidName(const SName* name) { SName* tNameDup(const SName* name) { assert(name != NULL); - SName* p = calloc(1, sizeof(SName)); + SName* p = malloc(sizeof(SName)); memcpy(p, name, sizeof(SName)); return p; } diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index e4942c49aaba48db8d2a1c74a0af532769de9553..8aa28d1618efe871de7795b0d403ee060cf5941c 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -150,6 +150,8 @@ static void *dnodeProcessMPeerQueue(void *param) { SMnodeMsg *pPeerMsg; int32_t type; void * unUsed; + + setThreadName("dnodeMPeerQ"); while (1) { if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) { diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 90332e6783bc4861928833d5794f0787f80be993..184a6b743afdd5f4284a5acffc8d518356be4ee4 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -155,6 +155,8 @@ static void *dnodeProcessMReadQueue(void *param) { int32_t type; void * unUsed; + setThreadName("dnodeMReadQ"); + while (1) { if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pRead, &unUsed) == 0) { dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset); diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index a409d537fa8a56f03ed79d68358ac70b780e74e9..904ddc21d019343fa3f679db4d25cd9b01e1d97b 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -168,7 +168,9 @@ static void *dnodeProcessMWriteQueue(void *param) { SMnodeMsg *pWrite; int32_t type; void * unUsed; - + + setThreadName("dnodeMWriteQ"); + while (1) { if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) { dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset); diff --git a/src/dnode/src/dnodeTelemetry.c b/src/dnode/src/dnodeTelemetry.c index 4caece16612353155886ff20055433878ec7411c..59b66879d42e3133ec7143c95f31c9b5df8ddb60 100644 --- a/src/dnode/src/dnodeTelemetry.c +++ b/src/dnode/src/dnodeTelemetry.c @@ -245,6 +245,8 @@ static void* telemetryThread(void* param) { clock_gettime(CLOCK_REALTIME, &end); end.tv_sec += 300; // wait 5 minutes before send first report + setThreadName("telemetryThrd"); + while (!tsExit) { int r = 0; struct timespec ts = end; diff --git a/src/dnode/src/dnodeVMgmt.c b/src/dnode/src/dnodeVMgmt.c index daf62aac94a5e10a5899ad9c8593b5ff7df86f46..c1bfb1460b4b3058434c628f503a4775c4c24701 100644 --- a/src/dnode/src/dnodeVMgmt.c +++ b/src/dnode/src/dnodeVMgmt.c @@ -103,6 +103,8 @@ static void *dnodeProcessMgmtQueue(void *wparam) { int32_t qtype; void * handle; + setThreadName("dnodeMgmtQ"); + while (1) { if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pMgmt, &handle) == 0) { dDebug("qdnode mgmt got no message from qset:%p, , exit", pPool->qset); diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 41016d7b99d049922e4de7dc0cbd3dafd2bc4ebf..e8003a8fe7996316d0b91689ea9738cbd24184b9 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -118,6 +118,11 @@ static void *dnodeProcessReadQueue(void *wparam) { SVReadMsg * pRead; int32_t qtype; void * pVnode; + char name[16]; + + memset(name, 0, 16); + snprintf(name, 16, "%s-dnReadQ", pPool->name); + setThreadName(name); while (1) { if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pRead, &pVnode) == 0) { diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index ff2d12f001710b5ddcd310cff03929fd3c038782..ed2a6e210939e907e89dd07de27481d385e4ef24 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -191,6 +191,8 @@ static void *dnodeProcessVWriteQueue(void *wparam) { taosBlockSIGPIPE(); dDebug("dnode vwrite worker:%d is running", pWorker->workerId); + setThreadName("dnodeWriteQ"); + while (1) { numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); if (numOfMsgs == 0) { diff --git a/src/dnode/src/dnodeVnodes.c b/src/dnode/src/dnodeVnodes.c index f01a510370758a04fe8972304ae352b796dc6e35..8ea8e280de10f0657ad0b937fb78794175d8c20a 100644 --- a/src/dnode/src/dnodeVnodes.c +++ b/src/dnode/src/dnodeVnodes.c @@ -91,6 +91,8 @@ static void *dnodeOpenVnode(void *param) { dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); + setThreadName("dnodeOpenVnode"); + for (int32_t v = 0; v < pThread->vnodeNum; ++v) { int32_t vgId = pThread->vnodeList[v]; snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", vgId, tsOpenVnodes, tsTotalVnodes); diff --git a/src/inc/taos.h b/src/inc/taos.h index 83cf0f57cf90e6d0e9250b1d269cf50f91dee952..ba53c1ca8f57632d4270213be8eb7a7dbecd4dd2 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -111,10 +111,12 @@ typedef struct TAOS_MULTI_BIND { } TAOS_MULTI_BIND; + DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags); DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name); +DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name); DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); diff --git a/src/kit/shell/src/shellCheck.c b/src/kit/shell/src/shellCheck.c index 4ff5dc36fc72bf2bfaa7ffea3f6a47c619c6235b..d78f1a6b996110f9cdce37e60dfbb85323a3a848 100644 --- a/src/kit/shell/src/shellCheck.c +++ b/src/kit/shell/src/shellCheck.c @@ -104,6 +104,8 @@ static void shellFreeTbnames() { static void *shellCheckThreadFp(void *arg) { ShellThreadObj *pThread = (ShellThreadObj *)arg; + setThreadName("shellCheckThrd"); + int32_t interval = tbNum / pThread->totalThreads + 1; int32_t start = pThread->threadIndex * interval; int32_t end = (pThread->threadIndex + 1) * interval; diff --git a/src/kit/shell/src/shellDarwin.c b/src/kit/shell/src/shellDarwin.c index 31ad7046e9176221e10c79b3f2367ea464529438..86c0fea5739ff624cada266e09920f875c24a69a 100644 --- a/src/kit/shell/src/shellDarwin.c +++ b/src/kit/shell/src/shellDarwin.c @@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) { TAOS *con = (TAOS *)arg; + setThreadName("shellLoopQuery"); + pthread_cleanup_push(cleanup_handler, NULL); char *command = malloc(MAX_COMMAND_SIZE); diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index 5de50a3aaf70074da0592e628b932e49aaf89c48..222d69e854933095ec0aadaa8a67bf1c19954c3b 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -223,6 +223,8 @@ static void shellSourceFile(TAOS *con, char *fptr) { void* shellImportThreadFp(void *arg) { ShellThreadObj *pThread = (ShellThreadObj*)arg; + setThreadName("shellImportThrd"); + for (int f = 0; f < shellSQLFileNum; ++f) { if (f % pThread->totalThreads == pThread->threadIndex) { char *SQLFileName = shellSQLFiles[f]; diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 4eead252fd4c47893ba966c1686d375fd7b9b6dc..2a32a8d82e73794fbea930c174d6e51f9c194fc3 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) { TAOS *con = (TAOS *)arg; + setThreadName("shellLoopQuery"); + pthread_cleanup_push(cleanup_handler, NULL); char *command = malloc(MAX_COMMAND_SIZE); diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 4c7e550760cecb7c045cb8c94fc431cb5f91812b..0c70386061b99baaf2f9448ddadbb250685f23d4 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -26,6 +26,8 @@ void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { } void *cancelHandler(void *arg) { + setThreadName("cancelHandler"); + while(1) { if (tsem_wait(&cancelSem) != 0) { taosMsleep(10); diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 2beb0c8e7efc3b0348d37264358fa4c64219de16..cd00d96ac3adcabcf20692de1977ffc12207188b 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -71,9 +71,9 @@ extern char configDir[]; #define HEAD_BUFF_LEN TSDB_MAX_COLUMNS*24 // 16*MAX_COLUMNS + (192+32)*2 + insert into .. -#define COL_BUFFER_LEN (TSDB_MAX_BYTES_PER_ROW - 50) -#define BUFFER_SIZE (50 + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_MAX_BYTES_PER_ROW + TSDB_MAX_TAGS_LEN) +#define BUFFER_SIZE TSDB_MAX_ALLOWED_SQL_LEN #define COND_BUF_LEN (BUFFER_SIZE - 30) +#define COL_BUFFER_LEN ((TSDB_COL_NAME_LEN + 15) * TSDB_MAX_COLUMNS) #define MAX_USERNAME_SIZE 64 #define MAX_PASSWORD_SIZE 64 #define MAX_HOSTNAME_SIZE 64 @@ -2367,7 +2367,7 @@ static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) { return dataBuf; } -static char* generateTagVaulesForStb(SSuperTable* stbInfo, int32_t tableSeq) { +static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) { char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); if (NULL == dataBuf) { printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); @@ -2403,47 +2403,47 @@ static char* generateTagVaulesForStb(SSuperTable* stbInfo, int32_t tableSeq) { } //rand_string(buf, stbInfo->tags[i].dataLen); dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "\'%s\', ", buf); + "\'%s\',", buf); tmfree(buf); } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "int", strlen("int"))) { if ((g_args.demo_mode) && (i == 0)) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%d, ", tableSeq % 10); + "%d,", tableSeq % 10); } else { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%d, ", tableSeq); + "%d,", tableSeq); } } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bigint", strlen("bigint"))) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%"PRId64", ", rand_bigint()); + "%"PRId64",", rand_bigint()); } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "float", strlen("float"))) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%f, ", rand_float()); + "%f,", rand_float()); } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "double", strlen("double"))) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%f, ", rand_double()); + "%f,", rand_double()); } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "smallint", strlen("smallint"))) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%d, ", rand_smallint()); + "%d,", rand_smallint()); } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "tinyint", strlen("tinyint"))) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%d, ", rand_tinyint()); + "%d,", rand_tinyint()); } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bool", strlen("bool"))) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%d, ", rand_bool()); + "%d,", rand_bool()); } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "timestamp", strlen("timestamp"))) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%"PRId64", ", rand_bigint()); + "%"PRId64",", rand_bigint()); } else { printf("No support data type: %s\n", stbInfo->tags[i].dataType); tmfree(dataBuf); @@ -2451,7 +2451,7 @@ static char* generateTagVaulesForStb(SSuperTable* stbInfo, int32_t tableSeq) { } } - dataLen -= 2; + dataLen -= 1; dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, ")"); return dataBuf; } @@ -2723,54 +2723,54 @@ static int createSuperTable( if (strcasecmp(dataType, "BINARY") == 0) { len += snprintf(cols + len, COL_BUFFER_LEN - len, - ", C%d %s(%d)", colIndex, "BINARY", + ",C%d %s(%d)", colIndex, "BINARY", superTbl->columns[colIndex].dataLen); lenOfOneRow += superTbl->columns[colIndex].dataLen + 3; } else if (strcasecmp(dataType, "NCHAR") == 0) { len += snprintf(cols + len, COL_BUFFER_LEN - len, - ", C%d %s(%d)", colIndex, "NCHAR", + ",C%d %s(%d)", colIndex, "NCHAR", superTbl->columns[colIndex].dataLen); lenOfOneRow += superTbl->columns[colIndex].dataLen + 3; } else if (strcasecmp(dataType, "INT") == 0) { if ((g_args.demo_mode) && (colIndex == 1)) { len += snprintf(cols + len, COL_BUFFER_LEN - len, - ", VOLTAGE INT"); + ",VOLTAGE INT"); } else { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", C%d %s", colIndex, "INT"); + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "INT"); } lenOfOneRow += 11; } else if (strcasecmp(dataType, "BIGINT") == 0) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", C%d %s", + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "BIGINT"); lenOfOneRow += 21; } else if (strcasecmp(dataType, "SMALLINT") == 0) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", C%d %s", + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "SMALLINT"); lenOfOneRow += 6; } else if (strcasecmp(dataType, "TINYINT") == 0) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", C%d %s", colIndex, "TINYINT"); + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "TINYINT"); lenOfOneRow += 4; } else if (strcasecmp(dataType, "BOOL") == 0) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", C%d %s", colIndex, "BOOL"); + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "BOOL"); lenOfOneRow += 6; } else if (strcasecmp(dataType, "FLOAT") == 0) { if (g_args.demo_mode) { if (colIndex == 0) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", CURRENT FLOAT"); + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",CURRENT FLOAT"); } else if (colIndex == 2) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", PHASE FLOAT"); + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",PHASE FLOAT"); } } else { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", C%d %s", colIndex, "FLOAT"); + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "FLOAT"); } lenOfOneRow += 22; } else if (strcasecmp(dataType, "DOUBLE") == 0) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", C%d %s", + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "DOUBLE"); lenOfOneRow += 42; } else if (strcasecmp(dataType, "TIMESTAMP") == 0) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ", C%d %s", + len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "TIMESTAMP"); lenOfOneRow += 21; } else { @@ -2814,17 +2814,17 @@ static int createSuperTable( if (strcasecmp(dataType, "BINARY") == 0) { if ((g_args.demo_mode) && (tagIndex == 1)) { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "location BINARY(%d), ", + "location BINARY(%d),", superTbl->tags[tagIndex].dataLen); } else { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "t%d %s(%d), ", tagIndex, "BINARY", + "T%d %s(%d),", tagIndex, "BINARY", superTbl->tags[tagIndex].dataLen); } lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + 3; } else if (strcasecmp(dataType, "NCHAR") == 0) { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "t%d %s(%d), ", tagIndex, + "T%d %s(%d),", tagIndex, "NCHAR", superTbl->tags[tagIndex].dataLen); lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + 3; } else if (strcasecmp(dataType, "INT") == 0) { @@ -2833,32 +2833,32 @@ static int createSuperTable( "groupId INT, "); } else { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "t%d %s, ", tagIndex, "INT"); + "T%d %s,", tagIndex, "INT"); } lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + 11; } else if (strcasecmp(dataType, "BIGINT") == 0) { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "t%d %s, ", tagIndex, "BIGINT"); + "T%d %s,", tagIndex, "BIGINT"); lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + 21; } else if (strcasecmp(dataType, "SMALLINT") == 0) { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "t%d %s, ", tagIndex, "SMALLINT"); + "T%d %s,", tagIndex, "SMALLINT"); lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + 6; } else if (strcasecmp(dataType, "TINYINT") == 0) { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "t%d %s, ", tagIndex, "TINYINT"); + "T%d %s,", tagIndex, "TINYINT"); lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + 4; } else if (strcasecmp(dataType, "BOOL") == 0) { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "t%d %s, ", tagIndex, "BOOL"); + "T%d %s,", tagIndex, "BOOL"); lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + 6; } else if (strcasecmp(dataType, "FLOAT") == 0) { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "t%d %s, ", tagIndex, "FLOAT"); + "T%d %s,", tagIndex, "FLOAT"); lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + 22; } else if (strcasecmp(dataType, "DOUBLE") == 0) { len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, - "t%d %s, ", tagIndex, "DOUBLE"); + "T%d %s,", tagIndex, "DOUBLE"); lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + 42; } else { taos_close(taos); @@ -2868,7 +2868,7 @@ static int createSuperTable( } } - len -= 2; + len -= 1; len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len, ")"); superTbl->lenOfTagOfOneRow = lenOfTagOfOneRow; @@ -3025,10 +3025,11 @@ static void* createTable(void *sarg) threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + setThreadName("createTable"); + uint64_t lastPrintTime = taosGetTimestampMs(); - int buff_len; - buff_len = BUFFER_SIZE; + int buff_len = BUFFER_SIZE; pThreadInfo->buffer = calloc(buff_len, 1); if (pThreadInfo->buffer == NULL) { @@ -3066,7 +3067,7 @@ static void* createTable(void *sarg) } char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { - tagsValBuf = generateTagVaulesForStb(superTblInfo, i); + tagsValBuf = generateTagValuesForStb(superTblInfo, i); } else { tagsValBuf = getTagValueFromTagSample( superTblInfo, @@ -5329,7 +5330,7 @@ static int generateStbSQLHead( if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { - tagsValBuf = generateTagVaulesForStb(superTblInfo, tableSeq); + tagsValBuf = generateTagValuesForStb(superTblInfo, tableSeq); } else { tagsValBuf = getTagValueFromTagSample( superTblInfo, @@ -5344,7 +5345,7 @@ static int generateStbSQLHead( len = snprintf( headBuf, HEAD_BUFF_LEN, - "%s.%s using %s.%s tags %s values", + "%s.%s using %s.%s TAGS%s values", dbName, tableName, dbName, @@ -6428,6 +6429,8 @@ static void* syncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + setThreadName("syncWrite"); + uint32_t interlaceRows; if (superTblInfo) { @@ -6513,6 +6516,8 @@ static void *asyncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + setThreadName("asyncWrite"); + pThreadInfo->st = 0; pThreadInfo->et = 0; pThreadInfo->lastTs = pThreadInfo->start_time; @@ -6590,6 +6595,8 @@ static void startMultiThreadInsertData(int threads, char* db_name, } else { start_time = 1500000000000; } + debugPrint("%s() LN%d, start_time= %"PRId64"\n", + __func__, __LINE__, start_time); int64_t start = taosGetTimestampMs(); @@ -6911,6 +6918,7 @@ static void *readTable(void *sarg) { #if 1 threadInfo *pThreadInfo = (threadInfo *)sarg; TAOS *taos = pThreadInfo->taos; + setThreadName("readTable"); char command[BUFFER_SIZE] = "\0"; uint64_t sTime = pThreadInfo->start_time; char *tb_prefix = pThreadInfo->tb_prefix; @@ -6983,6 +6991,7 @@ static void *readMetric(void *sarg) { #if 1 threadInfo *pThreadInfo = (threadInfo *)sarg; TAOS *taos = pThreadInfo->taos; + setThreadName("readMetric"); char command[BUFFER_SIZE] = "\0"; FILE *fp = fopen(pThreadInfo->filePath, "a"); if (NULL == fp) { @@ -7159,6 +7168,8 @@ static int insertTestProcess() { static void *specifiedTableQuery(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; + setThreadName("specTableQuery"); + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, @@ -7258,6 +7269,8 @@ static void *superTableQuery(void *sarg) { char sqlstr[MAX_QUERY_SQL_LENGTH]; threadInfo *pThreadInfo = (threadInfo *)sarg; + setThreadName("superTableQuery"); + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, @@ -7560,6 +7573,8 @@ static void *superSubscribe(void *sarg) { TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; uint64_t tsubSeq; + setThreadName("superSub"); + if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); @@ -7706,6 +7721,8 @@ static void *specifiedSubscribe(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; // TAOS_SUB* tsub = NULL; + setThreadName("specSub"); + if (pThreadInfo->taos == NULL) { pThreadInfo->taos = taos_connect(g_queryInfo.host, g_queryInfo.user, diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index 98521d842064c8fe1b07478810c15d870ceaadf5..e5501b4366f24fdee9ace153e59b4a60dc67455f 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -1474,6 +1474,8 @@ static void* taosDumpOutWorkThreadFp(void *arg) STableRecord tableRecord; int fd; + setThreadName("dumpOutWorkThrd"); + char tmpBuf[4096] = {0}; sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex); fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); @@ -2571,6 +2573,8 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset, static void* taosDumpInWorkThreadFp(void *arg) { SThreadParaObj *pThread = (SThreadParaObj*)arg; + setThreadName("dumpInWorkThrd"); + for (int32_t f = 0; f < g_tsSqlFileNum; ++f) { if (f % pThread->totalThreads == pThread->threadIndex) { char *SQLFileName = g_tsDumpInSqlFiles[f]; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 897c3a2f0f6c144a7975c73fd2ad57342dc59f7f..7644f4d7339251037fbe244b17e5af1fa9af0484 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -1113,6 +1113,7 @@ static void *sdbWorkerFp(void *pWorker) { void * unUsed; taosBlockSIGPIPE(); + setThreadName("sdbWorker"); while (1) { int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed); diff --git a/src/os/inc/osDef.h b/src/os/inc/osDef.h index 9176da5b8ee5216ee07bf7d6706c8acbb5ece299..54a4f98254039761d3486267ab2081ac77444489 100644 --- a/src/os/inc/osDef.h +++ b/src/os/inc/osDef.h @@ -210,6 +210,25 @@ extern "C" { #define PRIzu "zu" #endif + +#if defined(_TD_LINUX_64) || defined(_TD_LINUX_32) || defined(_TD_MIPS_64) || defined(_TD_ARM_32) || defined(_TD_ARM_64) || defined(_TD_DARWIN_64) + #if defined(_TD_DARWIN_64) + // MacOS + #if !defined(_GNU_SOURCE) + #define setThreadName(name) do { pthread_setname_np((name)); } while (0) + #else + // pthread_setname_np not defined + #define setThreadName(name) + #endif + #else + // Linux, length of name must <= 16 (the last '\0' included) + #define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0) + #endif +#else + // Windows + #define setThreadName(name) +#endif + #ifdef __cplusplus } #endif diff --git a/src/os/inc/osInc.h b/src/os/inc/osInc.h index 340ff34635d6a76781d9deb5c6012f0e2c5ffa13..9b78110833e73274f82d051cbaa6fd35c90f2a08 100644 --- a/src/os/inc/osInc.h +++ b/src/os/inc/osInc.h @@ -85,6 +85,7 @@ extern "C" { #include #include #include + #include #if !(defined(_ALPINE)) #include diff --git a/src/os/src/darwin/dwSemaphore.c b/src/os/src/darwin/dwSemaphore.c index 898410647ad6e23428656f6f820504126268af07..25cb28cff1b6f9c83cab43faf68641717450c0ea 100644 --- a/src/os/src/darwin/dwSemaphore.c +++ b/src/os/src/darwin/dwSemaphore.c @@ -41,6 +41,8 @@ static semaphore_t sem_exit; static void* sem_thread_routine(void *arg) { (void)arg; + setThreadName("sem_thrd"); + sem_port = mach_task_self(); kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0); if (ret != KERN_SUCCESS) { diff --git a/src/os/src/darwin/dwTimer.c b/src/os/src/darwin/dwTimer.c index ee1becc91af5237296bb2a562982b33cc5c78f10..d395a7f53f2baa9a806ab8cbe96253deb7334f6e 100644 --- a/src/os/src/darwin/dwTimer.c +++ b/src/os/src/darwin/dwTimer.c @@ -32,6 +32,7 @@ static volatile int timer_stop = 0; static void* timer_routine(void *arg) { (void)arg; + setThreadName("timer"); int r = 0; struct timespec to = {0}; diff --git a/src/os/src/detail/osTimer.c b/src/os/src/detail/osTimer.c index b054f08c7842ed405f818b26be6040c543fa7644..c381b3e825f508b17ec0d6a053a574957c5dc365 100644 --- a/src/os/src/detail/osTimer.c +++ b/src/os/src/detail/osTimer.c @@ -38,6 +38,8 @@ static void *taosProcessAlarmSignal(void *tharg) { struct sigevent sevent = {{0}}; + setThreadName("alarmSignal"); + #ifdef _ALPINE sevent.sigev_notify = SIGEV_THREAD; sevent.sigev_value.sival_int = syscall(__NR_gettid); diff --git a/src/plugins/http/src/httpQueue.c b/src/plugins/http/src/httpQueue.c index 7f7ce404600b96b0efe8385ea0aed7be412a6ba0..677ab0c91d4a6e446eae6dd5213788166b2c40bf 100644 --- a/src/plugins/http/src/httpQueue.c +++ b/src/plugins/http/src/httpQueue.c @@ -70,6 +70,8 @@ static void *httpProcessResultQueue(void *param) { int32_t type; void * unUsed; + setThreadName("httpResultQ"); + while (1) { if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset); diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 9d98d3f11300fa8143f9a32718c6a865b9a59a97..f02859f165499b0c69b095599dd47890e644c604 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -117,6 +117,7 @@ static void httpProcessHttpData(void *param) { int32_t fdNum; taosSetMaskSIGPIPE(); + setThreadName("httpData"); while (1) { struct epoll_event events[HTTP_MAX_EVENTS]; @@ -208,6 +209,7 @@ static void *httpAcceptHttpConnection(void *arg) { int32_t totalFds = 0; taosSetMaskSIGPIPE(); + setThreadName("httpAcceptConn"); pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); diff --git a/src/plugins/monitor/src/monMain.c b/src/plugins/monitor/src/monMain.c index 2c4a0c1a4c56e560a9ffe5ec007cc51f9d7f9e24..960a097f5d99bb430f888d01960a879e80456d31 100644 --- a/src/plugins/monitor/src/monMain.c +++ b/src/plugins/monitor/src/monMain.c @@ -114,6 +114,7 @@ int32_t monStartSystem() { static void *monThreadFunc(void *param) { monDebug("starting to initialize monitor module ..."); + setThreadName("monThrd"); while (1) { static int32_t accessTimes = 0; diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index eacc3b1f74cdfc0d00bb53a38beb8c85b164e200..e0f2f393bb10706daadca35715d7e0ae68d7c436 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -100,6 +100,8 @@ void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) } void* mqttClientRefresher(void* client) { + setThreadName("mqttCliRefresh"); + while (tsMqttIsRuning) { mqtt_sync((struct mqtt_client*)client); taosMsleep(100); @@ -141,4 +143,4 @@ void mqttReconnectClient(struct mqtt_client* client, void** unused) { mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); mqtt_subscribe(client, tsMqttTopic, 0); -} \ No newline at end of file +} diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 029629eff0e10f5bc8812f3df8998b24a507931d..e9feeef9d339a5b1a96e41fd35fb6c62e13ed94b 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -242,6 +242,7 @@ static void *taosAcceptTcpConnection(void *arg) { pServerObj = (SServerObj *)arg; tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); + setThreadName("acceptTcpConn"); while (1) { socklen_t addrlen = sizeof(caddr); @@ -528,6 +529,11 @@ static void *taosProcessTcpData(void *param) { SFdObj *pFdObj; struct epoll_event events[maxEvents]; SRecvInfo recvInfo; + char name[16]; + + memset(name, 0, sizeof(name)); + snprintf(name, 16, "%s-tcpData", pThreadObj->label); + setThreadName(name); while (1) { int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 7a46dbe5c3e1238da93043f8dc97047f84ff72e8..086a390cb8ea2a95f576cb1bff81dfc79769863a 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -195,6 +195,8 @@ static void *taosRecvUdpData(void *param) { tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index); char *msg = pConn->buffer; + setThreadName("recvUdpData"); + while (1) { dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); if (dataLen <= 0) { diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index faa6d40da398045e43dba3bbdbaf9ef6c7ccb1ff..de30114bd1c7fa9687a6d75bca3d7158137e29e4 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -47,6 +47,8 @@ static int tcount = 0; static void *sendRequest(void *param) { SInfo *pInfo = (SInfo *)param; SRpcMsg rpcMsg = {0}; + + setThreadName("sendCliReq"); tDebug("thread:%d, start to send request", pInfo->index); diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c index a152d8e4a51502357530267acb924c74426c9509..3e94a56efb3494ac5fe1942245abd0bad8815ee7 100644 --- a/src/rpc/test/rsclient.c +++ b/src/rpc/test/rsclient.c @@ -39,8 +39,10 @@ static int terror = 0; static void *sendRequest(void *param) { SInfo *pInfo = (SInfo *)param; - SRpcMsg rpcMsg, rspMsg; + SRpcMsg rpcMsg, rspMsg; + setThreadName("sendSrvReq"); + tDebug("thread:%d, start to send request", pInfo->index); while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index c0d66316cd5b802ddcaddf1015d4ceca1aa3b2c5..bf9d5201a062e048365c2887b7b6a6d70b40547f 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -263,6 +263,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { } void *syncRestoreData(void *param) { + setThreadName("syncRestoreData"); int64_t rid = (int64_t)param; SSyncPeer *pPeer = syncAcquirePeer(rid); if (pPeer == NULL) { diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index c86ab8549974712658ad3d381c4141427c000762..89fdda0686ffc6d7d5b372def92e48c6cf06c2ab 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -415,6 +415,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { } void *syncRetrieveData(void *param) { + setThreadName("syncRetrievData"); int64_t rid = (int64_t)param; SSyncPeer *pPeer = syncAcquirePeer(rid); if (pPeer == NULL) { diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index 3ad9e9bba01eb861c48fa121b154af3e7838dda0..698245f9e408281e2a7c41da2d6228a1ec12217d 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -195,6 +195,8 @@ static void *syncProcessTcpData(void *param) { SConnObj * pConn = NULL; struct epoll_event events[maxEvents]; + setThreadName("syncTcpData"); + void *buffer = malloc(pInfo->bufferSize); taosBlockSIGPIPE(); @@ -257,6 +259,7 @@ static void *syncAcceptPeerTcpConnection(void *argv) { SPoolInfo *pInfo = &pPool->info; taosBlockSIGPIPE(); + setThreadName("acceptTcpConn"); while (1) { struct sockaddr_in clientAddr; diff --git a/src/sync/test/syncClient.c b/src/sync/test/syncClient.c index 23ea54ee0c19b6ad2f93d7577d8d711874b10968..303d2376effffa3a3b2dc01580352a43aeaac9d8 100644 --- a/src/sync/test/syncClient.c +++ b/src/sync/test/syncClient.c @@ -48,6 +48,8 @@ void *sendRequest(void *param) { SInfo * pInfo = (SInfo *)param; SRpcMsg rpcMsg = {0}; + setThreadName("sendCliReq"); + uDebug("thread:%d, start to send request", pInfo->index); while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index eeaa6a08c2e47d103b62d6023dde74341585f65f..a3d06966488b95a561c9bc688b49f7ceceb87248 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -178,6 +178,8 @@ void *processWriteQueue(void *param) { int type; void *item; + setThreadName("writeQ"); + while (1) { int ret = taosReadQitem(qhandle, &type, &item); if (ret <= 0) { diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index e25014bc1e8f2456eece3d517096cfee66886800..e45ac05e979b119ee7995cf845a6f242a9953cb3 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -158,6 +158,8 @@ static void *tsdbLoopCommit(void *arg) { STsdbRepo * pRepo = NULL; TSDB_REQ_T req; + setThreadName("tsdbCommit"); + while (true) { pthread_mutex_lock(&(pQueue->lock)); @@ -208,4 +210,4 @@ void tsdbDecCommitRef(int vgId) { int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1); pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty)); tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount); -} \ No newline at end of file +} diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index e847debb890fb5709381a06340d0ea9dcab5c2f1..ee7921d4c7dd1e92cccd6e696017c6d02fdcb6a5 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -678,6 +678,8 @@ void* taosCacheTimedRefresh(void *handle) { assert(pCacheArrayList != NULL); uDebug("cache refresh thread starts"); + setThreadName("cacheTimedRefre"); + const int32_t SLEEP_DURATION = 500; //500 ms int64_t count = 0; diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index 45ff14ffa4adcd018cbf7a7d69b8644582855ab3..88f57e8ac24cd207fc44f581564f45a9c33c348e 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -178,6 +178,8 @@ static void *taosThreadToOpenNewFile(void *param) { char keepName[LOG_FILE_NAME_LEN + 20]; sprintf(keepName, "%s.%d", tsLogObj.logName, tsLogObj.flag); + setThreadName("openNewFile"); + tsLogObj.flag ^= 1; tsLogObj.lines = 0; char name[LOG_FILE_NAME_LEN + 20]; @@ -687,6 +689,8 @@ static void taosWriteLog(SLogBuff *tLogBuff) { static void *taosAsyncOutputLog(void *param) { SLogBuff *tLogBuff = (SLogBuff *)param; + + setThreadName("asyncOutputLog"); while (1) { //tsem_wait(&(tLogBuff->buffNotEmpty)); diff --git a/src/util/src/tnettest.c b/src/util/src/tnettest.c index 318a2d48609a129bcf6094455ff2a7cc8f7c0467..0bab7b7e6623ebaa5de6d6511fa1c43719372ef5 100644 --- a/src/util/src/tnettest.c +++ b/src/util/src/tnettest.c @@ -50,7 +50,9 @@ static void *taosNetBindUdpPort(void *sarg) { struct sockaddr_in server_addr; struct sockaddr_in clientAddr; - if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + setThreadName("netBindUdpPort"); + + if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { uError("failed to create UDP socket since %s", strerror(errno)); return NULL; } @@ -106,13 +108,15 @@ static void *taosNetBindTcpPort(void *sarg) { struct sockaddr_in server_addr; struct sockaddr_in clientAddr; - STestInfo *pinfo = sarg; + STestInfo *pinfo = sarg; int32_t port = pinfo->port; SOCKET serverSocket; int32_t addr_len = sizeof(clientAddr); SOCKET client; char buffer[BUFFER_SIZE]; + setThreadName("netBindTcpPort"); + if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { uError("failed to create TCP socket since %s", strerror(errno)); return NULL; diff --git a/src/util/src/tnote.c b/src/util/src/tnote.c index c6d49dfb3d8242bdf697b3212fde479f01f8bcd9..b691abc5b9f6f828edcc46ec3a5989baa083f443 100644 --- a/src/util/src/tnote.c +++ b/src/util/src/tnote.c @@ -84,6 +84,8 @@ static void *taosThreadToOpenNewNote(void *param) { char name[NOTE_FILE_NAME_LEN * 2]; SNoteObj *pNote = (SNoteObj *)param; + setThreadName("openNewNote"); + pNote->flag ^= 1; pNote->lines = 0; sprintf(name, "%s.%d", pNote->name, pNote->flag); diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index 16142470c95678b8663f3bd437357dcdb22635a5..3d3dfd989926c4acf8b32d8c7df724cb2d1ac079 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -122,6 +122,8 @@ void *taosProcessSchedQueue(void *scheduler) { SSchedQueue *pSched = (SSchedQueue *)scheduler; int ret = 0; + setThreadName("schedQ"); + while (1) { if ((ret = tsem_wait(&pSched->fullSem)) != 0) { uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); @@ -234,4 +236,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) { } taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer); -} \ No newline at end of file +} diff --git a/src/util/tests/trefTest.c b/src/util/tests/trefTest.c index e01da070afd3333cf02c25b51d2e9711c1616fb0..fe3dcab201de3f5b5068c997f3759a5b8397b5b7 100644 --- a/src/util/tests/trefTest.c +++ b/src/util/tests/trefTest.c @@ -35,6 +35,8 @@ void *addRef(void *param) { SRefSpace *pSpace = (SRefSpace *)param; int id; + setThreadName("addRef"); + for (int i=0; i < pSpace->steps; ++i) { printf("a"); id = random() % pSpace->refNum; @@ -52,6 +54,8 @@ void *removeRef(void *param) { SRefSpace *pSpace = (SRefSpace *)param; int id, code; + setThreadName("removeRef"); + for (int i=0; i < pSpace->steps; ++i) { printf("d"); id = random() % pSpace->refNum; @@ -70,6 +74,8 @@ void *acquireRelease(void *param) { SRefSpace *pSpace = (SRefSpace *)param; int id; + setThreadName("acquireRelease"); + for (int i=0; i < pSpace->steps; ++i) { printf("a"); @@ -91,6 +97,8 @@ void myfree(void *p) { void *openRefSpace(void *param) { SRefSpace *pSpace = (SRefSpace *)param; + setThreadName("openRefSpace"); + printf("c"); pSpace->rsetId = taosOpenRef(50, myfree); diff --git a/src/vnode/src/vnodeBackup.c b/src/vnode/src/vnodeBackup.c index a0a975be2bcfbb2c945a72adecbf47f9cf404b40..801af42e0e6869944ec60169b0662131be787cba 100644 --- a/src/vnode/src/vnodeBackup.c +++ b/src/vnode/src/vnodeBackup.c @@ -61,6 +61,8 @@ static void vnodeProcessBackupMsg(SVBackupMsg *pMsg) { } static void *vnodeBackupFunc(void *param) { + setThreadName("vnodeBackup"); + while (1) { SVBackupMsg *pMsg = NULL; if (taosReadQitemFromQset(tsVBackupQset, NULL, (void **)&pMsg, NULL) == 0) { diff --git a/src/vnode/src/vnodeWorker.c b/src/vnode/src/vnodeWorker.c index 6fb79d10feef74d88e04368c5607d39e7d70f6f2..e94c99cbea99139a21fb7fb64c729a12d3091349 100644 --- a/src/vnode/src/vnodeWorker.c +++ b/src/vnode/src/vnodeWorker.c @@ -188,6 +188,8 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) { } static void *vnodeMWorkerFunc(void *param) { + setThreadName("vnodeMWorker"); + while (1) { SVMWorkerMsg *pMsg = NULL; if (taosReadQitemFromQset(tsVMWorkerQset, NULL, (void **)&pMsg, NULL) == 0) { diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 9bd5cdf1750b106df9eed8c033c1adff91d9b8e5..45f65b2c2fc9ae5412f471805a3244644e590638 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -192,6 +192,7 @@ static void walFsyncAll() { static void *walThreadFunc(void *param) { int stop = 0; + setThreadName("walThrd"); while (1) { walUpdateSeq(); walFsyncAll();