未验证 提交 060149fa 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #11192 from taosdata/feature/shm

add multiprocess mode to CI
......@@ -50,7 +50,6 @@ typedef struct {
PutToQueueFp queueFps[QUEUE_MAX];
GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp;
......@@ -60,7 +59,6 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
void tmsgSendRsp(const SRpcMsg* pRsp);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
void tmsgReleaseHandle(void* handle, int8_t type);
......
......@@ -29,8 +29,7 @@ extern "C" {
typedef struct SMnode SMnode;
typedef struct {
int32_t dnodeId;
int64_t clusterId;
bool deploy;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
......
......@@ -26,7 +26,7 @@ typedef struct {
void* ptr;
} SShm;
int32_t taosCreateShm(SShm *pShm, int32_t shmsize) ;
int32_t taosCreateShm(SShm *pShm, int32_t key, int32_t shmsize) ;
void taosDropShm(SShm *pShm);
int32_t taosAttachShm(SShm *pShm);
......
......@@ -22,7 +22,7 @@
extern "C" {
#endif
typedef enum { PROC_REQ, PROC_RSP, PROC_REG, PROC_RELEASE } ProcFuncType;
typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
typedef struct SProcQueue SProcQueue;
typedef struct SProcObj SProcObj;
......@@ -53,7 +53,7 @@ void taosProcCleanup(SProcObj *pProc);
int32_t taosProcRun(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype);
int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype);
#ifdef __cplusplus
......
......@@ -32,10 +32,6 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
}
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
}
void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); }
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
......
......@@ -19,12 +19,7 @@
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
pOption->msgCb = msgCb;
}
......
......@@ -30,7 +30,6 @@ static struct {
} global = {0};
static void dndStopDnode(int signum, void *info, void *ctx) {
dInfo("system signal:%d received", signum);
SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode);
if (pDnode != NULL) {
dndHandleEvent(pDnode, DND_EVENT_STOP);
......@@ -41,8 +40,10 @@ static void dndSetSignalHandle() {
taosSetSignal(SIGTERM, dndStopDnode);
taosSetSignal(SIGHUP, dndStopDnode);
taosSetSignal(SIGINT, dndStopDnode);
taosSetSignal(SIGTSTP, dndStopDnode);
taosSetSignal(SIGABRT, dndStopDnode);
taosSetSignal(SIGBREAK, dndStopDnode);
taosSetSignal(SIGQUIT, dndStopDnode);
if (!tsMultiProcess) {
} else if (global.ntype == DNODE || global.ntype == NODE_MAX) {
......@@ -72,7 +73,7 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) {
} else if (strcmp(argv[i], "-n") == 0) {
global.ntype = atoi(argv[++i]);
if (global.ntype <= DNODE || global.ntype > NODE_MAX) {
printf("'-n' range is [1-5], default is 0\n");
printf("'-n' range is [1 - %d], default is 0\n", NODE_MAX - 1);
return -1;
}
} else if (strcmp(argv[i], "-k") == 0) {
......
......@@ -149,7 +149,7 @@ int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
......
......@@ -45,7 +45,7 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
}
void dndCloseNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, start to close", pWrapper->name);
dDebug("node:%s, mgmt start to close", pWrapper->name);
pWrapper->required = false;
taosWLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
......@@ -62,7 +62,7 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
taosProcCleanup(pWrapper->pProc);
pWrapper->pProc = NULL;
}
dDebug("node:%s, has been closed", pWrapper->name);
dDebug("node:%s, mgmt has been closed", pWrapper->name);
}
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
......@@ -90,10 +90,10 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
ProcFuncType ftype) {
pMsg->pCont = pCont;
dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle);
dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle);
switch (ftype) {
case PROC_REG:
case PROC_REGIST:
rpcRegisterBrokenLinkArg(pMsg);
break;
case PROC_RELEASE:
......@@ -101,11 +101,14 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
rpcFreeCont(pCont);
break;
case PROC_REQ:
// todo send to dnode
dndSendReqToMnode(pWrapper, pMsg);
default:
// dndSendReq(pWrapper, (const SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
break;
case PROC_RSP:
dndSendRpcRsp(pWrapper, pMsg);
break;
default:
break;
}
taosMemoryFree(pMsg);
}
......@@ -180,6 +183,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
while (1) {
if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode is about to stop");
dndSetStatus(pDnode, DND_STAT_STOPPED);
break;
}
taosMsleep(100);
......@@ -202,7 +206,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
if (!pWrapper->required) continue;
int32_t shmsize = 1024 * 1024 * 2; // size will be a configuration item
if (taosCreateShm(&pWrapper->shm, shmsize) != 0) {
if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) {
terrno = TAOS_SYSTEM_ERROR(terrno);
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
return -1;
......@@ -255,6 +259,8 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
while (1) {
if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode is about to stop");
dndSetStatus(pDnode, DND_STAT_STOPPED);
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
......@@ -263,15 +269,6 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
taosKillProc(pWrapper->procId);
}
}
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_MAX) continue;
if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
taosWaitProc(pWrapper->procId);
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
......@@ -331,6 +328,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
}
}
dndSetStatus(pDnode, DND_STAT_RUNNING);
if (taosProcRun(pWrapper->pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
......@@ -340,11 +339,14 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
dndReportStartup(pDnode, "TDengine", "initialized successfully");
while (1) {
if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode is about to stop");
dInfo("%s is about to stop", pWrapper->name);
dndSetStatus(pDnode, DND_STAT_STOPPED);
break;
}
taosMsleep(100);
}
return 0;
}
int32_t dndRun(SDnode *pDnode) {
......
......@@ -133,14 +133,6 @@ _OVER:
void dndClose(SDnode *pDnode) {
if (pDnode == NULL) return;
if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
dError("dnode is shutting down, data:%p", pDnode);
return;
}
dInfo("start to close dnode, data:%p", pDnode);
dndSetStatus(pDnode, DND_STAT_STOPPED);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dndCloseNode(pWrapper);
......@@ -151,7 +143,6 @@ void dndClose(SDnode *pDnode) {
}
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
dInfo("dnode receive %s event, data:%p", dndEventStr(event), pDnode);
if (event == DND_EVENT_STOP) {
pDnode->event = event;
}
......
......@@ -319,22 +319,6 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p
return 0;
}
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pWrapper->procType != PROC_CHILD) {
SDnode *pDnode = pWrapper->pDnode;
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
terrno = TSDB_CODE_DND_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
return -1;
}
return dndSendRpcReq(&pDnode->trans, pEpSet, pReq);
} else {
while (taosProcPutToParentQ(pWrapper->pProc, pReq, sizeof(SRpcMsg), pReq->pCont, pReq->contLen, PROC_REQ) != 0) {
taosMsleep(1);
}
}
}
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode;
STransMgmt *pTrans = &pDnode->trans;
......@@ -362,13 +346,37 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
}
}
int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode;
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
terrno = TSDB_CODE_DND_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
return -1;
}
if (pWrapper->procType != PROC_CHILD) {
return dndSendRpcReq(&pDnode->trans, pEpSet, pReq);
} else {
int32_t headLen = sizeof(SRpcMsg) + sizeof(SEpSet);
char *pHead = taosMemoryMalloc(headLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(pHead, pReq, sizeof(SRpcMsg));
memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));
taosProcPutToParentQ(pWrapper->pProc, pReq, headLen, pReq->pCont, pReq->contLen, PROC_REQ);
taosMemoryFree(pHead);
return 0;
}
}
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pWrapper->procType != PROC_CHILD) {
dndSendRpcRsp(pWrapper, pRsp);
} else {
while (taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP) != 0) {
taosMsleep(1);
}
taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
}
}
......@@ -376,9 +384,7 @@ void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType != PROC_CHILD) {
rpcRegisterBrokenLinkArg(pMsg);
} else {
while (taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REG) != 0) {
taosMsleep(1);
}
taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST);
}
}
......@@ -387,20 +393,17 @@ static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type)
rpcReleaseHandle(handle, type);
} else {
SRpcMsg msg = {.handle = handle, .code = type};
while (taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE) != 0) {
taosMsleep(1);
}
taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE);
}
}
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = {
.pWrapper = pWrapper,
.sendReqFp = dndSendReq,
.sendRspFp = dndSendRsp,
.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg,
.releaseHandleFp = dndReleaseHandle,
.sendMnodeReqFp = dndSendReqToMnode,
.sendReqFp = dndSendReqToDnode,
.sendRspFp = dndSendRsp,
};
return msgCb;
}
\ No newline at end of file
......@@ -111,7 +111,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) {
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);;
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write %s since %s", file, terrstr());
return -1;
}
......@@ -145,7 +145,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) {
snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);;
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s since %s", file, terrstr());
return -1;
}
......
......@@ -39,20 +39,11 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
}
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb;
}
......@@ -66,6 +57,7 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pReplica->id = 1;
pReplica->port = pDnode->serverPort;
tstrncpy(pReplica->fqdn, pDnode->localFqdn, TSDB_FQDN_LEN);
pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
......@@ -77,6 +69,7 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->selfIndex = pMgmt->selfIndex;
pOption->replica = pMgmt->replica;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pOption->deploy = false;
}
static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
......@@ -89,7 +82,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
pReplica->id = pCreate->replicas[i].id;
pReplica->port = pCreate->replicas[i].port;
memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOption->dnodeId) {
if (pReplica->id == pMgmt->pDnode->dnodeId) {
pOption->selfIndex = i;
}
}
......@@ -98,6 +91,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
dError("failed to build mnode options since %s", terrstr());
return -1;
}
pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
......@@ -225,9 +219,7 @@ int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq) {
return code;
}
static int32_t mmOpen(SMgmtWrapper *pWrapper) {
return mmOpenFromMsg(pWrapper, NULL);
}
static int32_t mmOpen(SMgmtWrapper *pWrapper) { return mmOpenFromMsg(pWrapper, NULL); }
static int32_t mmStart(SMgmtWrapper *pWrapper) {
dDebug("mnode-mgmt start to run");
......@@ -258,7 +250,7 @@ int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encry
}
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) {
SMonGrantInfo *pGrantInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
}
......@@ -19,15 +19,10 @@
static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
msgCb.qsizeFp = qmGetQueueSize;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb;
}
......
......@@ -19,12 +19,7 @@
static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
pOption->msgCb = msgCb;
}
......
......@@ -70,46 +70,6 @@ TEST_F(DndTestVnode, 01_Create_Vnode) {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED);
}
}
{
SCreateVnodeReq createReq = {0};
createReq.vgId = 2;
createReq.dnodeId = 3;
strcpy(createReq.db, "1.d1");
createReq.dbUid = 9527;
createReq.vgVersion = 1;
createReq.cacheBlockSize = 16;
createReq.totalBlocks = 10;
createReq.daysPerFile = 10;
createReq.daysToKeep0 = 3650;
createReq.daysToKeep1 = 3650;
createReq.daysToKeep2 = 3650;
createReq.minRows = 100;
createReq.minRows = 4096;
createReq.commitTime = 3600;
createReq.fsyncPeriod = 3000;
createReq.walLevel = 1;
createReq.precision = 0;
createReq.compression = 2;
createReq.replica = 1;
createReq.quorum = 1;
createReq.update = 0;
createReq.cacheLastRow = 0;
createReq.selfIndex = 0;
for (int r = 0; r < createReq.replica; ++r) {
SReplica* pReplica = &createReq.replicas[r];
pReplica->id = 1;
pReplica->port = 9527;
}
int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_VNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_INVALID_OPTION);
}
}
TEST_F(DndTestVnode, 02_Alter_Vnode) {
......
......@@ -128,16 +128,12 @@ static void *vmOpenVnodeFunc(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc);
SMsgCb msgCb = {0};
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.qsizeFp = vmGetQueueSize;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) {
......
......@@ -68,12 +68,6 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SWrapperCfg wrapperCfg = {0};
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
if (createReq.dnodeId != pMgmt->pDnode->dnodeId) {
terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION;
dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
return -1;
}
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist", createReq.vgId);
......@@ -82,16 +76,12 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
SMsgCb msgCb = {0};
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.qsizeFp = vmGetQueueSize;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
vnodeCfg.msgCb = msgCb;
vnodeCfg.pTfs = pMgmt->pTfs;
......
......@@ -25,6 +25,7 @@ extern "C" {
int32_t mndInitCluster(SMnode *pMnode);
void mndCleanupCluster(SMnode *pMnode);
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
#ifdef __cplusplus
}
......
......@@ -100,7 +100,6 @@ typedef struct {
} SGrantInfo;
typedef struct SMnode {
int32_t dnodeId;
int64_t clusterId;
int8_t replica;
int8_t selfIndex;
......
......@@ -17,7 +17,7 @@
#include "mndCluster.h"
#include "mndShow.h"
#define TSDB_CLUSTER_VER_NUMBE 1
#define TSDB_CLUSTER_VER_NUMBE 1
#define TSDB_CLUSTER_RESERVE_SIZE 64
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
......@@ -61,6 +61,23 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
return 0;
}
int64_t mndGetClusterId(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
int64_t clusterId = -1;
while (1) {
SClusterObj *pCluster = NULL;
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
if (pIter == NULL) break;
clusterId = pCluster->id;
sdbRelease(pSdb, pCluster);
}
return clusterId;
}
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -433,12 +433,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
pHeartbeat->connId = htonl(pHeartbeat->connId);
pHeartbeat->pid = htonl(pHeartbeat->pid);
SRpcConnInfo info = {0};
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr());
return -1;
}
SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
......
......@@ -423,7 +423,7 @@ static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq) {
DROP_QNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("qnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
mError("qnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
}
mndReleaseQnode(pMnode, pObj);
......
......@@ -433,7 +433,7 @@ static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) {
DROP_SNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("snode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
}
mndReleaseSnode(pMnode, pObj);
......
......@@ -187,7 +187,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
return 0;
}
static int32_t mndInitSteps(SMnode *pMnode) {
static int32_t mndInitSteps(SMnode *pMnode, bool deploy) {
if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
......@@ -210,7 +210,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
if (pMnode->clusterId <= 0) {
if (deploy) {
if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
} else {
if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
......@@ -263,23 +263,15 @@ static int32_t mndExecSteps(SMnode *pMnode) {
}
}
pMnode->clusterId = mndGetClusterId(pMnode);
return 0;
}
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->dnodeId = pOption->dnodeId;
pMnode->clusterId = pOption->clusterId;
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->msgCb = pOption->msgCb;
if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1;
}
return 0;
}
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
......@@ -294,6 +286,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
char timestr[24] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
mndSetOptions(pMnode, pOption);
pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
if (pMnode->pSteps == NULL) {
......@@ -312,16 +305,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return NULL;
}
code = mndSetOptions(pMnode, pOption);
if (code != 0) {
code = terrno;
mError("failed to open mnode since %s", terrstr());
mndClose(pMnode);
terrno = code;
return NULL;
}
code = mndInitSteps(pMnode);
code = mndInitSteps(pMnode, pOption->deploy);
if (code != 0) {
code = terrno;
mError("failed to open mnode since %s", terrstr());
......
......@@ -23,9 +23,9 @@ int32_t taosNewProc(char **args) {
int32_t pid = fork();
if (pid == 0) {
args[0] = tsProcPath;
close(STDIN_FILENO);
// close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
// close(STDERR_FILENO);
return execvp(tsProcPath, args);
} else {
return pid;
......@@ -33,7 +33,7 @@ int32_t taosNewProc(char **args) {
}
void taosWaitProc(int32_t pid) {
int32_t status = 0;
int32_t status = -1;
waitpid(pid, &status, 0);
}
......
......@@ -17,10 +17,10 @@
#define _DEFAULT_SOURCE
#include "os.h"
int32_t taosCreateShm(SShm* pShm, int32_t shmsize) {
int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) {
pShm->id = -1;
int32_t shmid = shmget(0X95279527, shmsize, IPC_CREAT | 0600);
int32_t shmid = shmget(0X95270000 + key, shmsize, IPC_CREAT | 0600);
if (shmid < 0) {
return -1;
}
......
......@@ -59,7 +59,7 @@ void taosSetSignal(int32_t signum, FSignalHandler sigfp) {
struct sigaction act;
memset(&act, 0, sizeof(act));
#if 1
act.sa_flags = SA_SIGINFO;
act.sa_flags = SA_SIGINFO | SA_RESTART;
act.sa_sigaction = (FLinuxSignalHandler)sigfp;
#else
act.sa_handler = sigfp;
......
......@@ -434,7 +434,9 @@ int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen,
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype);
}
int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype) {
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype);
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype) {
while (taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype) != 0) {
taosMsleep(1);
}
}
......@@ -52,4 +52,10 @@
./test.sh -f tsim/stable/values.sim
./test.sh -f tsim/stable/vnode3.sim
# --- for multi process mode
./test.sh -f tsim/user/basic1.sim -m
./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/tmq/basic.sim -m
#======================b1-end===============
......@@ -5,18 +5,12 @@ set +e
echo "Executing deploy.sh"
if [ $# != 4 ]; then
echo "argument list need input : "
echo " -n nodeName"
echo " -i nodePort"
exit 1
fi
UNAME_BIN=`which uname`
OS_TYPE=`$UNAME_BIN`
NODE_NAME=
NODE=
while getopts "n:i:" arg
MULTIPROCESS=0
while getopts "n:i:m" arg
do
case $arg in
n)
......@@ -25,6 +19,9 @@ do
i)
NODE=$OPTARG
;;
m)
MULTIPROCESS=1
;;
?)
echo "unkonw argument"
;;
......@@ -145,5 +142,5 @@ echo "statusInterval 1" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG
echo "telemetryReporting 0" >> $TAOS_CFG
echo "multiProcess 0" >> $TAOS_CFG
echo "multiProcess ${MULTIPROCESS}" >> $TAOS_CFG
echo " " >> $TAOS_CFG
......@@ -7,7 +7,6 @@
##################################################
set +e
#set -x
FILE_NAME=
RELEASE=0
......@@ -16,7 +15,8 @@ VALGRIND=0
UNIQUE=0
UNAME_BIN=`which uname`
OS_TYPE=`$UNAME_BIN`
while getopts "f:avu" arg
MULTIPROCESS=0
while getopts "f:avum" arg
do
case $arg in
f)
......@@ -28,6 +28,9 @@ do
u)
UNIQUE=1
;;
m)
MULTIPROCESS=1
;;
?)
echo "unknow argument"
;;
......@@ -125,8 +128,13 @@ if [ -n "$FILE_NAME" ]; then
echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${CODE_DIR}/../script/valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME
valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${CODE_DIR}/../script/valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME
else
echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f $FILE_NAME
$PROGRAM -c $CFG_DIR -f $FILE_NAME
if [[ $MULTIPROCESS -eq 1 ]];then
echo "ExcuteCmd(multiprocess):" $PROGRAM -m -c $CFG_DIR -f $FILE_NAME
$PROGRAM -m -c $CFG_DIR -f $FILE_NAME
else
echo "ExcuteCmd(singleprocess):" $PROGRAM -c $CFG_DIR -f $FILE_NAME
$PROGRAM -c $CFG_DIR -f $FILE_NAME
fi
fi
else
echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f basicSuite.sim
......
......@@ -155,6 +155,7 @@ extern int32_t simScriptSucced;
extern int32_t simDebugFlag;
extern char simScriptDir[];
extern bool abortExecution;
extern bool useMultiProcess;
SScript *simParseScript(char *fileName);
SScript *simProcessCallOver(SScript *script);
......
......@@ -305,25 +305,24 @@ bool simExecuteRunBackCmd(SScript *script, char *option) {
return true;
}
void simReplaceShToBat(char *dst) {
char *sh = strstr(dst, ".sh");
if (sh != NULL) {
void simReplaceStr(char *buf, char *src, char *dst) {
char *begin = strstr(buf, src);
if (begin != NULL) {
int32_t srcLen = (int32_t)strlen(src);
int32_t dstLen = (int32_t)strlen(dst);
char *end = dst + dstLen;
*(end + 1) = 0;
int32_t interval = (dstLen - srcLen);
int32_t remainLen = (int32_t)strlen(buf);
char *end = buf + remainLen;
*(end + interval) = 0;
for (char *p = end; p >= sh; p--) {
*(p + 1) = *p;
for (char *p = end; p >= begin; p--) {
*(p + interval) = *p;
}
sh[0] = '.';
sh[1] = 'b';
sh[2] = 'a';
sh[3] = 't';
sh[4] = ' ';
memcpy(begin, dst, dstLen);
}
simDebug("system cmd is %s", dst);
simInfo("system cmd is %s", buf);
}
bool simExecuteSystemCmd(SScript *script, char *option) {
......@@ -334,9 +333,13 @@ bool simExecuteSystemCmd(SScript *script, char *option) {
simVisuallizeOption(script, option, buf + strlen(buf));
#else
sprintf(buf, "%s%s", simScriptDir, option);
simReplaceShToBat(buf);
simReplaceStr(buf, ".sh", ".bat");
#endif
if (useMultiProcess) {
simReplaceStr(buf, "deploy.sh", "deploy.sh -m");
}
simLogSql(buf, true);
int32_t code = system(buf);
int32_t repeatTimes = 0;
......
......@@ -18,6 +18,7 @@
bool simExecSuccess = false;
bool abortExecution = false;
bool useMultiProcess = false;
void simHandleSignal(int32_t signo, void *sigInfo, void *context) {
simSystemCleanUp();
......@@ -32,6 +33,8 @@ int32_t main(int32_t argc, char *argv[]) {
tstrncpy(configDir, argv[++i], 128);
} else if (strcmp(argv[i], "-f") == 0 && i < argc - 1) {
strcpy(scriptFile, argv[++i]);
} else if (strcmp(argv[i], "-m") == 0) {
useMultiProcess = true;
} else {
printf("usage: %s [options] \n", argv[0]);
printf(" [-c config]: config directory, default is: %s\n", configDir);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册