Commit 846129fd authored by Shengliang Guan

shm

Parent da145d67
@@ -29,7 +29,6 @@ typedef struct SMgmtWrapper SMgmtWrapper;
 typedef struct SSnode SSnode;

 typedef struct {
   int64_t numOfErrors;
 } SSnodeLoad;

 typedef struct {
@@ -73,20 +72,9 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
  * @param pSnode The snode object.
  * @param pMsg The request message
- * @param pRsp The response message
- * @return int32_t 0 for success, -1 for failure
  */
-// int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
-int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
-int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
-
-/**
- * @brief Drop a snode.
- *
- * @param path Path of the snode.
- */
-void sndDestroy(const char *path);
+void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
+void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);

 #ifdef __cplusplus
 }
......
@@ -65,7 +65,7 @@ int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
   SDnodeWorker *pWorker = &pMgmt->writeWorker;
   dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
-  return dndWriteMsgToWorker(pWorker, pMsg, 0);
+  return dndWriteMsgToWorker(pWorker, pMsg);
 }

 int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
......
@@ -163,7 +163,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
 int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
                       int32_t maxNum, void *queueFp);
 void dndCleanupWorker(SDnodeWorker *pWorker);
-int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
+int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg);
 int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
......
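With the contLen parameter gone, callers now hand dndWriteMsgToWorker a message that already lives in queue-item memory instead of asking the worker to copy it. A hedged caller-side sketch of the new contract; dndPutRpcToWorker is a hypothetical helper, not part of this commit (the diff itself only shows existing callers dropping the trailing 0):

/* Hypothetical caller under the new two-argument contract. */
static int32_t dndPutRpcToWorker(SDnodeWorker *pWorker, SRpcMsg *pRpc) {
  SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));  /* queue-item memory */
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pMsg->rpcMsg = *pRpc;
  if (dndWriteMsgToWorker(pWorker, pMsg) != 0) {
    taosFreeQitem(pMsg);  /* on failure the queue has not taken ownership */
    return -1;
  }
  return 0;  /* on success the worker callback frees the item */
}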
@@ -21,8 +21,8 @@
 #include "bm.h"
 #include "dm.h"
 #include "mm.h"
-#include "qmInt.h"
-#include "smInt.h"
+#include "qm.h"
+#include "sm.h"
 #include "vm.h"

 #ifdef __cplusplus
......
@@ -154,7 +154,7 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
     case TDMT_DND_CREATE_SNODE:
       return dndProcessCreateNodeMsg(pDnode, SNODE, pMsg);
     case TDMT_DND_DROP_SNODE:
-      return dndProcessDropNodeMsg(pDnode, MNODE, pMsg);
+      return dndProcessDropNodeMsg(pDnode, SNODE, pMsg);
     case TDMT_DND_CREATE_BNODE:
       return dndProcessCreateNodeMsg(pDnode, BNODE, pMsg);
     case TDMT_DND_DROP_BNODE:
......
@@ -81,31 +81,13 @@ void dndCleanupWorker(SDnodeWorker *pWorker) {
   }
 }

-int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) {
-  if (pWorker == NULL || pWorker->queue == NULL ) {
+int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg) {
+  if (pWorker == NULL || pWorker->queue == NULL) {
     terrno = TSDB_CODE_INVALID_PARA;
     return -1;
   }

-  void *pMsg = NULL;
-  if (contLen != 0) {
-    pMsg = taosAllocateQitem(contLen);
-    if (pMsg != NULL) {
-      memcpy(pMsg, pCont, contLen);
-    }
-  } else {
-    pMsg = pCont;
-  }
-
-  if (pMsg == NULL) {
-    terrno = TSDB_CODE_OUT_OF_MEMORY;
-    return -1;
-  }
-
   if (taosWriteQitem(pWorker->queue, pMsg) != 0) {
-    if (contLen != 0) {
-      taosFreeQitem(pMsg);
-    }
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     return -1;
   }
......
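The hunk above is truncated before the end of the function; for reference, the reworked body reduces to a parameter check plus a single enqueue. A minimal sketch, assuming the elided tail is just a successful return:

/* Minimal sketch of dndWriteMsgToWorker after this change. The closing
 * "return 0;" is assumed; the hunk is truncated before it. */
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg) {
  if (pWorker == NULL || pWorker->queue == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return -1;
  }
  /* The allocate-and-copy branch is gone: pMsg must already be a queue item
   * (taosAllocateQitem), and the queue takes ownership on success. */
  if (taosWriteQitem(pWorker->queue, pMsg) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}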
@@ -17,8 +17,8 @@
 #include "bm.h"
 #include "dmInt.h"
 #include "mm.h"
-#include "qmInt.h"
-#include "smInt.h"
+#include "qm.h"
+#include "sm.h"
 #include "vm.h"

 static void *dmThreadRoutine(void *param) {
@@ -136,5 +136,5 @@ int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
   }

   dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
-  return dndWriteMsgToWorker(pWorker, pMsg, 0);
+  return dndWriteMsgToWorker(pWorker, pMsg);
 }
@@ -85,7 +85,7 @@ static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeM
   if (pMnode == NULL) return -1;

   dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
-  int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0);
+  int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
   mmRelease(pMgmt, pMnode);
   return code;
 }
......
@@ -43,14 +43,14 @@ int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
   SDnodeWorker *pWorker = &pMgmt->queryWorker;
   dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
-  return dndWriteMsgToWorker(pWorker, pMsg, 0);
+  return dndWriteMsgToWorker(pWorker, pMsg);
 }

 int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
   SDnodeWorker *pWorker = &pMgmt->fetchWorker;
   dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
-  return dndWriteMsgToWorker(pWorker, pMsg, 0);
+  return dndWriteMsgToWorker(pWorker, pMsg);
 }

 int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
......
@@ -22,7 +22,7 @@
 extern "C" {
 #endif

-void bmGetMgmtFp(SMgmtWrapper *pWrapper);
+void smGetMgmtFp(SMgmtWrapper *pWrapper);

 #ifdef __cplusplus
 }
......
@@ -43,10 +43,11 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
 int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);

 // smWorker.c
-int32_t smStartWorker(SQnodeMgmt *pMgmt);
-void smStopWorker(SQnodeMgmt *pMgmt);
-int32_t smProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
-int32_t smProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
+int32_t smStartWorker(SSnodeMgmt *pMgmt);
+void smStopWorker(SSnodeMgmt *pMgmt);
+int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
+int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
+int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);

 #ifdef __cplusplus
 }
......
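The header now exposes three entry points that differ only in which queue they target: mgmt traffic is pinned to unique worker 0, per-task stream traffic is hashed onto a unique worker, and stateless work goes to the shared pool. An illustrative dispatcher follows; the TDMT_SND_TASK_DEPLOY label is an assumption, and this commit does not show the actual routing table:

/* Illustrative fan-out only; TDMT_SND_TASK_DEPLOY is an assumed label. */
static int32_t smDispatchMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  switch (pMsg->rpcMsg.msgType) {
    case TDMT_SND_TASK_DEPLOY:  /* must run in task order: unique queue */
      return smProcessUniqueMsg(pMgmt, pMsg);
    case TDMT_SND_TASK_EXEC:    /* stateless: any shared worker may run it */
      return smProcessSharedMsg(pMgmt, pMsg);
    default:                    /* node management: unique worker 0 */
      return smProcessMgmtMsg(pMgmt, pMsg);
  }
}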
@@ -16,27 +16,54 @@
 #define _DEFAULT_SOURCE
 #include "smInt.h"

-static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg);
-static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs);
+static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) {
+  for (int32_t i = 0; i < numOfMsgs; i++) {
+    SNodeMsg *pMsg = NULL;
+    taosGetQitem(qall, (void **)&pMsg);
+
+    dTrace("msg:%p, will be processed in snode unique queue", pMsg);
+    sndProcessUMsg(pMgmt->pSnode, &pMsg->rpcMsg);
+
+    dTrace("msg:%p, is freed", pMsg);
+    rpcFreeCont(pMsg->rpcMsg.pCont);
+    taosFreeQitem(pMsg);
+  }
+}
+
+static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
+  dTrace("msg:%p, will be processed in snode shared queue", pMsg);
+  sndProcessSMsg(pMgmt->pSnode, pMsg);
+
+  dTrace("msg:%p, is freed", pMsg);
+  rpcFreeCont(pMsg->pCont);
+  taosFreeQitem(pMsg);
+}

-static int32_t dndStartSnodeWorker(SDnode *pDnode) {
-  SSnodeMgmt *pMgmt = &pDnode->smgmt;
+int32_t smStartWorker(SSnodeMgmt *pMgmt) {
   pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
   if (pMgmt->uniqueWorkers == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     return -1;
   }

   for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) {
     SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker));
     if (pUniqueWorker == NULL) {
       terrno = TSDB_CODE_OUT_OF_MEMORY;
       return -1;
     }
-    if (dndInitWorker(pDnode, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) {
+    if (dndInitWorker(pMgmt, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, smProcessSharedQueue) != 0) {
       dError("failed to start snode unique worker since %s", terrstr());
       return -1;
     }
-    taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker);
+    if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) {
+      terrno = TSDB_CODE_OUT_OF_MEMORY;
+      return -1;
+    }
   }

-  if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM,
-                    SND_SHARED_THREAD_NUM, dndProcessSnodeSharedQueue)) {
+  if (dndInitWorker(pMgmt, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM,
+                    SND_SHARED_THREAD_NUM, smProcessSharedQueue)) {
     dError("failed to start snode shared worker since %s", terrstr());
     return -1;
   }
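A side note on the two worker flavors used above: the DND_WORKER_MULTI unique workers drain their queue in batches, while the DND_WORKER_SINGLE shared worker handles one message at a time, which is why smProcessUniqueQueue and smProcessSharedQueue have different shapes. The typedef names below are hypothetical; only the parameter lists come from the code. (Both visible dndInitWorker calls pass smProcessSharedQueue; smProcessUniqueQueue is defined above but not referenced in the lines shown.)

/* Hypothetical typedefs capturing the callback shapes implied above. */
typedef void (*SmUniqueQueueFp)(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs); /* batch drain */
typedef void (*SmSharedQueueFp)(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);                      /* single message */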
@@ -44,156 +71,47 @@ static int32_t dndStartSnodeWorker(SDnode *pDnode) {
   return 0;
 }

-static void dndStopSnodeWorker(SDnode *pDnode) {
-  SSnodeMgmt *pMgmt = &pDnode->smgmt;
-
-  taosWLockLatch(&pMgmt->latch);
-  pMgmt->deployed = 0;
-  taosWUnLockLatch(&pMgmt->latch);
-
-  while (pMgmt->refCount > 0) {
-    taosMsleep(10);
-  }
-
+void smStopWorker(SSnodeMgmt *pMgmt) {
   for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
     SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i);
     dndCleanupWorker(worker);
   }
   taosArrayDestroy(pMgmt->uniqueWorkers);
   dndCleanupWorker(&pMgmt->sharedWorker);
 }

-static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
-  /*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/
-  int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED;
-
-  SSnode *pSnode = dndAcquireSnode(pDnode);
-  if (pSnode != NULL) {
-    for (int32_t i = 0; i < numOfMsgs; i++) {
-      SRpcMsg *pMsg = NULL;
-      taosGetQitem(qall, (void **)&pMsg);
-      sndProcessUMsg(pSnode, pMsg);
-      rpcFreeCont(pMsg->pCont);
-      taosFreeQitem(pMsg);
-    }
-    dndReleaseSnode(pDnode, pSnode);
-  } else {
-    for (int32_t i = 0; i < numOfMsgs; i++) {
-      SRpcMsg *pMsg = NULL;
-      taosGetQitem(qall, (void **)&pMsg);
-      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
-      rpcSendResponse(&rpcRsp);
-      rpcFreeCont(pMsg->pCont);
-      taosFreeQitem(pMsg);
-    }
-  }
-}
-
-static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
-  /*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/
-  int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED;
-
-  SSnode *pSnode = dndAcquireSnode(pDnode);
-  if (pSnode != NULL) {
-    sndProcessSMsg(pSnode, pMsg);
-    dndReleaseSnode(pDnode, pSnode);
-  } else {
-    SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
-    rpcSendResponse(&rpcRsp);
-  }
-
-#if 0
-  if (pMsg->msgType & 1u) {
-    if (pRsp != NULL) {
-      pRsp->ahandle = pMsg->ahandle;
-      rpcSendResponse(pRsp);
-      free(pRsp);
-    } else {
-      if (code != 0) code = terrno;
-      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
-      rpcSendResponse(&rpcRsp);
-    }
-  }
-#endif
-
-  rpcFreeCont(pMsg->pCont);
-  taosFreeQitem(pMsg);
-}
-
-static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) {
+static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
   SMsgHead *pHead = pMsg->pCont;
   pHead->streamTaskId = htonl(pHead->streamTaskId);
   return pHead->streamTaskId % SND_UNIQUE_THREAD_NUM;
 }

-static void dndWriteSnodeMsgToWorkerByMsg(SDnode *pDnode, SRpcMsg *pMsg) {
-  int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED;
-
-  SSnode *pSnode = dndAcquireSnode(pDnode);
-  if (pSnode != NULL) {
-    int32_t index = dndGetSWIdFromMsg(pMsg);
-    SDnodeWorker *pWorker = taosArrayGetP(pDnode->smgmt.uniqueWorkers, index);
-    code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
-  }
-  dndReleaseSnode(pDnode, pSnode);
-
-  if (code != 0) {
-    if (pMsg->msgType & 1u) {
-      SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
-      rpcSendResponse(&rsp);
-    }
-    rpcFreeCont(pMsg->pCont);
-  }
-}
-
-static void dndWriteSnodeMsgToMgmtWorker(SDnode *pDnode, SRpcMsg *pMsg) {
-  int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED;
-
-  SSnode *pSnode = dndAcquireSnode(pDnode);
-  if (pSnode != NULL) {
-    SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, 0);
-    code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
-  }
-  dndReleaseSnode(pDnode, pSnode);
-
-  if (code != 0) {
-    if (pMsg->msgType & 1u) {
-      SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
-      rpcSendResponse(&rsp);
-    }
-    rpcFreeCont(pMsg->pCont);
-  }
-}
+int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
+  SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
+  if (pWorker == NULL) {
+    terrno = TSDB_CODE_INVALID_MSG;
+    return -1;
+  }
+
+  dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
+  return dndWriteMsgToWorker(pWorker, pMsg);
+}

-static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
-  int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED;
-
-  SSnode *pSnode = dndAcquireSnode(pDnode);
-  if (pSnode != NULL) {
-    code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
-  }
-  dndReleaseSnode(pDnode, pSnode);
-
-  if (code != 0) {
-    if (pMsg->msgType & 1u) {
-      SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
-      rpcSendResponse(&rsp);
-    }
-    rpcFreeCont(pMsg->pCont);
-  }
-}
+int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
+  int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
+  SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
+  if (pWorker == NULL) {
+    terrno = TSDB_CODE_INVALID_MSG;
+    return -1;
+  }
+
+  dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
+  return dndWriteMsgToWorker(pWorker, pMsg);
+}

-void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
-  dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg);
-}
-
-void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
-  dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
-}
-
-void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
-  dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
-}
+int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
+  SDnodeWorker *pWorker = &pMgmt->sharedWorker;
+
+  dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
+  return dndWriteMsgToWorker(pWorker, pMsg);
+}
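smGetSWIdFromMsg pins every message of a stream task to one unique worker: it converts the on-wire task id to host order and takes it modulo SND_UNIQUE_THREAD_NUM. A tiny worked example of the routing rule; the thread count of 2 is an assumed value for illustration, as the real constant is not visible in this diff:

#include <stdint.h>

/* Stand-alone illustration of the per-task routing rule; the modulus 2 is
 * an assumed stand-in for SND_UNIQUE_THREAD_NUM. */
static int32_t routeTask(int32_t hostOrderTaskId) { return hostOrderTaskId % 2; }

/* routeTask(7) == 1, and every later message for task 7 also maps to
 * worker 1, so per-task ordering is preserved without locks; task 8 maps
 * to worker 0 and runs in parallel. */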
@@ -303,5 +303,5 @@ void vmStopWorker(SVnodesMgmt *pMgmt) {
 int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
   SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
   dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
-  return dndWriteMsgToWorker(pWorker, pMsg, 0);
+  return dndWriteMsgToWorker(pWorker, pMsg);
 }
\ No newline at end of file
@@ -38,13 +38,6 @@ void sndClose(SSnode *pSnode) {
 int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }

-/*int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {*/
-/**pRsp = NULL;*/
-/*return 0;*/
-/*}*/
-
-void sndDestroy(const char *path) {}
-
 SStreamMeta *sndMetaNew() {
   SStreamMeta *pMeta = calloc(1, sizeof(SStreamMeta));
   if (pMeta == NULL) {
@@ -93,7 +86,7 @@ static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
   return 0;
 }

-int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
+void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
   // stream deploy
   // stream stop/resume
   // operator exec
@@ -101,7 +94,7 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
     void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
     SStreamTask *pTask = malloc(sizeof(SStreamTask));
     if (pTask == NULL) {
-      return -1;
+      ASSERT(0);
     }
     SCoder decoder;
     tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER);
@@ -114,15 +107,13 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
   } else {
     ASSERT(0);
   }
-  return 0;
 }

-int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
+void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
   // operator exec
   if (pMsg->msgType == TDMT_SND_TASK_EXEC) {
     sndProcessTaskExecReq(pSnode, pMsg);
   } else {
     ASSERT(0);
   }
-  return 0;
 }
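For context, the deploy branch of sndProcessUMsg that these hunks cut through decodes an SStreamTask out of the RPC payload. A hedged sketch assembled from the visible lines; the decode call between tCoderInit and the final msgType check is elided by the diff, so tDecodeSStreamTask here is an assumption:

/* Hedged sketch of the stream-deploy branch of sndProcessUMsg. */
void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SStreamTask *pTask = malloc(sizeof(SStreamTask));
if (pTask == NULL) {
  ASSERT(0);  /* this commit replaces the old "return -1;" with an assert */
}
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER);
tDecodeSStreamTask(&decoder, pTask);  /* assumed decoder call; elided by the diff */
tCoderClear(&decoder);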