提交 36d4ef0a 编写于 作者: L Liu Jicong

refactor stream worker

上级 00c5307d
......@@ -1118,6 +1118,17 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS
int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq);
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
int64_t streamId;
char* sql;
char* executorMsg;
} SMVCreateStreamReq, SMSCreateStreamReq;
typedef struct {
int64_t streamId;
} SMVCreateStreamRsp, SMSCreateStreamRsp;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
int8_t igExists;
......
......@@ -80,6 +80,10 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
*/
int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
/**
* @brief Drop a snode.
*
......
......@@ -90,9 +90,11 @@ typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
int8_t uniqueWorkerInUse;
SSnode *pSnode;
SRWLatch latch;
SDnodeWorker writeWorker;
SArray *uniqueWorkers; // SArray<SDnodeWorker*>
SDnodeWorker sharedWorker;
} SSnodeMgmt;
typedef struct {
......@@ -153,4 +155,4 @@ int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo);
}
#endif
#endif /*_TD_DND_ENV_H_*/
\ No newline at end of file
#endif /*_TD_DND_ENV_H_*/
......@@ -70,4 +70,4 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
}
#endif
#endif /*_TD_DND_INT_H_*/
\ No newline at end of file
#endif /*_TD_DND_INT_H_*/
......@@ -19,7 +19,20 @@
#include "dndTransport.h"
#include "dndWorker.h"
static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg);
typedef struct {
int32_t vgId;
int32_t refCount;
int32_t snVersion;
int8_t dropped;
char *path;
SSnode *pImpl;
STaosQueue *pSharedQ;
STaosQueue *pUniqueQ;
} SSnodeObj;
static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg);
static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs);
static SSnode *dndAcquireSnode(SDnode *pDnode) {
SSnodeMgmt *pMgmt = &pDnode->smgmt;
......@@ -152,8 +165,18 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) {
static int32_t dndStartSnodeWorker(SDnode *pDnode) {
SSnodeMgmt *pMgmt = &pDnode->smgmt;
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) {
dError("failed to start snode write worker since %s", terrstr());
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
for (int32_t i = 0; i < 2; i++) {
SDnodeWorker uniqueWorker;
if (dndInitWorker(pDnode, &uniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) {
dError("failed to start snode unique worker since %s", terrstr());
return -1;
}
taosArrayPush(pMgmt->uniqueWorkers, &uniqueWorker);
}
if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4,
dndProcessSnodeSharedQueue)) {
dError("failed to start snode shared worker since %s", terrstr());
return -1;
}
......@@ -169,9 +192,13 @@ static void dndStopSnodeWorker(SDnode *pDnode) {
while (pMgmt->refCount > 0) {
taosMsleep(10);
}
}
dndCleanupWorker(&pMgmt->writeWorker);
for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i);
dndCleanupWorker(worker);
}
taosArrayDestroy(pMgmt->uniqueWorkers);
}
static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
......@@ -292,17 +319,36 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
}
static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
SSnodeMgmt *pMgmt = &pDnode->smgmt;
SRpcMsg *pRsp = NULL;
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) {
code = sndProcessMsg(pSnode, pMsg, &pRsp);
for (int32_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
sndProcessUMsg(pSnode, pMsg);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
dndReleaseSnode(pDnode, pSnode);
}
static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
SSnodeMgmt *pMgmt = &pDnode->smgmt;
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) {
code = sndProcessSMsg(pSnode, pMsg);
}
dndReleaseSnode(pDnode, pSnode);
#if 0
if (pMsg->msgType & 1u) {
if (pRsp != NULL) {
pRsp->ahandle = pMsg->ahandle;
......@@ -314,11 +360,32 @@ static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
rpcSendResponse(&rpcRsp);
}
}
#endif
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void dndWriteSnodeMsgToRandomWorker(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) {
int32_t index = (pDnode->smgmt.uniqueWorkerInUse + 1) % taosArrayGetSize(pDnode->smgmt.uniqueWorkers);
SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, index);
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
}
dndReleaseSnode(pDnode, pSnode);
if (code != 0) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
}
}
static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
......@@ -337,8 +404,13 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc
}
}
void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.writeWorker, pMsg);
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
// judge from msg to write to unique queue
dndWriteSnodeMsgToRandomWorker(pDnode, pMsg);
}
void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
}
int32_t dndInitSnode(SDnode *pDnode) {
......
......@@ -109,4 +109,4 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen)
}
return 0;
}
\ No newline at end of file
}
......@@ -31,3 +31,15 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
void sndDestroy(const char *path) {}
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deployment
// stream stop/resume
// operator exec
return 0;
}
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// operator exec
return 0;
}
......@@ -83,8 +83,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
}
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
/*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
// currently only rows are used
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
pBlockInfo->uid = pHandle->pBlock->uid;
......
......@@ -188,7 +188,7 @@ void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueu
int32_t tWWorkerInit(SWWorkerPool *pool) {
pool->nextId = 0;
pool->workers = calloc(sizeof(SWWorker), pool->max);
pool->workers = calloc(pool->max, sizeof(SWWorker));
if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册