提交 69aa4546 编写于 作者: S Shengliang Guan

send response on child queue killed

上级 43e84cbd
......@@ -24,11 +24,10 @@ extern "C" {
typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
typedef struct SProcQueue SProcQueue;
typedef struct SProcObj SProcObj;
typedef struct SProcObj SProcObj;
typedef void *(*ProcMallocFp)(int32_t contLen);
typedef void *(*ProcFreeFp)(void *pCont);
typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
typedef void *(*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
ProcFuncType ftype);
typedef struct {
......@@ -43,7 +42,7 @@ typedef struct {
ProcMallocFp parentMallocBodyFp;
ProcFreeFp parentFreeBodyFp;
SShm shm;
void *pParent;
void *parent;
const char *name;
bool isChild;
} SProcCfg;
......@@ -51,10 +50,13 @@ typedef struct {
SProcObj *taosProcInit(const SProcCfg *pCfg);
void taosProcCleanup(SProcObj *pProc);
int32_t taosProcRun(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype);
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, ProcFuncType ftype);
void taosProcRemoveHandle(SProcObj *pProc, void *handle);
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle));
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype);
#ifdef __cplusplus
}
......
......@@ -88,6 +88,10 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
return 0;
}
static void dndProcessProcHandle(void *handle) {
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE};
rpcSendResponse(&rpcMsg);
}
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process");
......@@ -220,6 +224,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
dndNewProc(pWrapper, n);
}
}
......
......@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndInt.h"
......@@ -68,7 +68,8 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS
code = (*msgFp)(pWrapper, pMsg);
} else if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ);
code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
PROC_REQ);
} else {
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
ASSERT(1);
......@@ -454,6 +455,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
rpcRegisterBrokenLinkArg(pMsg);
break;
case PROC_RELEASE:
taosProcRemoveHandle(pWrapper->pProc, pMsg->handle);
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
rpcFreeCont(pCont);
break;
......@@ -461,6 +463,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
dndSendRpcReq(&pWrapper->pDnode->trans, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
break;
case PROC_RSP:
taosProcRemoveHandle(pWrapper->pProc, pMsg->handle);
dndSendRpcRsp(pWrapper, pMsg);
break;
default:
......@@ -481,7 +484,7 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->shm,
.pParent = pWrapper,
.parent = pWrapper,
.name = pWrapper->name};
return cfg;
}
\ No newline at end of file
......@@ -15,7 +15,9 @@
#define _DEFAULT_SOURCE
#include "tprocess.h"
#include "taos.h"
#include "taoserror.h"
#include "thash.h"
#include "tlog.h"
#include "tqueue.h"
......@@ -48,8 +50,9 @@ typedef struct SProcObj {
ProcFreeFp parentFreeHeadFp;
ProcMallocFp parentMallocBodyFp;
ProcFreeFp parentFreeBodyFp;
void *pParent;
void *parent;
const char *name;
SHashObj *hash;
int32_t pid;
bool isChild;
bool stopFlag;
......@@ -151,8 +154,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
#endif
}
static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
int32_t rawBodyLen, ProcFuncType ftype) {
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) {
const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8;
......@@ -164,6 +167,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
return -1;
}
if (handle != 0 && ftype == PROC_REQ) {
if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handle, sizeof(int64_t)) != 0) {
taosThreadMutexUnlock(&pQueue->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
const int32_t pos = pQueue->tail;
if (pQueue->tail < pQueue->total) {
*(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen;
......@@ -317,6 +328,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->name = pCfg->name;
pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize);
pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize);
pProc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcCleanupQueue(pProc->pChildQueue);
taosMemoryFree(pProc);
......@@ -324,7 +336,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
}
pProc->name = pCfg->name;
pProc->pParent = pCfg->pParent;
pProc->parent = pCfg->parent;
pProc->childMallocHeadFp = pCfg->childMallocHeadFp;
pProc->childFreeHeadFp = pCfg->childFreeHeadFp;
pProc->childMallocBodyFp = pCfg->childMallocBodyFp;
......@@ -384,7 +396,7 @@ static void taosProcThreadLoop(SProcObj *pProc) {
taosMsleep(1);
continue;
} else {
(*consumeFp)(pProc->pParent, pHead, headLen, pBody, bodyLen, ftype);
(*consumeFp)(pProc->parent, pHead, headLen, pBody, bodyLen, ftype);
}
}
}
......@@ -424,19 +436,41 @@ void taosProcCleanup(SProcObj *pProc) {
taosProcStop(pProc);
taosProcCleanupQueue(pProc->pChildQueue);
taosProcCleanupQueue(pProc->pParentQueue);
if (pProc->hash != NULL) {
taosHashCleanup(pProc->hash);
pProc->hash = NULL;
}
uDebug("proc:%s, is cleaned up", pProc->name);
taosMemoryFree(pProc);
}
}
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype) {
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype);
void *handle, ProcFuncType ftype) {
return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype);
}
void taosProcRemoveHandle(SProcObj *pProc, void *handle) {
int64_t h = (int64_t)handle;
taosThreadMutexLock(&pProc->pChildQueue->mutex);
taosHashRemove(pProc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
}
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
taosThreadMutexLock(&pProc->pChildQueue->mutex);
void *h = taosHashIterate(pProc->hash, NULL);
while (h != NULL) {
void *handle = *((void **)h);
(*HandleFp)(handle);
}
taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
}
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype) {
while (taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype) != 0) {
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) {
taosMsleep(1);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册