未验证 提交 b5a82575 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #9970 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
...@@ -696,13 +696,13 @@ typedef struct SVgroupInfo { ...@@ -696,13 +696,13 @@ typedef struct SVgroupInfo {
uint32_t hashEnd; uint32_t hashEnd;
int8_t inUse; int8_t inUse;
int8_t numOfEps; int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA]; SEpAddr epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo; } SVgroupInfo;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t numOfEps; int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA]; SEpAddr epAddr[TSDB_MAX_REPLICA];
} SVgroupMsg; } SVgroupMsg;
typedef struct { typedef struct {
......
...@@ -178,7 +178,7 @@ struct SQueryNode; ...@@ -178,7 +178,7 @@ struct SQueryNode;
* @param requestId * @param requestId
* @return * @return
*/ */
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId); int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, uint64_t requestId);
// Set datasource of this subplan, multiple calls may be made to a subplan. // Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule // @subplan subplan to be schedule
......
...@@ -136,6 +136,7 @@ typedef struct SReqResultInfo { ...@@ -136,6 +136,7 @@ typedef struct SReqResultInfo {
TAOS_ROW row; TAOS_ROW row;
char **pCol; char **pCol;
uint32_t numOfRows; uint32_t numOfRows;
uint64_t totalRows;
uint32_t current; uint32_t current;
bool completed; bool completed;
} SReqResultInfo; } SReqResultInfo;
......
...@@ -25,9 +25,9 @@ ...@@ -25,9 +25,9 @@
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
return false; return false;
} }
...@@ -59,7 +59,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i ...@@ -59,7 +59,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
} }
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo); static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema); static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
if (taos_init() != TSDB_CODE_SUCCESS) { if (taos_init() != TSDB_CODE_SUCCESS) {
...@@ -202,43 +202,38 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { ...@@ -202,43 +202,38 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) { int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, SArray* pNodeList) {
pRequest->type = pQueryNode->type; pRequest->type = pQueryNode->type;
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SSchema* pSchema = NULL;
int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId); int32_t numOfCols = 0;
int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId);
if (code != 0) { if (code != 0) {
return code; return code;
} }
if (pQueryNode->type == TSDB_SQL_SELECT) { if (pQueryNode->type == TSDB_SQL_SELECT) {
SArray* pa = taosArrayGetP((*pDag)->pSubplans, 0); setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
SSubplan* pPlan = taosArrayGetP(pa, 0);
SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
setResSchemaInfo(pResInfo, pDataBlockSchema);
pRequest->type = TDMT_VND_QUERY; pRequest->type = TDMT_VND_QUERY;
} }
return code; return code;
} }
void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) { void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0); assert(pSchema != NULL && numOfCols > 0);
pResInfo->numOfCols = pDataBlockSchema->numOfCols; pResInfo->numOfCols = numOfCols;
pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pDataBlockSchema->pSchema[0])); pResInfo->fields = calloc(numOfCols, sizeof(pSchema[0]));
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
SSchema* pSchema = &pDataBlockSchema->pSchema[i]; pResInfo->fields[i].bytes = pSchema[i].bytes;
pResInfo->fields[i].bytes = pSchema->bytes; pResInfo->fields[i].type = pSchema[i].type;
pResInfo->fields[i].type = pSchema->type; tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
tstrncpy(pResInfo->fields[i].name, pSchema->name, tListLen(pResInfo->fields[i].name));
} }
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
...@@ -256,14 +251,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { ...@@ -256,14 +251,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
return pRequest->code; return pRequest->code;
} }
SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr)); return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 2};
addr.epAddr[0].port = 7100;
strcpy(addr.epAddr[0].fqdn, "localhost");
taosArrayPush(execNode, &addr);
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob);
} }
typedef struct tmq_t tmq_t; typedef struct tmq_t tmq_t;
...@@ -399,7 +387,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, ...@@ -399,7 +387,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
// todo check for invalid sql statement and return with error code // todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return); SSchema *schema = NULL;
int32_t numOfCols = 0;
CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, &schema, &numOfCols, NULL, pRequest->requestId), _return);
pStr = qDagToString(pRequest->body.pDag); pStr = qDagToString(pRequest->body.pDag);
if(pStr == NULL) { if(pStr == NULL) {
...@@ -492,6 +482,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -492,6 +482,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
SQueryNode *pQueryNode = NULL; SQueryNode *pQueryNode = NULL;
SArray *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
...@@ -500,8 +491,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -500,8 +491,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
if (qIsDdlQuery(pQueryNode)) { if (qIsDdlQuery(pQueryNode)) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return); CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
} else { } else {
CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag), _return); CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
pRequest->code = terrno; pRequest->code = terrno;
} }
...@@ -719,13 +710,17 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -719,13 +710,17 @@ void* doFetchRow(SRequestObj* pRequest) {
return NULL; return NULL;
} }
int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code; pRequest->code = code;
return NULL; return NULL;
} }
setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData); setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResInfo->numOfRows,
pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
if (pResultInfo->numOfRows == 0) { if (pResultInfo->numOfRows == 0) {
return NULL; return NULL;
} }
...@@ -855,7 +850,7 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { ...@@ -855,7 +850,7 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
pthread_mutex_unlock(&pTscObj->mutex); pthread_mutex_unlock(&pTscObj->mutex);
} }
void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
assert(pResultInfo != NULL && pRsp != NULL); assert(pResultInfo != NULL && pRsp != NULL);
pResultInfo->pRspMsg = (const char*) pRsp; pResultInfo->pRspMsg = (const char*) pRsp;
...@@ -864,5 +859,6 @@ void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* p ...@@ -864,5 +859,6 @@ void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* p
pResultInfo->current = 0; pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1); pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->totalRows += pResultInfo->numOfRows;
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
} }
...@@ -5408,7 +5408,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt ...@@ -5408,7 +5408,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
return pOperator; return pOperator;
} }
SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -5428,6 +5428,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp ...@@ -5428,6 +5428,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
// set the extract column id to streamHandle // set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList); tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList);
tqReadHandleSetTbUid(streamReadHandle, uid);
pInfo->readerHandle = streamReadHandle; pInfo->readerHandle = streamReadHandle;
...@@ -7719,7 +7720,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask ...@@ -7719,7 +7720,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
} else if (pPhyNode->info.type == OP_StreamScan) { } else if (pPhyNode->info.type == OP_StreamScan) {
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pTaskInfo); SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table.
return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pScanPhyNode->uid, pTaskInfo);
} }
} }
......
...@@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); ...@@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
*/ */
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId);
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource); void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource);
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
int32_t stringToSubplan(const char* str, SSubplan** subplan); int32_t stringToSubplan(const char* str, SSubplan** subplan);
......
...@@ -261,14 +261,14 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) { ...@@ -261,14 +261,14 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
return; return;
} }
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) {
SVgroupsInfo* pVgroupList = pTable->pMeta->vgroupList; SVgroupsInfo* pVgroupList = pTableInfo->pMeta->vgroupList;
for (int32_t i = 0; i < pVgroupList->numOfVgroups; ++i) { for (int32_t i = 0; i < pVgroupList->numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt); STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
subplan->msgType = TDMT_VND_QUERY; subplan->msgType = TDMT_VND_QUERY;
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode); vgroupMsgToEpSet(&(pTableInfo->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable); subplan->pNode = createMultiTableScanNode(pPlanNode, pTableInfo);
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode); subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode);
RECOVERY_CURRENT_SUBPLAN(pCxt); RECOVERY_CURRENT_SUBPLAN(pCxt);
} }
...@@ -384,18 +384,19 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { ...@@ -384,18 +384,19 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
subplan->msgType = TDMT_VND_QUERY; subplan->msgType = TDMT_VND_QUERY;
subplan->pNode = createPhyNode(pCxt, pRoot); subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode); subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode);
} }
// todo deal subquery // todo deal subquery
} }
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId) { int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) {
TRY(TSDB_MAX_TAG_CONDITIONS) { TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = { SPlanContext context = {
.pCatalog = pCatalog, .pCatalog = pCatalog,
.pDag = validPointer(calloc(1, sizeof(SQueryDag))), .pDag = validPointer(calloc(1, sizeof(SQueryDag))),
.pCurrentSubplan = NULL, .pCurrentSubplan = NULL,
.nextId = {.queryId = requestId}, //The unsigned Id starting from 1 would be better
.nextId = {.queryId = requestId, .subplanId = 1, .templateId = 1},
}; };
*pDag = context.pDag; *pDag = context.pDag;
...@@ -408,6 +409,17 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD ...@@ -408,6 +409,17 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
terrno = code; terrno = code;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} END_TRY } END_TRY
// traverse the dag again to acquire the execution node.
if (pNodeList != NULL) {
SArray** pSubLevel = taosArrayGetLast((*pDag)->pSubplans);
size_t num = taosArrayGetSize(*pSubLevel);
for (int32_t j = 0; j < num; ++j) {
SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
taosArrayPush(pNodeList, &pPlan->execNode);
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include "parser.h" #include "parser.h"
#include "plannerInt.h" #include "plannerInt.h"
static void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema, int32_t* numOfCols);
static void destroyDataSinkNode(SDataSink* pSinkNode) { static void destroyDataSinkNode(SDataSink* pSinkNode) {
if (pSinkNode == NULL) { if (pSinkNode == NULL) {
return; return;
...@@ -56,7 +58,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { ...@@ -56,7 +58,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
tfree(pDag); tfree(pDag);
} }
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) { int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, uint64_t requestId) {
SQueryPlanNode* pLogicPlan; SQueryPlanNode* pLogicPlan;
int32_t code = createQueryPlan(pNode, &pLogicPlan); int32_t code = createQueryPlan(pNode, &pLogicPlan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
...@@ -76,17 +78,31 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, ...@@ -76,17 +78,31 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
return code; return code;
} }
code = createDag(pLogicPlan, NULL, pDag, requestId); code = createDag(pLogicPlan, NULL, pDag, pNodeList, requestId);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(pLogicPlan); destroyQueryPlan(pLogicPlan);
qDestroyQueryDag(*pDag); qDestroyQueryDag(*pDag);
return code; return code;
} }
extractResSchema(pDag, pResSchema, numOfCols);
destroyQueryPlan(pLogicPlan); destroyQueryPlan(pLogicPlan);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema,
int32_t* numOfCols) { // extract the final result schema
SArray* pTopSubplan = taosArrayGetP((*pDag)->pSubplans, 0);
SSubplan* pPlan = taosArrayGetP(pTopSubplan, 0);
SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
*numOfCols = pDataBlockSchema->numOfCols;
*pResSchema = calloc(pDataBlockSchema->numOfCols, sizeof(SSchema));
memcpy((*pResSchema), pDataBlockSchema->pSchema, pDataBlockSchema->numOfCols * sizeof(SSchema));
}
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
setSubplanExecutionNode(subplan, templateId, pSource); setSubplanExecutionNode(subplan, templateId, pSource);
} }
......
...@@ -61,7 +61,7 @@ protected: ...@@ -61,7 +61,7 @@ protected:
int32_t run() { int32_t run() {
SQueryDag* dag = nullptr; SQueryDag* dag = nullptr;
uint64_t requestId = 20; uint64_t requestId = 20;
int32_t code = createDag(logicPlan_.get(), nullptr, &dag, requestId); int32_t code = createDag(logicPlan_.get(), nullptr, &dag, NULL, requestId);
dag_.reset(dag); dag_.reset(dag);
return code; return code;
} }
...@@ -78,9 +78,9 @@ protected: ...@@ -78,9 +78,9 @@ protected:
SQueryDag* dag = nullptr; SQueryDag* dag = nullptr;
uint64_t requestId = 20; uint64_t requestId = 20;
SSchema *schema = NULL; SSchema *schema = NULL;
uint32_t numOfOutput = 0; int32_t numOfOutput = 0;
code = qCreateQueryDag(query, &dag, requestId); code = qCreateQueryDag(query, &dag, &schema, &numOfOutput, nullptr, requestId);
dag_.reset(dag); dag_.reset(dag);
return code; return code;
} }
......
...@@ -171,17 +171,17 @@ typedef struct SQWorkerMgmt { ...@@ -171,17 +171,17 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) #define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) #define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) #define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) #define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
......
...@@ -146,15 +146,15 @@ typedef struct SSchJob { ...@@ -146,15 +146,15 @@ typedef struct SSchJob {
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY) #define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob) #define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_JOB_ELOG(param, ...) qError("QID:%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \ #define SCH_TASK_ELOG(param, ...) \
qError("QID:%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__) qError("QID:0x%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \ #define SCH_TASK_DLOG(param, ...) \
qDebug("QID:%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__) qDebug("QID:0x%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \ #define SCH_TASK_WLOG(param, ...) \
qWarn("QID:%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__) qWarn("QID:0x%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
......
...@@ -650,15 +650,11 @@ _return: ...@@ -650,15 +650,11 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schProcessOnDataFetched(SSchJob *job) { int32_t schProcessOnDataFetched(SSchJob *job) {
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
tsem_post(&job->rspSem); tsem_post(&job->rspSem);
} }
// Note: no more error processing, handled in function internal // Note: no more error processing, handled in function internal
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
bool needRetry = false; bool needRetry = false;
...@@ -882,7 +878,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -882,7 +878,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
} }
SCH_ERR_JRET(schProcessOnDataFetched(pJob)); SCH_ERR_JRET(schProcessOnDataFetched(pJob));
break; break;
} }
case TDMT_VND_DROP_TASK: { case TDMT_VND_DROP_TASK: {
...@@ -892,7 +887,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -892,7 +887,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
} }
default: default:
SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask));
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -935,8 +929,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in ...@@ -935,8 +929,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
} }
pTask = *task; pTask = *task;
SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode));
SCH_TASK_DLOG("rsp msg received, type:%d, code:%x", msgType, rspCode);
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
...@@ -1562,8 +1555,8 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { ...@@ -1562,8 +1555,8 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
if (NULL == pJob || NULL == pData) { if (NULL == pJob || NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
int32_t code = 0;
int32_t code = 0;
atomic_add_fetch_32(&pJob->ref, 1); atomic_add_fetch_32(&pJob->ref, 1);
int8_t status = SCH_GET_JOB_STATUS(pJob); int8_t status = SCH_GET_JOB_STATUS(pJob);
...@@ -1609,7 +1602,6 @@ _return: ...@@ -1609,7 +1602,6 @@ _return:
while (true) { while (true) {
*pData = atomic_load_ptr(&pJob->res); *pData = atomic_load_ptr(&pJob->res);
if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) { if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) {
continue; continue;
} }
...@@ -1628,8 +1620,7 @@ _return: ...@@ -1628,8 +1620,7 @@ _return:
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
SCH_JOB_DLOG("fetch done, code:%x", code); SCH_JOB_DLOG("fetch done, code:%s", tstrerror(code));
atomic_sub_fetch_32(&pJob->ref, 1); atomic_sub_fetch_32(&pJob->ref, 1);
SCH_RET(code); SCH_RET(code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册