提交 b08f6b5e 编写于 作者: S Shengliang Guan

refactor: multi process mode

上级 ad3fda2e
......@@ -32,7 +32,7 @@ typedef struct SDnodeMgmt {
SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp;
IsNodeDeployedFp isNodeDeployedFp;
IsNodeRequiredFp isNodeRequiredFp;
SDnodeData data;
} SDnodeMgmt;
......
......@@ -58,7 +58,7 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput)
pMgmt->name = pInput->name;
pMgmt->processCreateNodeFp = pInput->processCreateNodeFp;
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->isNodeDeployedFp = pInput->isNodeDeployedFp;
pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp;
taosInitRWLatch(&pMgmt->data.latch);
pMgmt->data.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
......
......@@ -40,10 +40,10 @@ static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) {
static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) {
pInfo->uptime = (taosGetTimestampMs() - pMgmt->data.rebootTime) / (86400000.0f);
pInfo->has_mnode = (*pMgmt->isNodeDeployedFp)(pMgmt->pDnode, MNODE);
pInfo->has_qnode = (*pMgmt->isNodeDeployedFp)(pMgmt->pDnode, QNODE);
pInfo->has_snode = (*pMgmt->isNodeDeployedFp)(pMgmt->pDnode, SNODE);
pInfo->has_bnode = (*pMgmt->isNodeDeployedFp)(pMgmt->pDnode, BNODE);
pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, MNODE);
pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, QNODE);
pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, SNODE);
pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, BNODE);
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size;
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
......
......@@ -19,13 +19,58 @@
// tobe deleted
#include "uv.h"
#include "dmUtil.h"
#include "dmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SMgmtWrapper SMgmtWrapper;
#define SINGLE_PROC 0
#define CHILD_PROC 1
#define PARENT_PROC 2
#define TEST_PROC 3
#define OnlyInSingleProc(ptype) (ptype == SINGLE_PROC)
#define OnlyInChildProc(ptype) (ptype == CHILD_PROC)
#define OnlyInParentProc(ptype) (ptype == PARENT_PROC)
#define OnlyInTestProc(ptype) (ptype & TEST_PROC)
#define InChildProc(ptype) (ptype & CHILD_PROC)
#define InParentProc(ptype) (ptype & PARENT_PROC)
typedef enum {
PROC_FUNC_REQ = 1,
PROC_FUNC_RSP = 2,
PROC_FUNC_REGIST = 3,
PROC_FUNC_RELEASE = 4,
} EProcFuncType;
typedef struct {
int32_t head;
int32_t tail;
int32_t total;
int32_t avail;
int32_t items;
char name[8];
TdThreadMutex mutex;
tsem_t sem;
char pBuffer[];
} SProcQueue;
typedef struct {
SMgmtWrapper *wrapper;
const char *name;
SHashObj *hash;
SProcQueue *pqueue;
SProcQueue *cqueue;
TdThread pthread;
TdThread cthread;
SShm shm;
int32_t pid;
int8_t ptype;
bool stop;
} SProc;
typedef struct SMgmtWrapper {
SDnode *pDnode;
SMgmtFunc func;
......@@ -34,13 +79,10 @@ typedef struct SMgmtWrapper {
char *path;
int32_t refCount;
SRWLatch latch;
EDndNodeType nodeType;
EDndNodeType ntype;
bool deployed;
bool required;
EDndProcType procType;
int32_t procId;
SProcObj *procObj;
SShm procShm;
SProc proc;
NodeMsgFp msgFps[TDMT_MAX];
} SMgmtWrapper;
......@@ -75,8 +117,8 @@ typedef struct SUdfdData {
} SUdfdData;
typedef struct SDnode {
EDndProcType ptype;
EDndNodeType ntype;
int8_t ptype;
EDndNodeType rtype;
EDndEvent event;
EDndRunStatus status;
SStartupInfo startup;
......@@ -109,14 +151,26 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
// dmProc.c
int32_t dmInitProc(struct SMgmtWrapper *pWrapper);
void dmCleanupProc(struct SMgmtWrapper *pWrapper);
int32_t dmRunProc(SProc *proc);
void dmStopProc(SProc *proc);
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle);
void dmCloseProcRpcHandles(SProc *proc);
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t handleRef, EProcFuncType ftype);
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype);
// dmTransport.c
int32_t dmInitServer(SDnode *pDnode);
void dmCleanupServer(SDnode *pDnode);
int32_t dmInitClient(SDnode *pDnode);
void dmCleanupClient(SDnode *pDnode);
SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper);
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper);
int32_t dmInitMsgHandle(SDnode *pDnode);
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// mgmt nodes
SMgmtFunc dmGetMgmtFunc();
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_PROCESS_H_
#define _TD_UTIL_PROCESS_H_
#include "os.h"
#include "tqueue.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef enum { PROC_FUNC_REQ = 1, PROC_FUNC_RSP, PROC_FUNC_REGIST, PROC_FUNC_RELEASE } EProcFuncType;
typedef struct SProcObj SProcObj;
typedef void *(*ProcMallocFp)(int32_t contLen, EQItype itype);
typedef void *(*ProcFreeFp)(void *pCont);
typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
EProcFuncType ftype);
typedef struct {
ProcConsumeFp childConsumeFp;
ProcMallocFp childMallocHeadFp;
ProcFreeFp childFreeHeadFp;
ProcMallocFp childMallocBodyFp;
ProcFreeFp childFreeBodyFp;
ProcConsumeFp parentConsumeFp;
ProcMallocFp parentMallocHeadFp;
ProcFreeFp parentFreeHeadFp;
ProcMallocFp parentMallocBodyFp;
ProcFreeFp parentFreeBodyFp;
SShm shm;
void *parent;
const char *name;
bool isChild;
} SProcCfg;
SProcObj *taosProcInit(const SProcCfg *pCfg);
void taosProcCleanup(SProcObj *pProc);
int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t handleRef, EProcFuncType ftype);
int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle);
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle));
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype);
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_PROCESS_H_*/
......@@ -16,9 +16,45 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
static bool dmIsNodeDeployedFp(SDnode *pDnode, EDndNodeType ntype) { return pDnode->wrappers[ntype].required; }
static bool dmIsNodeRequired(SDnode *pDnode, EDndNodeType ntype) { return pDnode->wrappers[ntype].required; }
static bool dmRequireNode(SMgmtWrapper *pWrapper) {
SMgmtInputOpt *pInput = &pWrapper->pDnode->input;
pInput->name = pWrapper->name;
pInput->path = pWrapper->path;
bool required = false;
int32_t code = (*pWrapper->func.requiredFp)(pInput, &required);
if (!required) {
dDebug("node:%s, does not require startup", pWrapper->name);
}
if (pWrapper->ntype == DNODE && pWrapper->pDnode->rtype != DNODE && pWrapper->pDnode->rtype != NODE_END) {
required = false;
dDebug("node:%s, does not require startup in child process", pWrapper->name);
}
return required;
}
static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->rtype = pOption->ntype;
if (tsMultiProcess == 0) {
pDnode->ptype = DND_PROC_SINGLE;
dInfo("dnode will run in single-process mode");
} else if (tsMultiProcess > 1) {
pDnode->ptype = DND_PROC_TEST;
dInfo("dnode will run in multi-process test mode");
} else if (pDnode->rtype == DNODE || pDnode->rtype == NODE_END) {
pDnode->ptype = DND_PROC_PARENT;
dInfo("dnode will run in parent-process mode");
} else {
pDnode->ptype = DND_PROC_CHILD;
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->rtype];
dInfo("dnode will run in child-process mode, node:%s", pWrapper->name);
}
pDnode->input.dnodeId = 0;
pDnode->input.clusterId = 0;
pDnode->input.localEp = strdup(pOption->localEp);
......@@ -33,7 +69,7 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->input.pDnode = pDnode;
pDnode->input.processCreateNodeFp = dmProcessCreateNodeReq;
pDnode->input.processDropNodeFp = dmProcessDropNodeReq;
pDnode->input.isNodeDeployedFp = dmIsNodeDeployedFp;
pDnode->input.isNodeRequiredFp = dmIsNodeRequired;
if (pDnode->input.dataDir == NULL || pDnode->input.localEp == NULL || pDnode->input.localFqdn == NULL ||
pDnode->input.firstEp == NULL || pDnode->input.secondEp == NULL) {
......@@ -41,14 +77,6 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
return -1;
}
pDnode->ntype = pOption->ntype;
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
pDnode->lockfile = dmCheckRunning(pOption->dataDir);
if (pDnode->lockfile == NULL) {
return -1;
}
}
taosThreadMutexInit(&pDnode->mutex, NULL);
return 0;
}
......@@ -76,19 +104,6 @@ static void dmClearVars(SDnode *pDnode) {
dDebug("dnode memory is cleared, data:%p", pDnode);
}
static bool dmRequireNode(SMgmtWrapper *pWrapper) {
SMgmtInputOpt *pInput = &pWrapper->pDnode->input;
pInput->name = pWrapper->name;
pInput->path = pWrapper->path;
bool required = false;
int32_t code = (*pWrapper->func.requiredFp)(pInput, &required);
if (!required) {
dDebug("node:%s, does not require startup", pWrapper->name);
}
return required;
}
SDnode *dmCreate(const SDnodeOpt *pOption) {
dInfo("start to create dnode");
int32_t code = -1;
......@@ -105,7 +120,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
dmSetStatus(pDnode, DND_STAT_INIT);
pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
pDnode->wrappers[VNODE].func = vmGetMgmtFunc();
......@@ -117,9 +131,14 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
pWrapper->pDnode = pDnode;
pWrapper->name = dmNodeName(ntype);
pWrapper->procShm.id = -1;
pWrapper->nodeType = ntype;
pWrapper->procType = DND_PROC_SINGLE;
pWrapper->ntype = ntype;
pWrapper->proc.wrapper = pWrapper;
pWrapper->proc.shm.id = -1;
pWrapper->proc.pid = -1;
pWrapper->proc.ptype = pDnode->ptype;
if (ntype == DNODE) {
pWrapper->proc.ptype = DND_PROC_SINGLE;
}
taosInitRWLatch(&pWrapper->latch);
snprintf(path, sizeof(path), "%s%s%s", pOption->dataDir, TD_DIRSEP, pWrapper->name);
......@@ -129,7 +148,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
if (ntype != DNODE && dmReadShmFile(pWrapper->path, pWrapper->name, pDnode->ntype, &pWrapper->procShm) != 0) {
if (ntype != DNODE && dmReadShmFile(pWrapper->path, pWrapper->name, pDnode->rtype, &pWrapper->proc.shm) != 0) {
dError("node:%s, failed to read shm file since %s", pWrapper->name, terrstr());
goto _OVER;
}
......@@ -146,6 +165,19 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
if (OnlyInSingleProc(pDnode->ptype) && InParentProc(pDnode->ptype)) {
pDnode->lockfile = dmCheckRunning(pOption->dataDir);
if (pDnode->lockfile == NULL) {
goto _OVER;
}
if (dmInitServer(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
goto _OVER;
}
}
dmReportStartup(pDnode, "dnode-transport", "initialized");
dInfo("dnode is created, data:%p", pDnode);
code = 0;
......@@ -203,7 +235,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed || (pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) {
if (pWrapper->deployed || (InParentProc(pWrapper->proc.ptype) && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
} else {
......@@ -321,7 +353,7 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs
(void)dmOpenNode(pWrapper);
pWrapper->required = true;
pWrapper->deployed = true;
pWrapper->procType = pDnode->ptype;
pWrapper->proc.ptype = pDnode->ptype;
}
taosThreadMutexUnlock(&pDnode->mutex);
......
......@@ -14,74 +14,33 @@
*/
#define _DEFAULT_SOURCE
#include "tprocess.h"
#include "taos.h"
#include "taoserror.h"
#include "thash.h"
#include "tlog.h"
#include "tqueue.h"
typedef void *(*ProcThreadFp)(void *param);
typedef struct SProcQueue {
int32_t head;
int32_t tail;
int32_t total;
int32_t avail;
int32_t items;
char name[8];
TdThreadMutex mutex;
tsem_t sem;
char pBuffer[];
} SProcQueue;
typedef struct SProcObj {
TdThread thread;
SProcQueue *pChildQueue;
SProcQueue *pParentQueue;
ProcConsumeFp childConsumeFp;
ProcMallocFp childMallocHeadFp;
ProcFreeFp childFreeHeadFp;
ProcMallocFp childMallocBodyFp;
ProcFreeFp childFreeBodyFp;
ProcConsumeFp parentConsumeFp;
ProcMallocFp parentMallocHeadFp;
ProcFreeFp parentFreeHeadFp;
ProcMallocFp parentMallocBodyFp;
ProcFreeFp parentFreeBodyFp;
void *parent;
const char *name;
SHashObj *hash;
int32_t pid;
bool isChild;
bool stopFlag;
} SProcObj;
#include "dmMgmt.h"
static inline int32_t CEIL8(int32_t v) {
const int32_t c = ceil((float)(v) / 8) * 8;
return c < 8 ? 8 : c;
}
static int32_t taosProcInitMutex(SProcQueue *pQueue) {
static int32_t dmInitProcMutex(SProcQueue *queue) {
TdThreadMutexAttr mattr = {0};
if (taosThreadMutexAttrInit(&mattr) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while init attr since %s", terrstr());
dError("node:%s, failed to init mutex while init attr since %s", queue->name, terrstr());
return -1;
}
if (taosThreadMutexAttrSetPshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) {
taosThreadMutexAttrDestroy(&mattr);
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while set shared since %s", terrstr());
dError("node:%s, failed to init mutex while set shared since %s", queue->name, terrstr());
return -1;
}
if (taosThreadMutexInit(&pQueue->mutex, &mattr) != 0) {
if (taosThreadMutexInit(&queue->mutex, &mattr) != 0) {
taosThreadMutexAttrDestroy(&mattr);
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex since %s", terrstr());
dError("node:%s, failed to init mutex since %s", queue->name, terrstr());
return -1;
}
......@@ -89,71 +48,71 @@ static int32_t taosProcInitMutex(SProcQueue *pQueue) {
return 0;
}
static int32_t taosProcInitSem(SProcQueue *pQueue) {
if (tsem_init(&pQueue->sem, 1, 0) != 0) {
static int32_t dmInitProcSem(SProcQueue *queue) {
if (tsem_init(&queue->sem, 1, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init sem");
dError("node:%s, failed to init sem since %s", queue->name, terrstr());
return -1;
}
return 0;
}
static SProcQueue *taosProcInitQueue(const char *name, bool isChild, char *ptr, int32_t size) {
static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
SProcQueue *queue = (SProcQueue *)(ptr);
int32_t bufSize = size - CEIL8(sizeof(SProcQueue));
if (bufSize <= 1024) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SProcQueue *pQueue = (SProcQueue *)(ptr);
if (!isChild) {
if (taosProcInitMutex(pQueue) != 0) {
if (InParentProc(proc->ptype) && !InChildProc(proc->ptype)) {
if (dmInitProcMutex(queue) != 0) {
return NULL;
}
if (taosProcInitSem(pQueue) != 0) {
if (dmInitProcSem(queue) != 0) {
return NULL;
}
tstrncpy(pQueue->name, name, sizeof(pQueue->name));
pQueue->head = 0;
pQueue->tail = 0;
pQueue->total = bufSize;
pQueue->avail = bufSize;
pQueue->items = 0;
tstrncpy(queue->name, proc->name, sizeof(queue->name));
queue->head = 0;
queue->tail = 0;
queue->total = bufSize;
queue->avail = bufSize;
queue->items = 0;
}
return pQueue;
return queue;
}
#if 0
static void taosProcDestroyMutex(SProcQueue *pQueue) {
if (pQueue->mutex != NULL) {
taosThreadMutexDestroy(pQueue->mutex);
pQueue->mutex = NULL;
static void dmDestroyProcQueue(SProcQueue *queue) {
if (queue->mutex != NULL) {
taosThreadMutexDestroy(queue->mutex);
queue->mutex = NULL;
}
}
static void taosProcDestroySem(SProcQueue *pQueue) {
if (pQueue->sem != NULL) {
tsem_destroy(pQueue->sem);
pQueue->sem = NULL;
static void dmDestroyProcSem(SProcQueue *queue) {
if (queue->sem != NULL) {
tsem_destroy(queue->sem);
queue->sem = NULL;
}
}
#endif
static void taosProcCleanupQueue(SProcQueue *pQueue) {
static void dmCleanupProcQueue(SProcQueue *queue) {
#if 0
if (pQueue != NULL) {
taosProcDestroyMutex(pQueue);
taosProcDestroySem(pQueue);
if (queue != NULL) {
dmDestroyProcQueue(queue);
dmDestroyProcSem(queue);
}
#endif
}
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
EProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) {
......@@ -165,80 +124,79 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8;
taosThreadMutexLock(&pQueue->mutex);
if (fullLen > pQueue->avail) {
taosThreadMutexUnlock(&pQueue->mutex);
taosThreadMutexLock(&queue->mutex);
if (fullLen > queue->avail) {
taosThreadMutexUnlock(&queue->mutex);
terrno = TSDB_CODE_OUT_OF_SHM_MEM;
return -1;
}
if (handle != 0 && ftype == PROC_FUNC_REQ) {
if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
taosThreadMutexUnlock(&pQueue->mutex);
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
taosThreadMutexUnlock(&queue->mutex);
return -1;
}
}
const int32_t pos = pQueue->tail;
if (pQueue->tail < pQueue->total) {
*(int16_t *)(pQueue->pBuffer + pQueue->tail) = rawHeadLen;
*(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype;
*(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = rawBodyLen;
const int32_t pos = queue->tail;
if (queue->tail < queue->total) {
*(int16_t *)(queue->pBuffer + queue->tail) = rawHeadLen;
*(int8_t *)(queue->pBuffer + queue->tail + 2) = (int8_t)ftype;
*(int32_t *)(queue->pBuffer + queue->tail + 4) = rawBodyLen;
} else {
*(int16_t *)(pQueue->pBuffer) = rawHeadLen;
*(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype;
*(int32_t *)(pQueue->pBuffer + 4) = rawBodyLen;
*(int16_t *)(queue->pBuffer) = rawHeadLen;
*(int8_t *)(queue->pBuffer + 2) = (int8_t)ftype;
*(int32_t *)(queue->pBuffer + 4) = rawBodyLen;
}
if (pQueue->tail < pQueue->head) {
memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, rawBodyLen);
pQueue->tail = pQueue->tail + 8 + headLen + bodyLen;
if (queue->tail < queue->head) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
queue->tail = queue->tail + 8 + headLen + bodyLen;
} else {
int32_t remain = pQueue->total - pQueue->tail;
int32_t remain = queue->total - queue->tail;
if (remain == 0) {
memcpy(pQueue->pBuffer + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + 8 + headLen, pBody, rawBodyLen);
pQueue->tail = 8 + headLen + bodyLen;
memcpy(queue->pBuffer + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
queue->tail = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(pQueue->pBuffer, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen);
pQueue->tail = headLen + bodyLen;
memcpy(queue->pBuffer, pHead, rawHeadLen);
memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
queue->tail = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, remain - 8);
memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
pQueue->tail = headLen - (remain - 8) + bodyLen;
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
memcpy(queue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
queue->tail = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) {
memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, remain - 8 - headLen);
memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
pQueue->tail = bodyLen - (remain - 8 - headLen);
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
queue->tail = bodyLen - (remain - 8 - headLen);
} else {
memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + pQueue->tail + headLen + 8, pBody, rawBodyLen);
pQueue->tail = pQueue->tail + headLen + bodyLen + 8;
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
queue->tail = queue->tail + headLen + bodyLen + 8;
}
}
pQueue->avail -= fullLen;
pQueue->items++;
taosThreadMutexUnlock(&pQueue->mutex);
tsem_post(&pQueue->sem);
queue->avail -= fullLen;
queue->items++;
taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem);
uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d handle:%p ref:%" PRId64 ", head:%d %p body:%d %p",
pQueue->name, pos, ftype, pQueue->items, (void *)handle, handleRef, headLen, pHead, bodyLen, pBody);
dTrace("node:%s, push proc msg at pos:%d ftype:%d remain:%d handle:%p ref:%" PRId64 ", head:%d %p body:%d %p",
queue->name, pos, ftype, queue->items, (void *)handle, handleRef, headLen, pHead, bodyLen, pBody);
return 0;
}
static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen,
EProcFuncType *pFuncType, ProcMallocFp mallocHeadFp, ProcFreeFp freeHeadFp,
ProcMallocFp mallocBodyFp, ProcFreeFp freeBodyFp) {
tsem_wait(&pQueue->sem);
static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen,
EProcFuncType *pFuncType) {
tsem_wait(&queue->sem);
taosThreadMutexLock(&pQueue->mutex);
if (pQueue->total - pQueue->avail <= 0) {
taosThreadMutexUnlock(&pQueue->mutex);
taosThreadMutexLock(&queue->mutex);
if (queue->total - queue->avail <= 0) {
taosThreadMutexUnlock(&queue->mutex);
terrno = TSDB_CODE_OUT_OF_SHM_MEM;
return 0;
}
......@@ -246,64 +204,64 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
int16_t rawHeadLen = 0;
int8_t ftype = 0;
int32_t rawBodyLen = 0;
if (pQueue->head < pQueue->total) {
rawHeadLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2);
rawBodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
if (queue->head < queue->total) {
rawHeadLen = *(int16_t *)(queue->pBuffer + queue->head);
ftype = *(int8_t *)(queue->pBuffer + queue->head + 2);
rawBodyLen = *(int32_t *)(queue->pBuffer + queue->head + 4);
} else {
rawHeadLen = *(int16_t *)(pQueue->pBuffer);
ftype = *(int8_t *)(pQueue->pBuffer + 2);
rawBodyLen = *(int32_t *)(pQueue->pBuffer + 4);
rawHeadLen = *(int16_t *)(queue->pBuffer);
ftype = *(int8_t *)(queue->pBuffer + 2);
rawBodyLen = *(int32_t *)(queue->pBuffer + 4);
}
int16_t headLen = CEIL8(rawHeadLen);
int32_t bodyLen = CEIL8(rawBodyLen);
void *pHead = (*mallocHeadFp)(headLen, RPC_QITEM);
void *pBody = (*mallocBodyFp)(bodyLen, RPC_QITEM);
void *pHead = taosAllocateQitem(headLen, DEF_QITEM);
void *pBody = rpcMallocCont(bodyLen);
if (pHead == NULL || pBody == NULL) {
taosThreadMutexUnlock(&pQueue->mutex);
tsem_post(&pQueue->sem);
(*freeHeadFp)(pHead);
(*freeBodyFp)(pBody);
taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem);
taosFreeQitem(pHead);
rpcFreeCont(pBody);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
const int32_t pos = pQueue->head;
if (pQueue->head < pQueue->tail) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen);
pQueue->head = pQueue->head + 8 + headLen + bodyLen;
const int32_t pos = queue->head;
if (queue->head < queue->tail) {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
queue->head = queue->head + 8 + headLen + bodyLen;
} else {
int32_t remain = pQueue->total - pQueue->head;
int32_t remain = queue->total - queue->head;
if (remain == 0) {
memcpy(pHead, pQueue->pBuffer + 8, headLen);
memcpy(pBody, pQueue->pBuffer + 8 + headLen, bodyLen);
pQueue->head = 8 + headLen + bodyLen;
memcpy(pHead, queue->pBuffer + 8, headLen);
memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
queue->head = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(pHead, pQueue->pBuffer, headLen);
memcpy(pBody, pQueue->pBuffer + headLen, bodyLen);
pQueue->head = headLen + bodyLen;
memcpy(pHead, queue->pBuffer, headLen);
memcpy(pBody, queue->pBuffer + headLen, bodyLen);
queue->head = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8);
memcpy((char *)pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8));
memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen);
pQueue->head = headLen - (remain - 8) + bodyLen;
memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8);
memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8));
memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
queue->head = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen);
memcpy((char *)pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen));
pQueue->head = bodyLen - (remain - 8 - headLen);
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
queue->head = bodyLen - (remain - 8 - headLen);
} else {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + headLen + 8, bodyLen);
pQueue->head = pQueue->head + headLen + bodyLen + 8;
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
queue->head = queue->head + headLen + bodyLen + 8;
}
}
pQueue->avail = pQueue->avail + headLen + bodyLen + 8;
pQueue->items--;
taosThreadMutexUnlock(&pQueue->mutex);
queue->avail = queue->avail + headLen + bodyLen + 8;
queue->items--;
taosThreadMutexUnlock(&queue->mutex);
*ppHead = pHead;
*ppBody = pBody;
......@@ -311,191 +269,247 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
*pBodyLen = rawBodyLen;
*pFuncType = (EProcFuncType)ftype;
uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
pQueue->items, rawHeadLen, pHead, rawBodyLen, pBody);
dTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", queue->name, pos, ftype, queue->items,
rawHeadLen, pHead, rawBodyLen, pBody);
return 1;
}
SProcObj *taosProcInit(const SProcCfg *pCfg) {
SProcObj *pProc = taosMemoryCalloc(1, sizeof(SProcObj));
if (pProc == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
SProc *proc = &pWrapper->proc;
proc->wrapper = pWrapper;
proc->name = pWrapper->name;
SShm *shm = &proc->shm;
int32_t cstart = 0;
int32_t csize = CEIL8(pCfg->shm.size / 2);
int32_t csize = CEIL8(shm->size / 2);
int32_t pstart = csize;
int32_t psize = CEIL8(pCfg->shm.size - pstart);
if (pstart + psize > pCfg->shm.size) {
int32_t psize = CEIL8(shm->size - pstart);
if (pstart + psize > shm->size) {
psize -= 8;
}
pProc->name = pCfg->name;
pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize);
pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize);
pProc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcCleanupQueue(pProc->pChildQueue);
taosMemoryFree(pProc);
return NULL;
proc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
proc->cqueue = dmInitProcQueue(proc, (char *)shm->ptr + cstart, csize);
proc->pqueue = dmInitProcQueue(proc, (char *)shm->ptr + pstart, psize);
if (proc->cqueue == NULL || proc->pqueue == NULL || proc->hash == NULL) {
dmCleanupProcQueue(proc->cqueue);
dmCleanupProcQueue(proc->pqueue);
taosHashCleanup(proc->hash);
return -1;
}
pProc->name = pCfg->name;
pProc->parent = pCfg->parent;
pProc->childMallocHeadFp = pCfg->childMallocHeadFp;
pProc->childFreeHeadFp = pCfg->childFreeHeadFp;
pProc->childMallocBodyFp = pCfg->childMallocBodyFp;
pProc->childFreeBodyFp = pCfg->childFreeBodyFp;
pProc->childConsumeFp = pCfg->childConsumeFp;
pProc->parentMallocHeadFp = pCfg->parentMallocHeadFp;
pProc->parentFreeHeadFp = pCfg->parentFreeHeadFp;
pProc->parentMallocBodyFp = pCfg->parentMallocBodyFp;
pProc->parentFreeBodyFp = pCfg->parentFreeBodyFp;
pProc->parentConsumeFp = pCfg->parentConsumeFp;
pProc->isChild = pCfg->isChild;
uDebug("proc:%s, is initialized, isChild:%d child queue:%p parent queue:%p", pProc->name, pProc->isChild,
pProc->pChildQueue, pProc->pParentQueue);
return pProc;
dDebug("node:%s, proc is initialized, cqueue:%p pqueue:%p", proc->name, proc->cqueue, proc->pqueue);
return 0;
}
static void taosProcThreadLoop(SProcObj *pProc) {
void *pHead, *pBody;
int16_t headLen;
EProcFuncType ftype;
int32_t bodyLen;
SProcQueue *pQueue;
ProcConsumeFp consumeFp;
ProcMallocFp mallocHeadFp;
ProcFreeFp freeHeadFp;
ProcMallocFp mallocBodyFp;
ProcFreeFp freeBodyFp;
if (pProc->isChild) {
pQueue = pProc->pChildQueue;
consumeFp = pProc->childConsumeFp;
mallocHeadFp = pProc->childMallocHeadFp;
freeHeadFp = pProc->childFreeHeadFp;
mallocBodyFp = pProc->childMallocBodyFp;
freeBodyFp = pProc->childFreeBodyFp;
} else {
pQueue = pProc->pParentQueue;
consumeFp = pProc->parentConsumeFp;
mallocHeadFp = pProc->parentMallocHeadFp;
freeHeadFp = pProc->parentFreeHeadFp;
mallocBodyFp = pProc->parentMallocBodyFp;
freeBodyFp = pProc->parentFreeBodyFp;
}
static void *dmConsumChildQueue(void *param) {
SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->cqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0;
int32_t code = 0;
EProcFuncType ftype = PROC_FUNC_REQ;
SNodeMsg *pReq = NULL;
dDebug("node:%s, start to consume from child queue", proc->name);
do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
if (numOfMsgs == 0) {
dDebug("node:%s, get no msg from child queue and exit thread", proc->name);
break;
}
if (numOfMsgs < 0) {
dError("node:%s, get no msg from child queue since %s", proc->name, terrstr());
taosMsleep(1);
continue;
}
if (ftype != PROC_FUNC_REQ) {
dFatal("node:%s, msg:%p from child queue, invalid ftype:%d", proc->name, pHead, ftype);
taosFreeQitem(pHead);
rpcFreeCont(pBody);
} else {
dTrace("node:%s, msg:%p from child queue", proc->name, pHead);
pReq = pHead;
pReq->rpcMsg.pCont = pBody;
code = dmProcessNodeMsg(pWrapper, pReq);
if (code != 0) {
dError("node:%s, failed to process msg:%p since %s, put into parent queue", proc->name, pReq, terrstr());
SRpcMsg rspMsg = {
.handle = pReq->rpcMsg.handle,
.ahandle = pReq->rpcMsg.ahandle,
.refId = pReq->rpcMsg.refId,
.pCont = pReq->pRsp,
.contLen = pReq->rspLen,
};
dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, PROC_FUNC_RSP);
taosFreeQitem(pHead);
rpcFreeCont(pBody);
rpcFreeCont(rspMsg.pCont);
}
}
} while (1);
uDebug("proc:%s, start to get msg from queue:%p, thread:%" PRId64, pProc->name, pQueue, pProc->thread);
return NULL;
}
while (1) {
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp,
mallocBodyFp, freeBodyFp);
static void *dmConsumParentQueue(void *param) {
SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->pqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0;
int32_t code = 0;
EProcFuncType ftype = PROC_FUNC_REQ;
SRpcMsg *pRsp = NULL;
dDebug("node:%s, start to consume from parent queue", proc->name);
do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
if (numOfMsgs == 0) {
uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue);
dDebug("node:%s, get no msg from parent queue and exit thread", proc->name);
break;
} else if (numOfMsgs < 0) {
uError("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr());
}
if (numOfMsgs < 0) {
dError("node:%s, get no msg from parent queue since %s", proc->name, terrstr());
taosMsleep(1);
continue;
}
if (ftype == PROC_FUNC_RSP) {
pRsp = pHead;
pRsp->pCont = pBody;
dTrace("node:%s, rsp msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
dmRemoveProcRpcHandle(proc, pRsp->handle);
rpcSendResponse(pRsp);
} else if (ftype == PROC_FUNC_REGIST) {
pRsp = pHead;
dTrace("node:%s, regist msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle);
rpcRegisterBrokenLinkArg(pRsp);
rpcFreeCont(pBody);
} else if (ftype == PROC_FUNC_RELEASE) {
pRsp = pHead;
dTrace("node:%s, release msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->handle);
dmRemoveProcRpcHandle(proc, pRsp->handle);
rpcReleaseHandle(pRsp->handle, (int8_t)pRsp->code);
rpcFreeCont(pBody);
} else {
(*consumeFp)(pProc->parent, pHead, headLen, pBody, bodyLen, ftype);
dFatal("node:%s, msg:%p get from parent queue, invalid ftype:%d", proc->name, pHead, ftype);
rpcFreeCont(pBody);
}
}
taosFreeQitem(pHead);
} while (1);
return NULL;
}
int32_t taosProcRun(SProcObj *pProc) {
TdThreadAttr thAttr;
int32_t dmRunProc(SProc *proc) {
TdThreadAttr thAttr = {0};
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pProc->thread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr());
return -1;
if (InParentProc(proc->ptype)) {
if (taosThreadCreate(&proc->pthread, &thAttr, dmConsumParentQueue, proc) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to create pthread since %s", proc->name, terrstr());
return -1;
}
dDebug("node:%s, thread:%" PRId64 " is created to consume pqueue", proc->name, proc->pthread);
}
if (InChildProc(proc->ptype)) {
if (taosThreadCreate(&proc->cthread, &thAttr, dmConsumChildQueue, proc) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to create cthread since %s", proc->name, terrstr());
return -1;
}
dDebug("node:%s, thread:%" PRId64 " is created to consume cqueue", proc->name, proc->cthread);
}
uDebug("proc:%s, start to consume, thread:%" PRId64, pProc->name, pProc->thread);
taosThreadAttrDestroy(&thAttr);
return 0;
}
void taosProcStop(SProcObj *pProc) {
if (!taosCheckPthreadValid(pProc->thread)) return;
uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread);
SProcQueue *pQueue;
if (pProc->isChild) {
pQueue = pProc->pChildQueue;
} else {
pQueue = pProc->pParentQueue;
void dmStopProc(SProc *proc) {
if (taosCheckPthreadValid(proc->pthread)) {
dDebug("node:%s, start to join pthread:%" PRId64, proc->name, proc->pthread);
tsem_post(&proc->cqueue->sem);
taosThreadJoin(proc->pthread, NULL);
taosThreadClear(&proc->pthread);
}
tsem_post(&pQueue->sem);
taosThreadJoin(pProc->thread, NULL);
taosThreadClear(&pProc->thread);
}
void taosProcCleanup(SProcObj *pProc) {
if (pProc != NULL) {
uDebug("proc:%s, start to clean up", pProc->name);
taosProcStop(pProc);
taosProcCleanupQueue(pProc->pChildQueue);
taosProcCleanupQueue(pProc->pParentQueue);
if (pProc->hash != NULL) {
taosHashCleanup(pProc->hash);
pProc->hash = NULL;
}
uDebug("proc:%s, is cleaned up", pProc->name);
taosMemoryFree(pProc);
if (taosCheckPthreadValid(proc->cthread)) {
dDebug("node:%s, start to join cthread:%" PRId64, proc->name, proc->cthread);
tsem_post(&proc->pqueue->sem);
taosThreadJoin(proc->cthread, NULL);
taosThreadClear(&proc->cthread);
}
}
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t ref, EProcFuncType ftype) {
if (ftype != PROC_FUNC_REQ) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ref, ftype);
void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
SProc *proc = &pWrapper->proc;
dDebug("node:%s, start to clean up proc", pWrapper->name);
dmStopProc(proc);
dmCleanupProcQueue(proc->cqueue);
dmCleanupProcQueue(proc->pqueue);
taosHashCleanup(proc->hash);
dDebug("node:%s, proc is cleaned up", pWrapper->name);
}
int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle) {
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle) {
int64_t h = (int64_t)handle;
taosThreadMutexLock(&pProc->pChildQueue->mutex);
taosThreadMutexLock(&proc->cqueue->mutex);
int64_t *pRef = taosHashGet(pProc->hash, &h, sizeof(int64_t));
int64_t *pRef = taosHashGet(proc->hash, &h, sizeof(int64_t));
int64_t ref = 0;
if (pRef != NULL) {
ref = *pRef;
}
taosHashRemove(pProc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
taosHashRemove(proc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&proc->cqueue->mutex);
return ref;
}
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
taosThreadMutexLock(&pProc->pChildQueue->mutex);
void *h = taosHashIterate(pProc->hash, NULL);
void dmCloseProcRpcHandles(SProc *proc) {
taosThreadMutexLock(&proc->cqueue->mutex);
void *h = taosHashIterate(proc->hash, NULL);
while (h != NULL) {
void *handle = *((void **)h);
(*HandleFp)(handle);
h = taosHashIterate(pProc->hash, h);
h = taosHashIterate(proc->hash, h);
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, handle);
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
rpcSendResponse(&rpcMsg);
}
taosHashClear(pProc->hash);
taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
taosHashClear(proc->hash);
taosThreadMutexUnlock(&proc->cqueue->mutex);
}
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype) {
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype) {
int32_t retry = 0;
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
dWarn("node:%s, failed to put msg:%p to parent queue since %s, retry:%d", proc->name, pHead, terrstr(), retry);
retry++;
taosMsleep(retry);
}
}
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t ref, EProcFuncType ftype) {
return dmPushToProcQueue(proc, proc->cqueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ref, ftype);
}
......@@ -18,32 +18,28 @@
static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) {
int32_t shmsize = tsMnodeShmSize;
if (pWrapper->nodeType == VNODE) {
if (pWrapper->ntype == VNODE) {
shmsize = tsVnodeShmSize;
} else if (pWrapper->nodeType == QNODE) {
} else if (pWrapper->ntype == QNODE) {
shmsize = tsQnodeShmSize;
} else if (pWrapper->nodeType == SNODE) {
} else if (pWrapper->ntype == SNODE) {
shmsize = tsSnodeShmSize;
} else if (pWrapper->nodeType == MNODE) {
} else if (pWrapper->ntype == MNODE) {
shmsize = tsMnodeShmSize;
} else if (pWrapper->nodeType == BNODE) {
} else if (pWrapper->ntype == BNODE) {
shmsize = tsBnodeShmSize;
} else {
return -1;
}
if (taosCreateShm(&pWrapper->procShm, pWrapper->nodeType, shmsize) != 0) {
if (taosCreateShm(&pWrapper->proc.shm, pWrapper->ntype, shmsize) != 0) {
terrno = TAOS_SYSTEM_ERROR(terrno);
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
return -1;
}
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->procShm.id, shmsize);
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->proc.shm.id, shmsize);
SProcCfg cfg = dmGenProcCfg(pWrapper);
cfg.isChild = false;
pWrapper->procType = DND_PROC_PARENT;
pWrapper->procObj = taosProcInit(&cfg);
if (pWrapper->procObj == NULL) {
if (dmInitProc(pWrapper) != 0) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1;
}
......@@ -51,10 +47,10 @@ static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) {
return 0;
}
static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType n) {
static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
char tstr[8] = {0};
char *args[6] = {0};
snprintf(tstr, sizeof(tstr), "%d", n);
snprintf(tstr, sizeof(tstr), "%d", ntype);
args[1] = "-c";
args[2] = configDir;
args[3] = "-n";
......@@ -68,39 +64,20 @@ static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType n) {
return -1;
}
pWrapper->procId = pid;
pWrapper->proc.pid = pid;
dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid);
return 0;
}
static int32_t dmRunParentProc(SMgmtWrapper *pWrapper) {
if (pWrapper->pDnode->ntype == NODE_END) {
if (pWrapper->pDnode->rtype == NODE_END) {
dInfo("node:%s, should be started manually in child process", pWrapper->name);
} else {
if (dmNewNodeProc(pWrapper, pWrapper->nodeType) != 0) {
if (dmNewNodeProc(pWrapper, pWrapper->ntype) != 0) {
return -1;
}
}
if (taosProcRun(pWrapper->procObj) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
return 0;
}
static int32_t dmInitChildProc(SMgmtWrapper *pWrapper) {
SProcCfg cfg = dmGenProcCfg(pWrapper);
cfg.isChild = true;
pWrapper->procObj = taosProcInit(&cfg);
if (pWrapper->procObj == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1;
}
return 0;
}
static int32_t dmRunChildProc(SMgmtWrapper *pWrapper) {
if (taosProcRun(pWrapper->procObj) != 0) {
if (dmRunProc(&pWrapper->proc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
......@@ -119,25 +96,46 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
pInput->name = pWrapper->name;
pInput->path = pWrapper->path;
pInput->msgCb = dmGetMsgcb(pWrapper);
if (pWrapper->nodeType == DNODE || pWrapper->procType == DND_PROC_CHILD) {
if (pWrapper->ntype == DNODE || OnlyInChildProc(pWrapper->proc.ptype)) {
tmsgSetDefaultMsgCb(&pInput->msgCb);
}
if (pWrapper->procType == DND_PROC_SINGLE || pWrapper->procType == DND_PROC_CHILD) {
if (OnlyInSingleProc(pWrapper->proc.ptype)) {
if ((*pWrapper->func.openFp)(pInput, &output) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
}
if (pWrapper->procType == DND_PROC_CHILD) {
if (dmInitChildProc(pWrapper) != 0) return -1;
if (dmRunChildProc(pWrapper) != 0) return -1;
}
dDebug("node:%s, has been opened", pWrapper->name);
pWrapper->deployed = true;
} else {
if (dmInitParentProc(pWrapper) != 0) return -1;
if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->procShm) != 0) return -1;
if (dmRunParentProc(pWrapper) != 0) return -1;
}
if (InChildProc(pWrapper->proc.ptype)) {
if ((*pWrapper->func.openFp)(pInput, &output) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
}
if (dmInitProc(pWrapper) != 0) {
return -1;
}
if (dmRunProc(&pWrapper->proc) != 0) {
return -1;
}
dDebug("node:%s, has been opened in child process", pWrapper->name);
pWrapper->deployed = true;
}
if (InParentProc(pWrapper->proc.ptype)) {
if (dmInitParentProc(pWrapper) != 0) {
return -1;
}
if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->proc.shm) != 0) {
return -1;
}
if (dmRunParentProc(pWrapper) != 0) {
return -1;
}
dDebug("node:%s, has been opened in parent process", pWrapper->name);
}
if (output.dnodeId != 0) {
......@@ -156,22 +154,11 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
if (!pWrapper->required) return 0;
if (OnlyInParentProc(pWrapper->proc.ptype)) return 0;
if (pWrapper->procType == DND_PROC_PARENT) {
dInfo("node:%s, not start in parent process", pWrapper->name);
} else if (pWrapper->procType == DND_PROC_CHILD) {
dInfo("node:%s, start in child process", pWrapper->name);
if (pWrapper->nodeType != DNODE) {
if (pWrapper->func.startFp != NULL && (*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
} else {
if (pWrapper->func.startFp != NULL && (*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
if (pWrapper->func.startFp != NULL && (*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
dmReportStartup(pWrapper->pDnode, pWrapper->name, "started");
......@@ -193,13 +180,14 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
taosMsleep(10);
}
if (pWrapper->procType == DND_PROC_PARENT) {
if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
taosKillProc(pWrapper->procId);
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
taosWaitProc(pWrapper->procId);
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
if (OnlyInParentProc(pWrapper->proc.ptype)) {
int32_t pid = pWrapper->proc.pid;
if (pid > 0 && taosProcExist(pid)) {
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pid);
taosKillProc(pid);
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pid);
taosWaitProc(pid);
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pid);
}
}
......@@ -210,9 +198,8 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
}
taosWUnLockLatch(&pWrapper->latch);
if (pWrapper->procObj) {
taosProcCleanup(pWrapper->procObj);
pWrapper->procObj = NULL;
if (!OnlyInSingleProc(pWrapper->proc.ptype)) {
dmCleanupProc(pWrapper);
}
dInfo("node:%s, has been closed", pWrapper->name);
......@@ -222,27 +209,8 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (!pWrapper->required) continue;
if (ntype == DNODE) {
pWrapper->procType = DND_PROC_SINGLE;
if (dmOpenNode(pWrapper) != 0) {
return -1;
}
} else {
if (pDnode->ptype == DND_PROC_CHILD) {
if (pDnode->ntype == ntype) {
pWrapper->procType = DND_PROC_CHILD;
if (dmOpenNode(pWrapper) != 0) {
return -1;
}
} else {
pWrapper->required = false;
}
} else {
pWrapper->procType = pDnode->ptype;
if (dmOpenNode(pWrapper) != 0) {
return -1;
}
}
if (dmOpenNode(pWrapper) != 0) {
return -1;
}
}
......@@ -253,7 +221,7 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
static int32_t dmStartNodes(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (ntype == DNODE && (pDnode->ptype == DND_PROC_CHILD || pDnode->ptype == DND_PROC_TEST)) continue;
if (ntype == DNODE && (InChildProc(pDnode->ptype) || !OnlyInTestProc(pDnode->ptype))) continue;
if (dmStartNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
......@@ -279,57 +247,28 @@ static void dmCloseNodes(SDnode *pDnode) {
}
}
static void dmProcessProcHandle(void *handle) {
dWarn("handle:%p, the child process dies and send an offline rsp", handle);
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
rpcSendResponse(&rpcMsg);
}
static void dmWatchNodes(SDnode *pDnode) {
if (pDnode->ptype != DND_PROC_PARENT) return;
if (pDnode->ntype == NODE_END) return;
if (!InParentProc(pDnode->ptype)) return;
if (pDnode->rtype == NODE_END) return;
taosThreadMutexLock(&pDnode->mutex);
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SProc *proc = &pWrapper->proc;
if (!pWrapper->required) continue;
if (pWrapper->procType != DND_PROC_PARENT) continue;
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
if (pWrapper->procObj) {
taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle);
}
dmNewNodeProc(pWrapper, n);
if (!InParentProc(proc->ptype)) continue;
if (proc->pid <= 0 || !taosProcExist(proc->pid)) {
dWarn("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid);
dmCloseProcRpcHandles(&pWrapper->proc);
dmNewNodeProc(pWrapper, ntype);
}
}
taosThreadMutexUnlock(&pDnode->mutex);
}
int32_t dmRun(SDnode *pDnode) {
if (tsMultiProcess == 0) {
pDnode->ptype = DND_PROC_SINGLE;
dInfo("dnode run in single process mode");
} else if (tsMultiProcess == 2) {
pDnode->ptype = DND_PROC_TEST;
dInfo("dnode run in multi-process test mode");
} else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
pDnode->ptype = DND_PROC_PARENT;
dInfo("dnode run in parent process mode");
} else {
pDnode->ptype = DND_PROC_CHILD;
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
dInfo("%s run in child process mode", pWrapper->name);
}
if (pDnode->ptype != DND_PROC_CHILD) {
if (dmInitServer(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
dmReportStartup(pDnode, "dnode-transport", "initialized");
}
if (dmOpenNodes(pDnode) != 0) {
dError("failed to open nodes since %s", terrstr());
return -1;
......@@ -341,15 +280,15 @@ int32_t dmRun(SDnode *pDnode) {
}
while (1) {
taosMsleep(100);
if (pDnode->event & DND_EVENT_STOP) {
dInfo("dnode is about to stop");
dmSetStatus(pDnode, DND_STAT_STOPPED);
dmStopNodes(pDnode);
dmCloseNodes(pDnode);
return 0;
} else {
dmWatchNodes(pDnode);
}
dmWatchNodes(pDnode);
taosMsleep(100);
}
}
......@@ -48,7 +48,7 @@ static inline NodeMsgFp dmGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return msgFp;
}
static inline int32_t dmBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
static inline int32_t dmBuildNodeMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
......@@ -67,10 +67,26 @@ static inline int32_t dmBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
return 0;
}
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
if (InParentProc(pWrapper->proc.ptype)) {
dTrace("msg:%p, created and put into child queue, type:%s handle:%p user:%s code:0x%04x contLen:%d", pMsg,
TMSG_INFO(pRpc->msgType), pRpc->handle, pMsg->user, pRpc->code & 0XFFFF, pRpc->contLen);
return dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
((pRpc->msgType & 1U) && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId,
PROC_FUNC_REQ);
} else {
dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, pMsg->user);
NodeMsgFp msgFp = dmGetMsgFp(pWrapper, &pMsg->rpcMsg);
if (msgFp == NULL) return -1;
return (*msgFp)(pWrapper->pMgmt, pMsg);
}
}
static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t code = -1;
SNodeMsg *pMsg = NULL;
NodeMsgFp msgFp = NULL;
uint16_t msgType = pRpc->msgType;
bool needRelease = false;
bool isReq = msgType & 1U;
......@@ -78,23 +94,14 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
if (dmMarkWrapper(pWrapper) != 0) goto _OVER;
needRelease = true;
if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM)) == NULL) goto _OVER;
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
if (dmBuildNodeMsg(pMsg, pRpc) != 0) goto _OVER;
if (pWrapper->procType != DND_PROC_PARENT) {
dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user);
code = (*msgFp)(pWrapper->pMgmt, pMsg);
} else {
dTrace("msg:%p, created and put into child queue, type:%s handle:%p code:0x%04x user:%s contLen:%d", pMsg,
TMSG_INFO(msgType), pRpc->handle, pMsg->rpcMsg.code & 0XFFFF, pMsg->user, pRpc->contLen);
code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
(isReq && (pMsg->rpcMsg.code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ);
}
code = dmProcessNodeMsg(pWrapper, pMsg);
_OVER:
if (code == 0) {
if (pWrapper->procType == DND_PROC_PARENT) {
if (InParentProc(pWrapper->proc.ptype)) {
dTrace("msg:%p, freed in parent process", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
......@@ -291,17 +298,15 @@ static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SR
}
static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pWrapper->procType != DND_PROC_CHILD) {
if (!InChildProc(pWrapper->proc.ptype)) {
dmSendRpcRsp(pWrapper->pDnode, pRsp);
} else {
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
}
}
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT);
ASSERT(pRsp->pCont == NULL);
if (pWrapper->procType != DND_PROC_CHILD) {
if (!InChildProc(pWrapper->proc.ptype)) {
SRpcMsg resp = {0};
SMEpSet msg = {.epSet = *pNewEpSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
......@@ -314,97 +319,25 @@ static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp
resp.refId = pRsp->refId;
rpcSendResponse(&resp);
} else {
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
}
}
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType != DND_PROC_CHILD) {
if (!InChildProc(pWrapper->proc.ptype)) {
rpcRegisterBrokenLinkArg(pMsg);
} else {
taosProcPutToParentQ(pWrapper->procObj, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_FUNC_REGIST);
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_FUNC_REGIST);
}
}
static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
if (pWrapper->procType != DND_PROC_CHILD) {
if (!InChildProc(pWrapper->proc.ptype)) {
rpcReleaseHandle(handle, type);
} else {
SRpcMsg msg = {.handle = handle, .code = type};
taosProcPutToParentQ(pWrapper->procObj, &msg, sizeof(SRpcMsg), NULL, 0, PROC_FUNC_RELEASE);
}
}
static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
EProcFuncType ftype) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = pCont;
dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
int32_t code = (*msgFp)(pWrapper->pMgmt, pMsg);
if (code != 0) {
dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
if (pRpc->msgType & 1U) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno, .refId = pRpc->refId};
dmSendRsp(pWrapper, &rsp);
}
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pCont);
}
}
static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
EProcFuncType ftype) {
int32_t code = pMsg->code & 0xFFFF;
pMsg->pCont = pCont;
if (ftype == PROC_FUNC_REQ) {
ASSERT(1);
dTrace("msg:%p, get from parent queue, send req:%s handle:%p code:0x%04x, app:%p", pMsg, TMSG_INFO(pMsg->msgType),
pMsg->handle, code, pMsg->ahandle);
dmSendReq(pWrapper, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
} else if (ftype == PROC_FUNC_RSP) {
dTrace("msg:%p, get from parent queue, rsp handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code, pMsg->ahandle);
pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
dmSendRpcRsp(pWrapper->pDnode, pMsg);
} else if (ftype == PROC_FUNC_REGIST) {
dTrace("msg:%p, get from parent queue, regist handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code,
pMsg->ahandle);
rpcRegisterBrokenLinkArg(pMsg);
} else if (ftype == PROC_FUNC_RELEASE) {
dTrace("msg:%p, get from parent queue, release handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code,
pMsg->ahandle);
taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
rpcFreeCont(pCont);
} else {
dError("msg:%p, invalid ftype:%d while get from parent queue, handle:%p", pMsg, ftype, pMsg->handle);
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_FUNC_RELEASE);
}
taosMemoryFree(pMsg);
}
SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper) {
SProcCfg cfg = {
.childConsumeFp = (ProcConsumeFp)dmConsumeChildQueue,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentConsumeFp = (ProcConsumeFp)dmConsumeParentQueue,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->procShm,
.parent = pWrapper,
.name = pWrapper->name,
};
return cfg;
}
static bool rpcRfp(int32_t code) {
......
......@@ -26,7 +26,6 @@
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tprocess.h"
#include "tqueue.h"
#include "trpc.h"
#include "tthread.h"
......@@ -83,7 +82,7 @@ typedef enum {
typedef int32_t (*ProcessCreateNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
typedef int32_t (*ProcessDropNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
typedef bool (*IsNodeDeployedFp)(struct SDnode *pDnode, EDndNodeType ntype);
typedef bool (*IsNodeRequiredFp)(struct SDnode *pDnode, EDndNodeType ntype);
typedef struct {
const char *path;
......@@ -103,7 +102,7 @@ typedef struct {
struct SDnode *pDnode;
ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp;
IsNodeDeployedFp isNodeDeployedFp;
IsNodeRequiredFp isNodeRequiredFp;
} SMgmtInputOpt;
typedef struct {
......
......@@ -46,12 +46,12 @@ target_link_libraries(freelistTest os util gtest gtest_main)
# target_link_libraries(encodeTest os util gtest gtest_main)
# queueTest
add_executable(procTest "procTest.cpp")
target_link_libraries(procTest os util transport sut gtest_main)
add_test(
NAME procTest
COMMAND procTest
)
# add_executable(procTest "procTest.cpp")
# target_link_libraries(procTest os util transport sut gtest_main)
# add_test(
# NAME procTest
# COMMAND procTest
# )
# cfgTest
add_executable(cfgTest "cfgTest.cpp")
......
......@@ -76,16 +76,16 @@ TEST_F(UtilTesProc, 00_Init_Cleanup) {
shm,
&shm,
"1234"};
SProcObj *proc = taosProcInit(&cfg);
SProc *proc = dmInitProc(&cfg);
ASSERT_EQ(proc, nullptr);
shm.size = 2468;
cfg.shm = shm;
proc = taosProcInit(&cfg);
proc = dmInitProc(&cfg);
ASSERT_NE(proc, nullptr);
ASSERT_EQ(taosProcRun(proc), 0);
taosProcCleanup(proc);
ASSERT_EQ(dmRunProc(proc), 0);
dmCleanupProc(proc);
taosDropShm(&shm);
}
......@@ -117,33 +117,33 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
shm,
(void *)((int64_t)1235),
"1235_c"};
SProcObj *cproc = taosProcInit(&cfg);
SProc *cproc = dmInitProc(&cfg);
ASSERT_NE(cproc, nullptr);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RSP), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REGIST), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RELEASE), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RSP), 0);
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REGIST), 0);
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RELEASE), 0);
ASSERT_NE(dmPutToProcCQueue(cproc, NULL, 12, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(dmPutToProcCQueue(cproc, &head, shm.size, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, PROC_FUNC_REQ), 0);
for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
}
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
cfg.isChild = true;
cfg.name = "1235_p";
SProcObj *pproc = taosProcInit(&cfg);
SProc *pproc = dmInitProc(&cfg);
ASSERT_NE(pproc, nullptr);
taosProcRun(pproc);
taosProcCleanup(pproc);
dmRunProc(pproc);
dmCleanupProc(pproc);
}
taosProcCleanup(cproc);
dmCleanupProc(cproc);
taosDropShm(&shm);
}
......@@ -175,26 +175,26 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
shm,
(void *)((int64_t)1236),
"1236_c"};
SProcObj *cproc = taosProcInit(&cfg);
SProc *cproc = dmInitProc(&cfg);
ASSERT_NE(cproc, nullptr);
cfg.name = "1236_p";
cfg.isChild = true;
SProcObj *pproc = taosProcInit(&cfg);
SProc *pproc = dmInitProc(&cfg);
ASSERT_NE(pproc, nullptr);
for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
taosProcPutToParentQ(pproc, &head, sizeof(STestMsg), body, i, PROC_FUNC_REQ);
dmPutToProcPQueue(pproc, &head, sizeof(STestMsg), body, i, PROC_FUNC_REQ);
}
taosProcRun(cproc);
taosProcStop(cproc);
dmRunProc(cproc);
dmStopProc(cproc);
}
taosProcCleanup(pproc);
taosProcCleanup(cproc);
dmCleanupProc(pproc);
dmCleanupProc(cproc);
taosDropShm(&shm);
}
......@@ -229,34 +229,34 @@ TEST_F(UtilTesProc, 03_Handle) {
shm,
(void *)((int64_t)1235),
"1237_p"};
SProcObj *cproc = taosProcInit(&cfg);
SProc *cproc = dmInitProc(&cfg);
ASSERT_NE(cproc, nullptr);
for (int32_t j = 0; j < 1; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
head.handle = (void *)((int64_t)i);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, PROC_FUNC_REQ), 0);
ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, PROC_FUNC_REQ), 0);
}
cfg.isChild = true;
cfg.name = "child_queue";
SProcObj *pproc = taosProcInit(&cfg);
SProc *pproc = dmInitProc(&cfg);
ASSERT_NE(pproc, nullptr);
taosProcRun(pproc);
taosProcCleanup(pproc);
dmRunProc(pproc);
dmCleanupProc(pproc);
int64_t ref = 0;
ref = taosProcRemoveHandle(cproc, (void *)((int64_t)3));
ref = dmRemoveProcRpcHandle(cproc, (void *)((int64_t)3));
EXPECT_EQ(ref, 3);
ref = taosProcRemoveHandle(cproc, (void *)((int64_t)5));
ref = dmRemoveProcRpcHandle(cproc, (void *)((int64_t)5));
EXPECT_EQ(ref, 5);
ref = taosProcRemoveHandle(cproc, (void *)((int64_t)6));
ref = dmRemoveProcRpcHandle(cproc, (void *)((int64_t)6));
EXPECT_EQ(ref, 6);
taosProcCloseHandles(cproc, processHandle);
dmCloseProcRpcHandles(cproc, processHandle);
}
taosProcCleanup(cproc);
dmCleanupProc(cproc);
taosDropShm(&shm);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册