未验证 提交 4d8867d9 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12639 from taosdata/fix/tsim

refactor: shm queue in multi process mode
......@@ -41,8 +41,8 @@ typedef enum {
typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
typedef void (*SendRspFp)(const SRpcMsg* pMsg);
typedef void (*SendRedirectRspFp)(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
typedef void (*SendRspFp)(SRpcMsg* pMsg);
typedef void (*SendRedirectRspFp)(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
typedef void (*ReportStartup)(const char* name, const char* desc);
......@@ -64,8 +64,8 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
void tmsgSendRsp(const SRpcMsg* pMsg);
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc);
......
......@@ -21,9 +21,9 @@ static SMsgCb tsDefaultMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) {
PutToQueueFp fp = pMsgCb->queueFps[qtype];
return (*fp)(pMsgCb->mgmt, pReq);
return (*fp)(pMsgCb->mgmt, pMsg);
}
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
......@@ -31,17 +31,17 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
return (*fp)(pMsgCb->mgmt, vgId, qtype);
}
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pReq) {
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
SendReqFp fp = tsDefaultMsgCb.sendReqFp;
return (*fp)(epSet, pReq);
return (*fp)(epSet, pMsg);
}
void tmsgSendRsp(const SRpcMsg* pMsg) {
void tmsgSendRsp(SRpcMsg* pMsg) {
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
return (*fp)(pMsg);
}
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
(*fp)(pMsg, pNewEpSet);
}
......
......@@ -75,6 +75,7 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->statusThread)) {
taosThreadJoin(pMgmt->statusThread, NULL);
taosThreadClear(&pMgmt->statusThread);
}
}
......@@ -95,6 +96,7 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->monitorThread)) {
taosThreadJoin(pMgmt->monitorThread, NULL);
taosThreadClear(&pMgmt->monitorThread);
}
}
......
......@@ -196,6 +196,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
taosThreadJoin(pThread->thread, NULL);
taosThreadClear(&pThread->thread);
}
taosMemoryFree(pThread->pCfgs);
}
......
......@@ -151,12 +151,10 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper);
void dmCleanupProc(struct SMgmtWrapper *pWrapper);
int32_t dmRunProc(SProc *proc);
void dmStopProc(SProc *proc);
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle);
void dmRemoveProcRpcHandle(SProc *proc, void *handle);
void dmCloseProcRpcHandles(SProc *proc);
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t handleRef, EProcFuncType ftype);
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype);
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
// dmTransport.c
int32_t dmInitServer(SDnode *pDnode);
......
......@@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
static inline int32_t CEIL8(int32_t v) {
const int32_t c = ceil((float)(v) / 8) * 8;
return c < 8 ? 8 : c;
}
static inline int32_t CEIL8(int32_t v) { return ceil((float)(v) / 8) * 8; }
static int32_t dmInitProcMutex(SProcQueue *queue) {
TdThreadMutexAttr mattr = {0};
......@@ -87,42 +84,17 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
return queue;
}
#if 0
static void dmDestroyProcQueue(SProcQueue *queue) {
if (queue->mutex != NULL) {
taosThreadMutexDestroy(queue->mutex);
queue->mutex = NULL;
}
}
static void dmDestroyProcSem(SProcQueue *queue) {
if (queue->sem != NULL) {
tsem_destroy(queue->sem);
queue->sem = NULL;
}
}
#endif
static void dmCleanupProcQueue(SProcQueue *queue) {}
static void dmCleanupProcQueue(SProcQueue *queue) {
#if 0
if (queue != NULL) {
dmDestroyProcQueue(queue);
dmDestroyProcSem(queue);
}
#endif
}
static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
EProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
const int32_t headLen = CEIL8(rawHeadLen);
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
const void *pHead = pMsg;
const void *pBody = pMsg->pCont;
const int16_t rawHeadLen = sizeof(SRpcMsg);
const int32_t rawBodyLen = pMsg->contLen;
const int16_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8;
const int64_t handle = (int64_t)pMsg->info.handle;
taosThreadMutexLock(&queue->mutex);
if (fullLen > queue->avail) {
......@@ -131,8 +103,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
return -1;
}
if (handle != 0 && ftype == DND_FUNC_REQ) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
taosThreadMutexUnlock(&queue->mutex);
return -1;
}
......@@ -151,31 +123,31 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
if (queue->tail < queue->head) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
queue->tail = queue->tail + 8 + headLen + bodyLen;
} else {
int32_t remain = queue->total - queue->tail;
if (remain == 0) {
memcpy(queue->pBuffer + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
queue->tail = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(queue->pBuffer, pHead, rawHeadLen);
memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
queue->tail = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
memcpy(queue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
queue->tail = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
queue->tail = bodyLen - (remain - 8 - headLen);
} else {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
queue->tail = queue->tail + headLen + bodyLen + 8;
}
}
......@@ -185,13 +157,12 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem);
dTrace("node:%s, push %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen,
pBody, bodyLen, pos, queue->items);
dTrace("node:%s, push %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, pMsg->code, pos, queue->items);
return 0;
}
static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen,
EProcFuncType *pFuncType) {
static int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) {
tsem_wait(&queue->sem);
taosThreadMutexLock(&queue->mutex);
......@@ -217,8 +188,9 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
int32_t bodyLen = CEIL8(rawBodyLen);
void *pHead = taosAllocateQitem(headLen, DEF_QITEM);
void *pBody = rpcMallocCont(bodyLen);
if (pHead == NULL || pBody == NULL) {
void *pBody = NULL;
if (bodyLen > 0) pBody = rpcMallocCont(bodyLen);
if (pHead == NULL || (bodyLen > 0 && pBody == NULL)) {
taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem);
taosFreeQitem(pHead);
......@@ -230,31 +202,31 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
const int32_t pos = queue->head;
if (queue->head < queue->tail) {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
queue->head = queue->head + 8 + headLen + bodyLen;
} else {
int32_t remain = queue->total - queue->head;
if (remain == 0) {
memcpy(pHead, queue->pBuffer + 8, headLen);
memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
queue->head = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(pHead, queue->pBuffer, headLen);
memcpy(pBody, queue->pBuffer + headLen, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen, bodyLen);
queue->head = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8);
memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8));
memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
queue->head = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
if (bodyLen > 0) memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
queue->head = bodyLen - (remain - 8 - headLen);
} else {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
queue->head = queue->head + headLen + bodyLen + 8;
}
}
......@@ -263,14 +235,12 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
queue->items--;
taosThreadMutexUnlock(&queue->mutex);
*ppHead = pHead;
*ppBody = pBody;
*pHeadLen = rawHeadLen;
*pBodyLen = rawBodyLen;
*ppMsg = pHead;
(*ppMsg)->pCont = pBody;
*pFuncType = (EProcFuncType)ftype;
dTrace("node:%s, pop %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody,
bodyLen, pos, queue->items);
dTrace("node:%s, pop %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
(*ppMsg), (*ppMsg)->msgType, (*ppMsg)->info.handle, (*ppMsg)->contLen, (*ppMsg)->code, pos, queue->items);
return 1;
}
......@@ -308,18 +278,14 @@ static void *dmConsumChildQueue(void *param) {
SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->cqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0;
int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pReq = NULL;
SRpcMsg *pMsg = NULL;
dDebug("node:%s, start to consume from cqueue", proc->name);
do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
if (numOfMsgs == 0) {
dDebug("node:%s, get no msg from cqueue and exit thread", proc->name);
break;
......@@ -332,25 +298,24 @@ static void *dmConsumChildQueue(void *param) {
}
if (ftype != DND_FUNC_REQ) {
dFatal("node:%s, get msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype);
taosFreeQitem(pHead);
rpcFreeCont(pBody);
} else {
pReq = pHead;
pReq->pCont = pBody;
code = dmProcessNodeMsg(pWrapper, pReq);
if (code != 0) {
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pReq, terrstr());
SRpcMsg rspMsg = {
.info = pReq->info,
.pCont = pReq->info.rsp,
.contLen = pReq->info.rspLen,
};
dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, DND_FUNC_RSP);
taosFreeQitem(pHead);
rpcFreeCont(pBody);
rpcFreeCont(rspMsg.pCont);
}
dError("node:%s, invalid ftype:%d from cqueue", proc->name, ftype);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
continue;
}
code = dmProcessNodeMsg(pWrapper, pMsg);
if (code != 0) {
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
SRpcMsg rsp = {
.code = (terrno != 0 ? terrno : code),
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
.info = pMsg->info,
};
dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
} while (1);
......@@ -361,18 +326,14 @@ static void *dmConsumParentQueue(void *param) {
SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->pqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0;
int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pRsp = NULL;
SRpcMsg *pMsg = NULL;
dDebug("node:%s, start to consume from pqueue", proc->name);
do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
if (numOfMsgs == 0) {
dDebug("node:%s, get no msg from pqueue and exit thread", proc->name);
break;
......@@ -385,31 +346,19 @@ static void *dmConsumParentQueue(void *param) {
}
if (ftype == DND_FUNC_RSP) {
pRsp = pHead;
pRsp->pCont = pBody;
dTrace("node:%s, get rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle);
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
rpcSendResponse(pRsp);
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
rpcSendResponse(pMsg);
} else if (ftype == DND_FUNC_REGIST) {
pRsp = pHead;
pRsp->pCont = pBody;
dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->info.handle);
rpcRegisterBrokenLinkArg(pRsp);
rpcRegisterBrokenLinkArg(pMsg);
} else if (ftype == DND_FUNC_RELEASE) {
pRsp = pHead;
pRsp->pCont = NULL;
dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->info.handle);
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code);
rpcFreeCont(pBody);
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
} else {
dFatal("node:%s, get msg:%p from pqueue, invalid ftype:%d", proc->name, pHead, ftype);
rpcFreeCont(pBody);
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
rpcFreeCont(pMsg->pCont);
}
taosFreeQitem(pHead);
taosFreeQitem(pMsg);
} while (1);
return NULL;
......@@ -468,51 +417,55 @@ void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
dmCleanupProcQueue(proc->cqueue);
dmCleanupProcQueue(proc->pqueue);
taosHashCleanup(proc->hash);
proc->hash = NULL;
dDebug("node:%s, proc is cleaned up", pWrapper->name);
}
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle) {
void dmRemoveProcRpcHandle(SProc *proc, void *handle) {
int64_t h = (int64_t)handle;
taosThreadMutexLock(&proc->cqueue->mutex);
int64_t *pRef = taosHashGet(proc->hash, &h, sizeof(int64_t));
int64_t ref = 0;
if (pRef != NULL) {
ref = *pRef;
}
taosHashRemove(proc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&proc->cqueue->mutex);
return ref;
}
void dmCloseProcRpcHandles(SProc *proc) {
taosThreadMutexLock(&proc->cqueue->mutex);
void *h = taosHashIterate(proc->hash, NULL);
while (h != NULL) {
void *handle = *((void **)h);
h = taosHashIterate(proc->hash, h);
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, handle);
SRpcMsg rpcMsg = {.info.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL);
while (pInfo != NULL) {
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle);
SRpcMsg rpcMsg = {.info = *pInfo, .code = TSDB_CODE_NODE_OFFLINE};
rpcSendResponse(&rpcMsg);
pInfo = taosHashIterate(proc->hash, pInfo);
}
taosHashClear(proc->hash);
taosThreadMutexUnlock(&proc->cqueue->mutex);
}
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype) {
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
int32_t retry = 0;
while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
dWarn("node:%s, failed to put msg:%p to pqueue since %s, retry:%d", proc->name, pHead, terrstr(), retry);
retry++;
taosMsleep(retry);
while (1) {
if (dmPushToProcQueue(proc, proc->pqueue, pMsg, ftype) == 0) {
break;
}
if (retry == 10) {
pMsg->code = terrno;
if (pMsg->contLen > 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
pMsg->contLen = 0;
}
dError("node:%s, failed to push %s msg:%p type:%d handle:%p then discard data and return error", proc->name,
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle);
} else {
dError("node:%s, failed to push %s msg:%p type:%d handle:%p len:%d since %s, retry:%d", proc->name,
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, terrstr(), retry);
retry++;
taosMsleep(retry);
}
}
}
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t ref, EProcFuncType ftype) {
return dmPushToProcQueue(proc, proc->cqueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ref, ftype);
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
}
......@@ -128,9 +128,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
}
if (InParentProc(pWrapper)) {
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen,
(IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId,
DND_FUNC_REQ);
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
} else {
code = dmProcessNodeMsg(pWrapper, pMsg);
}
......@@ -255,23 +253,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
}
}
static inline void dmSendRsp(const SRpcMsg *pMsg) {
static inline void dmSendRsp(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP);
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
dmSendRpcRedirectRsp(pMsg);
} else {
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
dmSendRpcRedirectRsp(pMsg);
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
rpcSendResponse(pMsg);
}
}
}
static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pRsp->info.wrapper;
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
SRpcMsg rsp = {0};
SMEpSet msg = {.epSet = *pNewEpSet};
......@@ -281,7 +279,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
tSerializeSMEpSet(rsp.pCont, len, &msg);
rsp.code = TSDB_CODE_RPC_REDIRECT;
rsp.info = pRsp->info;
rsp.info = pMsg->info;
rpcSendResponse(&rsp);
}
}
......@@ -289,7 +287,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
} else {
rpcRegisterBrokenLinkArg(pMsg);
}
......@@ -299,7 +297,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
SMgmtWrapper *pWrapper = pHandle->wrapper;
if (InChildProc(pWrapper)) {
SRpcMsg msg = {.code = type, .info = *pHandle};
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
} else {
rpcReleaseHandle(pHandle->handle, type);
}
......
......@@ -122,7 +122,7 @@ static void mndCleanupTimer(SMnode *pMnode) {
pMnode->stopped = true;
if (taosCheckPthreadValid(pMnode->thread)) {
taosThreadJoin(pMnode->thread, NULL);
memset(&pMnode->thread, 0, sizeof(pMnode->thread));
taosThreadClear(&pMnode->thread);
}
}
......
......@@ -16,10 +16,9 @@
#include "tcache.h"
void reportStartup(const char *name, const char *desc) {}
void sendRsp(const SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
void sendRsp(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
// rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
......
......@@ -237,6 +237,7 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
int32_t ret = 0;
atomic_store_8(&io->isStart, 0);
taosThreadJoin(io->consumerTid, NULL);
taosThreadClear(&io->consumerTid);
taosTmrCleanUp(io->timerMgr);
return ret;
}
......
......@@ -244,6 +244,7 @@ static void walStopThread() {
if (taosCheckPthreadValid(tsWal.thread)) {
taosThreadJoin(tsWal.thread, NULL);
taosThreadClear(&tsWal.thread);
}
wDebug("wal thread is stopped");
......
......@@ -145,6 +145,7 @@ void taosCloseLog() {
taosStopLog();
if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL);
taosThreadClear(&tsLogObj.logHandle->asyncThread);
}
tsLogInited = 0;
......
......@@ -209,6 +209,7 @@ void taosCleanUpScheduler(void *param) {
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) {
taosThreadJoin(pSched->qthread[i], NULL);
taosThreadClear(&pSched->qthread[i]);
}
}
......
......@@ -57,6 +57,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
}
}
......@@ -179,6 +180,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
SWWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
taosFreeQall(worker->qall);
taosCloseQset(worker->qset);
}
......
......@@ -436,6 +436,7 @@ int32_t main(int32_t argc, char *argv[]) {
taosMsleep(300);
for (int32_t i = 0; i < numOfThreads; i++) {
taosThreadJoin(pInfo[i].thread, NULL);
taosThreadClear(&pInfo[i].thread);
}
int64_t maxDelay = 0;
......
......@@ -537,6 +537,7 @@ int main(int32_t argc, char* argv[]) {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
taosThreadClear(&g_stConfInfo.stThreads[i].thread);
}
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
......
......@@ -56,6 +56,7 @@ void simFreeScript(SScript *script) {
bgScript->killed = true;
if (taosCheckPthreadValid(bgScript->bgPid)) {
taosThreadJoin(bgScript->bgPid, NULL);
taosThreadClear(&bgScript->bgPid);
}
simDebug("script:%s, background thread joined", bgScript->fileName);
......
......@@ -985,6 +985,7 @@ int32_t shellExecute() {
while (1) {
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn);
taosThreadJoin(shell.pid, NULL);
taosThreadClear(&shell.pid);
}
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册