未验证 提交 1f3f92c2 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #19777 from taosdata/fix/TD-22145

fix: fix fd limit crash
......@@ -58,7 +58,7 @@ extern int32_t tMsgDict[];
#define TMSG_INFO(TYPE) \
((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \
(TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) || \
(TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG \
(TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG || (TYPE) < TDMT_VND_TMQ_MAX_MSG \
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
: 0
......
......@@ -138,6 +138,12 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
p->mgmtEp = epSet;
taosThreadMutexInit(&p->qnodeMutex, NULL);
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores / 2);
if (p->pTransporter == NULL) {
taosThreadMutexUnlock(&appInfo.mutex);
taosMemoryFreeClear(key);
taosMemoryFree(p);
return NULL;
}
p->pAppHbMgr = appHbMgrInit(p, key);
if (NULL == p->pAppHbMgr) {
destroyAppInst(p);
......@@ -1463,6 +1469,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
tscError("failed to sched msg to tsc, tsc ready to quit");
rpcFreeCont(pMsg->pCont);
taosMemoryFree(arg->pEpset);
destroySendMsgInfo(pMsg->info.ahandle);
taosMemoryFree(arg);
}
}
......
......@@ -1311,7 +1311,11 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < cli->numOfThreads; i++) {
SCliThrd* pThrd = createThrdObj(shandle);
int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
if (pThrd == NULL) {
return NULL;
}
int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
if (err == 0) {
tDebug("success to create tranport-cli thread:%d", i);
}
......@@ -1368,9 +1372,23 @@ static SCliThrd* createThrdObj(void* trans) {
taosThreadMutexInit(&pThrd->msgMtx, NULL);
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop);
int err = uv_loop_init(pThrd->loop);
if (err != 0) {
tError("failed to init uv_loop, reason:%s", uv_err_name(err));
taosMemoryFree(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx);
taosMemoryFree(pThrd);
return NULL;
}
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
if (pThrd->asyncPool == NULL) {
tError("failed to init async pool");
uv_loop_close(pThrd->loop);
taosMemoryFree(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx);
taosMemoryFree(pThrd);
return NULL;
}
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
......
......@@ -218,24 +218,37 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
pool->nAsync = sz;
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
for (int i = 0; i < pool->nAsync; i++) {
int i = 0, err = 0;
for (i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
item->pThrd = arg;
QUEUE_INIT(&item->qmsg);
taosThreadMutexInit(&item->mtx, NULL);
uv_async_t* async = &(pool->asyncs[i]);
uv_async_init(loop, async, cb);
async->data = item;
err = uv_async_init(loop, async, cb);
if (err != 0) {
tError("failed to init async, reason:%s", uv_err_name(err));
break;
}
}
if (i != pool->nAsync) {
transAsyncPoolDestroy(pool);
pool = NULL;
}
return pool;
}
void transAsyncPoolDestroy(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (item == NULL) continue;
taosThreadMutexDestroy(&item->mtx);
taosMemoryFree(item);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册