diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a5c2c89b2450fc4bd98e51c7fe4165bbbeb94375..5a60761f112f2230de11a0f7e3d802de84163217 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1118,6 +1118,17 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq); void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + int64_t streamId; + char* sql; + char* executorMsg; +} SMVCreateStreamReq, SMSCreateStreamReq; + +typedef struct { + int64_t streamId; +} SMVCreateStreamRsp, SMSCreateStreamRsp; + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int8_t igExists; diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index c9fab140cce647f22827648404b93d507a70e4c1..21a93532e0d77b3c8f5e6157380e7804b393d622 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -80,6 +80,10 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); */ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); + +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg); + /** * @brief Drop a snode. * diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 13ef101908906d309a88d2349382fdedb765cf30..aeea5386b4ea39d42cac8d599e53d595f71c5660 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -90,9 +90,11 @@ typedef struct { int32_t refCount; int8_t deployed; int8_t dropped; + int8_t uniqueWorkerInUse; SSnode *pSnode; SRWLatch latch; - SDnodeWorker writeWorker; + SArray *uniqueWorkers; // SArray + SDnodeWorker sharedWorker; } SSnodeMgmt; typedef struct { @@ -153,4 +155,4 @@ int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo); } #endif -#endif /*_TD_DND_ENV_H_*/ \ No newline at end of file +#endif /*_TD_DND_ENV_H_*/ diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 4ca6b97ad44d3e1fb21fe76cfdfa198def67552c..a8530037daabecd6c2fad43f8ad7f680315bd6e4 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -70,4 +70,4 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); } #endif -#endif /*_TD_DND_INT_H_*/ \ No newline at end of file +#endif /*_TD_DND_INT_H_*/ diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index 4906aef246ec08d8b4c6c2ee7511aaed7fdf000f..d192d9df015a80f73995bf9a54eef5a9caa02c75 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -19,7 +19,20 @@ #include "dndTransport.h" #include "dndWorker.h" -static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); +typedef struct { + int32_t vgId; + int32_t refCount; + int32_t snVersion; + int8_t dropped; + char *path; + SSnode *pImpl; + STaosQueue *pSharedQ; + STaosQueue *pUniqueQ; +} SSnodeObj; + +static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg); + +static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); static SSnode *dndAcquireSnode(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; @@ -152,8 +165,18 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) { static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) { - dError("failed to start snode write worker since %s", terrstr()); + pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); + for (int32_t i = 0; i < 2; i++) { + SDnodeWorker uniqueWorker; + if (dndInitWorker(pDnode, &uniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) { + dError("failed to start snode unique worker since %s", terrstr()); + return -1; + } + taosArrayPush(pMgmt->uniqueWorkers, &uniqueWorker); + } + if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4, + dndProcessSnodeSharedQueue)) { + dError("failed to start snode shared worker since %s", terrstr()); return -1; } @@ -169,9 +192,13 @@ static void dndStopSnodeWorker(SDnode *pDnode) { while (pMgmt->refCount > 0) { taosMsleep(10); - } + } - dndCleanupWorker(&pMgmt->writeWorker); + for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { + SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i); + dndCleanupWorker(worker); + } + taosArrayDestroy(pMgmt->uniqueWorkers); } static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { @@ -292,17 +319,36 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } } -static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { +static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - SRpcMsg *pRsp = NULL; int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; SSnode *pSnode = dndAcquireSnode(pDnode); if (pSnode != NULL) { - code = sndProcessMsg(pSnode, pMsg, &pRsp); + for (int32_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + + sndProcessUMsg(pSnode, pMsg); + + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } } dndReleaseSnode(pDnode, pSnode); +} +static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + code = sndProcessSMsg(pSnode, pMsg); + } + dndReleaseSnode(pDnode, pSnode); + +#if 0 if (pMsg->msgType & 1u) { if (pRsp != NULL) { pRsp->ahandle = pMsg->ahandle; @@ -314,11 +360,32 @@ static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { rpcSendResponse(&rpcRsp); } } +#endif rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } +static void dndWriteSnodeMsgToRandomWorker(SDnode *pDnode, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + int32_t index = (pDnode->smgmt.uniqueWorkerInUse + 1) % taosArrayGetSize(pDnode->smgmt.uniqueWorkers); + SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, index); + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); + } + dndReleaseSnode(pDnode, pSnode); + + if (code != 0) { + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->pCont); + } +} + static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; @@ -337,8 +404,13 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc } } -void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.writeWorker, pMsg); +void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + // judge from msg to write to unique queue + dndWriteSnodeMsgToRandomWorker(pDnode, pMsg); +} + +void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); } int32_t dndInitSnode(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index 5ccf6640c04789911932a1aabd2717f7efd3762a..38f8737b2bbbc0796a98688498eeab7d44cdef76 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -109,4 +109,4 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) } return 0; -} \ No newline at end of file +} diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 01500fbc54a19a01559005c313fbcdb40970ebcb..91008dd03ac8847b8a29d84b5324ece3cbb9c5c1 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -31,3 +31,15 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } void sndDestroy(const char *path) {} + +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { + // stream deployment + // stream stop/resume + // operator exec + return 0; +} + +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { + // operator exec + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 92a111298fa093699dd75cbadec5846d11fefa44..a2342ec85aa066fab1bcbd2f7e93953151711508 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -83,8 +83,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { } int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - /*int32_t sversion = pHandle->pBlock->sversion;*/ - /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/ + // currently only rows are used + pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->uid = pHandle->pBlock->uid; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 1657a85ee8c85ffc70e088ba307cdb37d4873417..1fa70da870ffd9fe33fb24cfff9b6f3f48115154 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -188,7 +188,7 @@ void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueu int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; - pool->workers = calloc(sizeof(SWWorker), pool->max); + pool->workers = calloc(pool->max, sizeof(SWWorker)); if (pool->workers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1;