提交 93805a6e 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/feature/tq' into feature/3.0_liaohj

...@@ -26,6 +26,15 @@ typedef void* qTaskInfo_t; ...@@ -26,6 +26,15 @@ typedef void* qTaskInfo_t;
typedef void* DataSinkHandle; typedef void* DataSinkHandle;
struct SSubplan; struct SSubplan;
/**
* Create the exec task for streaming mode
* @param pMsg
* @param pStreamBlockReadHandle
* @return
*/
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle);
void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input);
/** /**
* Create the exec task object according to task json * Create the exec task object according to task json
...@@ -203,4 +212,4 @@ void** qDeregisterQInfo(void* pMgmt, void* pQInfo); ...@@ -203,4 +212,4 @@ void** qDeregisterQInfo(void* pMgmt, void* pQInfo);
} }
#endif #endif
#endif /*_TD_EXECUTOR_H_*/ #endif /*_TD_EXECUTOR_H_*/
\ No newline at end of file
...@@ -328,7 +328,7 @@ typedef struct SMqTopicConsumer { ...@@ -328,7 +328,7 @@ typedef struct SMqTopicConsumer {
typedef struct SMqConsumerEp { typedef struct SMqConsumerEp {
int32_t vgId; // -1 for unassigned int32_t vgId; // -1 for unassigned
SEpSet epset; SEpSet epSet;
int64_t consumerId; // -1 for unassigned int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs; int64_t lastConsumerHbTs;
int64_t lastVgHbTs; int64_t lastVgHbTs;
...@@ -339,7 +339,7 @@ typedef struct SMqConsumerEp { ...@@ -339,7 +339,7 @@ typedef struct SMqConsumerEp {
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
return tlen; return tlen;
...@@ -347,7 +347,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon ...@@ -347,7 +347,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epset); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen; pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen;
......
...@@ -117,7 +117,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -117,7 +117,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// persist msg // persist msg
STransAction action = {0}; STransAction action = {0};
action.epSet = pCEp->epset; action.epSet = pCEp->epSet;
action.pCont = reqStr; action.pCont = reqStr;
action.contLen = tlen; action.contLen = tlen;
action.msgType = TDMT_VND_MQ_SET_CONN; action.msgType = TDMT_VND_MQ_SET_CONN;
...@@ -142,36 +142,25 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -142,36 +142,25 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
} }
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
SMqConsumerEp CEp;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
//convert phyplan to dag //convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray; SArray *pArray;
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
return -1;
} }
int32_t sz = taosArrayGetSize(pArray); int32_t sz = taosArrayGetSize(pArray);
//convert dag to msg //convert dag to msg
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp CEp;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo* pTaskInfo = taosArrayGet(pArray, i); STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
int32_t vgId = pTaskInfo->addr.nodeId; tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
SEpSet epSet; CEp.vgId = pTaskInfo->addr.nodeId;
tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr); taosArrayPush(unassignedVg, &CEp);
} }
/*pTopic->physicalPlan;*/
SVgObj *pVgroup = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup);
while (pIter != NULL) {
if (pVgroup->dbUid == pTopic->dbUid) {
CEp.epset = mndGetVgroupEpset(pMnode, pVgroup);
CEp.vgId = pVgroup->vgId;
taosArrayPush(unassignedVg, &CEp);
}
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
}
return 0;
qDestroyQueryDag(pDag); qDestroyQueryDag(pDag);
return 0;
} }
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
......
...@@ -26,6 +26,7 @@ target_link_libraries( ...@@ -26,6 +26,7 @@ target_link_libraries(
PUBLIC tfs PUBLIC tfs
PUBLIC wal PUBLIC wal
PUBLIC scheduler PUBLIC scheduler
PUBLIC executor
PUBLIC qworker PUBLIC qworker
) )
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "meta.h" #include "meta.h"
#include "os.h" #include "os.h"
#include "scheduler.h" #include "scheduler.h"
#include "executor.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlist.h" #include "tlist.h"
#include "tmsg.h" #include "tmsg.h"
...@@ -81,27 +82,12 @@ typedef struct STqSubscribeReq { ...@@ -81,27 +82,12 @@ typedef struct STqSubscribeReq {
int64_t topic[]; int64_t topic[];
} STqSubscribeReq; } STqSubscribeReq;
typedef struct STqSubscribeRsp {
STqMsgHead head;
int64_t vgId;
char ep[TSDB_EP_LEN]; // TSDB_EP_LEN
} STqSubscribeRsp;
typedef struct STqHeartbeatReq { typedef struct STqHeartbeatReq {
} STqHeartbeatReq; } STqHeartbeatReq;
typedef struct STqHeartbeatRsp { typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp; } STqHeartbeatRsp;
typedef struct STqTopicVhandle {
int64_t topicId;
// executor for filter
void* filterExec;
// callback for mnode
// trigger when vnode list associated topic change
void* (*mCallback)(void*, void*);
} STqTopicVhandle;
#define TQ_BUFFER_SIZE 8 #define TQ_BUFFER_SIZE 8
typedef struct STqExec { typedef struct STqExec {
...@@ -165,7 +151,7 @@ typedef struct STqTaskItem { ...@@ -165,7 +151,7 @@ typedef struct STqTaskItem {
int8_t status; int8_t status;
int64_t offset; int64_t offset;
void* dst; void* dst;
SSubQueryMsg* pMsg; qTaskInfo_t task;
} STqTaskItem; } STqTaskItem;
// new version // new version
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../../../../../include/libs/executor/executor.h"
#include "tqInt.h" #include "tqInt.h"
#include "tqMetaStore.h" #include "tqMetaStore.h"
...@@ -634,12 +633,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { ...@@ -634,12 +633,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
// read until find TDMT_VND_SUBMIT // read until find TDMT_VND_SUBMIT
} }
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
void* task = pHandle->buffer.output[pos].task;
SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; qStreamExecTaskSetInput(task, pCont);
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
}
// TODO: launch query and get output data // TODO: launch query and get output data
void* outputData; pHandle->buffer.output[pos].dst = pDataBlock;
pHandle->buffer.output[pos].dst = outputData;
if (pHandle->buffer.firstOffset == -1 if (pHandle->buffer.firstOffset == -1
|| pReq->offset < pHandle->buffer.firstOffset) { || pReq->offset < pHandle->buffer.firstOffset) {
pHandle->buffer.firstOffset = pReq->offset; pHandle->buffer.firstOffset = pReq->offset;
...@@ -675,22 +678,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { ...@@ -675,22 +678,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
strcpy(pTopic->sql, pReq->sql); strcpy(pTopic->sql, pReq->sql);
strcpy(pTopic->logicalPlan, pReq->logicalPlan); strcpy(pTopic->logicalPlan, pReq->logicalPlan);
strcpy(pTopic->physicalPlan, pReq->physicalPlan); strcpy(pTopic->physicalPlan, pReq->physicalPlan);
SArray *pArray;
//TODO: deserialize to SQueryDag
SQueryDag *pDag;
// convert to task
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
}
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pTopic->buffer.firstOffset = -1; pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pTopic->buffer.output[i].pMsg = pMsg;
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL);
} }
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
// write mq meta // write mq meta
...@@ -763,32 +756,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -763,32 +756,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
taosArrayPush(pArray, &colInfo); taosArrayPush(pArray, &colInfo);
return pArray; return pArray;
} }
static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
return NULL;
}
// print those info into log
pMsg->sId = be64toh(pMsg->sId);
pMsg->queryId = be64toh(pMsg->queryId);
pMsg->taskId = be64toh(pMsg->taskId);
pMsg->contentLen = ntohl(pMsg->contentLen);
struct SSubplan *plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
return NULL;
}
return pTaskInfo;
}
\ No newline at end of file
...@@ -11,4 +11,38 @@ ...@@ -11,4 +11,38 @@
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
\ No newline at end of file
#include "executor.h"
#include "planner.h"
void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input) {}
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockReadHandle) {
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
return NULL;
}
// print those info into log
pMsg->sId = be64toh(pMsg->sId);
pMsg->queryId = be64toh(pMsg->queryId);
pMsg->taskId = be64toh(pMsg->taskId);
pMsg->contentLen = ntohl(pMsg->contentLen);
struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
return NULL;
}
return pTaskInfo;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册