未验证 提交 61ed86e3 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9956 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
......@@ -29,21 +29,23 @@ struct SSubplan;
/**
* Create the exec task for streaming mode
* @param pMsg
* @param pStreamBlockReadHandle
* @param streamReadHandle
* @return
*/
qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle);
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle);
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
/**
* Create the exec task object according to task json
* @param tsdb
* @param readHandle
* @param vgId
* @param pTaskInfoMsg
* @param pTaskInfo
* @param qId
* @return
*/
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
int32_t qCreateExecTask(void* readHandle, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
/**
* The main task execution function, including query on both table and multiple tables,
......@@ -60,63 +62,63 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds);
* this function will be blocked to wait for the query execution completed or paused,
* in which case enough results have been produced already.
*
* @param qinfo
* @param tinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext);
int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext);
/**
*
* Retrieve the actual results to fill the response message payload.
* Note that this function must be executed after qRetrieveQueryResultInfo is invoked.
*
* @param qinfo qinfo object
* @param tinfo tinfo object
* @param pRsp response message
* @param contLen payload length
* @return
*/
//int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
//int32_t qDumpRetrieveResult(qTaskInfo_t tinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/**
* return the transporter context (RPC)
* @param qinfo
* @param tinfo
* @return
*/
void* qGetResultRetrieveMsg(qTaskInfo_t qinfo);
void* qGetResultRetrieveMsg(qTaskInfo_t tinfo);
/**
* kill the ongoing query and free the query handle and corresponding resources automatically
* @param qinfo qhandle
* @param tinfo qhandle
* @return
*/
int32_t qKillTask(qTaskInfo_t qinfo);
int32_t qKillTask(qTaskInfo_t tinfo);
/**
* kill the ongoing query asynchronously
* @param qinfo qhandle
* @param tinfo qhandle
* @return
*/
int32_t qAsyncKillTask(qTaskInfo_t qinfo);
int32_t qAsyncKillTask(qTaskInfo_t tinfo);
/**
* return whether query is completed or not
* @param qinfo
* @param tinfo
* @return
*/
int32_t qIsTaskCompleted(qTaskInfo_t qinfo);
int32_t qIsTaskCompleted(qTaskInfo_t tinfo);
/**
* destroy query info structure
* @param qHandle
*/
void qDestroyTask(qTaskInfo_t qHandle);
void qDestroyTask(qTaskInfo_t tinfo);
/**
* Get the queried table uid
* @param qHandle
* @return
*/
int64_t qGetQueriedTableUid(qTaskInfo_t qHandle);
int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
/**
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
......@@ -143,7 +145,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
* @param type operation type: ADD|DROP
* @return
*/
int32_t qUpdateQueriedTableIdList(qTaskInfo_t qinfo, int64_t uid, int32_t type);
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
//================================================================================================
// query handle management
......
......@@ -196,6 +196,10 @@ static void doDestroyRequest(void* p) {
doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryDag(pRequest->body.pDag);
if (pRequest->body.showInfo.pArray != NULL) {
taosArrayDestroy(pRequest->body.showInfo.pArray);
}
deregisterRequest(pRequest);
tfree(pRequest);
}
......
......@@ -145,19 +145,23 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
pSchema = pMetaMsg->pSchema;
TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
pFields[i].type = pSchema[i].type;
pFields[i].bytes = pSchema[i].bytes;
}
tfree(pRequest->body.resInfo.pRspMsg);
pRequest->body.resInfo.pRspMsg = pMsg->pData;
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
pResInfo->fields = pFields;
pResInfo->numOfCols = pMetaMsg->numOfColumns;
if (pResInfo->fields == NULL) {
TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
pFields[i].type = pSchema[i].type;
pFields[i].bytes = pSchema[i].bytes;
}
pResInfo->fields = pFields;
}
pResInfo->numOfCols = pMetaMsg->numOfColumns;
pRequest->body.showInfo.execId = pShow->showId;
// todo
......
......@@ -451,39 +451,39 @@ TEST(testCase, driverInit_Test) {
//
// taos_close(pConn);
//}
//
//TEST(testCase, show_table_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "show tables");
// if (taos_errno(pRes) != 0) {
// printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// }
//
// pRes = taos_query(pConn, "show abc1.tables");
// if (taos_errno(pRes) != 0) {
// printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// int32_t count = 0;
// char str[512] = {0};
//
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%d: %s\n", ++count, str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
TEST(testCase, show_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show tables");
if (taos_errno(pRes) != 0) {
printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
}
pRes = taos_query(pConn, "show abc1.tables");
if (taos_errno(pRes) != 0) {
printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
int32_t count = 0;
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%d: %s\n", ++count, str);
}
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, drop_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
......@@ -525,29 +525,29 @@ TEST(testCase, driverInit_Test) {
// taosHashCleanup(phash);
//}
//
TEST(testCase, create_topic_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, create_topic_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == nullptr);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
//
// char* sql = "select * from tu";
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_free_result(pRes);
// taos_close(pConn);
//}
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......
......@@ -127,7 +127,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
......
......@@ -82,27 +82,12 @@ typedef struct STqSubscribeReq {
int64_t topic[];
} STqSubscribeReq;
typedef struct STqSubscribeRsp {
STqMsgHead head;
int64_t vgId;
char ep[TSDB_EP_LEN]; // TSDB_EP_LEN
} STqSubscribeRsp;
typedef struct STqHeartbeatReq {
} STqHeartbeatReq;
typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp;
typedef struct STqTopicVhandle {
int64_t topicId;
// executor for filter
void* filterExec;
// callback for mnode
// trigger when vnode list associated topic change
void* (*mCallback)(void*, void*);
} STqTopicVhandle;
#define TQ_BUFFER_SIZE 8
typedef struct STqExec {
......
......@@ -633,12 +633,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
// read until find TDMT_VND_SUBMIT
}
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
void* task = pHandle->buffer.output[pos].task;
/*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/
qSetStreamInput(task, pCont);
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
}
// TODO: launch query and get output data
void* outputData;
pHandle->buffer.output[pos].dst = outputData;
pHandle->buffer.output[pos].dst = pDataBlock;
if (pHandle->buffer.firstOffset == -1
|| pReq->offset < pHandle->buffer.firstOffset) {
pHandle->buffer.firstOffset = pReq->offset;
......@@ -674,22 +678,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
strcpy(pTopic->sql, pReq->sql);
strcpy(pTopic->logicalPlan, pReq->logicalPlan);
strcpy(pTopic->physicalPlan, pReq->physicalPlan);
SArray *pArray;
//TODO: deserialize to SQueryDag
SQueryDag *pDag;
// convert to task
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
}
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pTopic->buffer.output[i].status = 0;
pTopic->buffer.output[i].task = createStreamExecTaskInfo(pMsg, NULL);
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL);
}
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
// write mq meta
......
......@@ -84,7 +84,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
// TODO: handle error
}
vTrace("vgId:%d process create table %s", pVnode->vgId, pCreateTbReq->name);
vInfo("vgId:%d process create table %s", pVnode->vgId, pCreateTbReq->name);
free(pCreateTbReq->name);
if (pCreateTbReq->type == TD_SUPER_TABLE) {
free(pCreateTbReq->stbCfg.pSchema);
......
......@@ -13,11 +13,55 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planner.h"
#include "executor.h"
#include "tq.h"
#include "executorimpl.h"
#include "planner.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, uint64_t reqId) {
ASSERT(pOperator != NULL);
if (pOperator->operatorType != OP_StreamScan) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, reqId:0x%" PRIx64, reqId);
return TSDB_CODE_QRY_APP_ERROR;
}
if (pOperator->numOfDownstream > 1) { // not handle this in join query
qError("join not supported for stream block scan, reqId:0x%" PRIx64, reqId);
return TSDB_CODE_QRY_APP_ERROR;
}
return doSetStreamBlock(pOperator->pDownstream[0], input, reqId);
} else {
SStreamBlockScanInfo* pInfo = pOperator->info;
tqReadHandleSetMsg(pInfo->readerHandle, input, 0);
return TSDB_CODE_SUCCESS;
}
}
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR;
}
if (input == NULL) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
} else {
qDebug("set the stream block successfully, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
}
return code;
}
qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) {
if (pMsg == NULL || streamReadHandle == NULL) {
return NULL;
}
......@@ -27,15 +71,15 @@ qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadH
pMsg->taskId = be64toh(pMsg->taskId);
pMsg->contentLen = ntohl(pMsg->contentLen);
struct SSubplan *plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan);
struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
code = qCreateExecTask(streamReadHandle, 0, plan, &pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
......
......@@ -5407,7 +5407,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
return pOperator;
}
SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5417,10 +5417,21 @@ SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32
return NULL;
}
pInfo->readerHandle = pStreamBlockHandle;
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo);
SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t));
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId);
}
// TODO set the extract column id to streamHandle
// pColList
pInfo->readerHandle = streamReadHandle;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = OP_StreamBlockScan;
pOperator->operatorType = OP_StreamScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
......@@ -7704,6 +7715,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
} else if (pPhyNode->info.type == OP_Exchange) {
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
} else if (pPhyNode->info.type == OP_StreamScan) {
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pTaskInfo);
}
}
......
......@@ -62,9 +62,8 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
pEpSet->port[i] = info->epAddr[i].port;
}
*outputLen = sizeof(SVShowTablesReq);
*output = pShowReq;
*outputLen = sizeof(SVShowTablesReq);
*output = pShowReq;
*pExtension = array;
} else {
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
......
......@@ -36,25 +36,29 @@ bool qIsDdlQuery(const SQueryNode* pQueryNode) {
}
int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
int32_t code = TSDB_CODE_SUCCESS;
SSqlInfo info = doGenerateAST(pCxt->pSql);
if (!info.valid) {
strncpy(pCxt->pMsg, info.msg, pCxt->msgLen);
terrno = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return terrno;
code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
goto _end;
}
if (!isDqlSqlStatement(&info)) {
if (info.type == TSDB_SQL_CREATE_TABLE) {
SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, pCxt, pCxt->pMsg, pCxt->msgLen);
if (pModifStmtInfo == NULL) {
return terrno;
code = terrno;
goto _end;
}
*pQuery = (SQueryNode*)pModifStmtInfo;
} else {
SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, pCxt, pCxt->pMsg, pCxt->msgLen);
if (pDcl == NULL) {
return terrno;
code = terrno;
goto _end;
}
*pQuery = (SQueryNode*)pDcl;
......@@ -63,21 +67,22 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
} else {
SQueryStmtInfo* pQueryInfo = createQueryInfo();
if (pQueryInfo == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code.
return terrno;
code = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code.
goto _end;
}
int32_t code = qParserValidateSqlNode(pCxt, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen);
code = qParserValidateSqlNode(pCxt, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) {
*pQuery = (SQueryNode*)pQueryInfo;
} else {
terrno = code;
return code;
goto _end;
}
}
_end:
destroySqlInfo(&info);
return TSDB_CODE_SUCCESS;
terrno = code;
return code;
}
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQueryNode) {
......@@ -247,5 +252,6 @@ void qDestroyQuery(SQueryNode* pQueryNode) {
SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
taosArrayDestroy(pModifInfo->pDataBlocks);
}
tfree(pQueryNode);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册