提交 e1d9fa73 编写于 作者: H Haojun Liao

[td-11818] Support create topic.

上级 93805a6e
......@@ -29,23 +29,23 @@ struct SSubplan;
/**
* Create the exec task for streaming mode
* @param pMsg
* @param pStreamBlockReadHandle
* @param streamReadHandle
* @return
*/
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle);
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle);
void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input);
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
/**
* Create the exec task object according to task json
* @param tsdb
* @param readHandle
* @param vgId
* @param pTaskInfoMsg
* @param pTaskInfo
* @param qId
* @return
*/
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
int32_t qCreateExecTask(void* readHandle, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
/**
* The main task execution function, including query on both table and multiple tables,
......@@ -62,63 +62,63 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds);
* this function will be blocked to wait for the query execution completed or paused,
* in which case enough results have been produced already.
*
* @param qinfo
* @param tinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext);
int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext);
/**
*
* Retrieve the actual results to fill the response message payload.
* Note that this function must be executed after qRetrieveQueryResultInfo is invoked.
*
* @param qinfo qinfo object
* @param tinfo tinfo object
* @param pRsp response message
* @param contLen payload length
* @return
*/
//int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
//int32_t qDumpRetrieveResult(qTaskInfo_t tinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/**
* return the transporter context (RPC)
* @param qinfo
* @param tinfo
* @return
*/
void* qGetResultRetrieveMsg(qTaskInfo_t qinfo);
void* qGetResultRetrieveMsg(qTaskInfo_t tinfo);
/**
* kill the ongoing query and free the query handle and corresponding resources automatically
* @param qinfo qhandle
* @param tinfo qhandle
* @return
*/
int32_t qKillTask(qTaskInfo_t qinfo);
int32_t qKillTask(qTaskInfo_t tinfo);
/**
* kill the ongoing query asynchronously
* @param qinfo qhandle
* @param tinfo qhandle
* @return
*/
int32_t qAsyncKillTask(qTaskInfo_t qinfo);
int32_t qAsyncKillTask(qTaskInfo_t tinfo);
/**
* return whether query is completed or not
* @param qinfo
* @param tinfo
* @return
*/
int32_t qIsTaskCompleted(qTaskInfo_t qinfo);
int32_t qIsTaskCompleted(qTaskInfo_t tinfo);
/**
* destroy query info structure
* @param qHandle
*/
void qDestroyTask(qTaskInfo_t qHandle);
void qDestroyTask(qTaskInfo_t tinfo);
/**
* Get the queried table uid
* @param qHandle
* @return
*/
int64_t qGetQueriedTableUid(qTaskInfo_t qHandle);
int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
/**
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
......@@ -145,7 +145,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
* @param type operation type: ADD|DROP
* @return
*/
int32_t qUpdateQueriedTableIdList(qTaskInfo_t qinfo, int64_t uid, int32_t type);
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
//================================================================================================
// query handle management
......
......@@ -14,12 +14,50 @@
*/
#include "executor.h"
#include "tq.h"
#include "executorimpl.h"
#include "planner.h"
void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input) {}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input) {
ASSERT(pOperator != NULL);
if (pOperator->operatorType != OP_StreamScan) {
if (pOperator->numOfDownstream > 0) {
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockReadHandle) {
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
if (pOperator->numOfDownstream > 1) { // not handle this in join query
return TSDB_CODE_QRY_APP_ERROR;
}
return doSetStreamBlock(pOperator->pDownstream[0], input);
}
} else {
SStreamBlockScanInfo* pInfo = pOperator->info;
tqReadHandleSetMsg(pInfo->readerHandle, input, 0);
return TSDB_CODE_SUCCESS;
}
}
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR;
}
if (input == NULL) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
} else {
qDebug("set the stream block successfully, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
}
return code;
}
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) {
if (pMsg == NULL || streamReadHandle == NULL) {
return NULL;
}
......@@ -37,7 +75,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockRead
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
code = qCreateExecTask(streamReadHandle, 0, plan, &pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
......
......@@ -5407,7 +5407,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
return pOperator;
}
SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5417,10 +5417,21 @@ SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32
return NULL;
}
pInfo->readerHandle = pStreamBlockHandle;
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo);
SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t));
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId);
}
// TODO set the extract column id to streamHandle
// pColList
pInfo->readerHandle = streamReadHandle;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = OP_StreamBlockScan;
pOperator->operatorType = OP_StreamScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -7704,6 +7715,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
} else if (pPhyNode->info.type == OP_Exchange) {
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
} else if (pPhyNode->info.type == OP_StreamScan) {
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pTaskInfo);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册