提交 dde03891 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/feature/3.0_liaohj' into feature/qnode

...@@ -150,7 +150,7 @@ struct SQueryNode; ...@@ -150,7 +150,7 @@ struct SQueryNode;
* @param requestId * @param requestId
* @return * @return
*/ */
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, SSchema** pSchema, uint32_t* numOfResCols, uint64_t requestId); int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId);
// Set datasource of this subplan, multiple calls may be made to a subplan. // Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule // @subplan subplan to be schedule
......
...@@ -20,8 +20,10 @@ ...@@ -20,8 +20,10 @@
extern "C" { extern "C" {
#endif #endif
#include "planner.h"
#include "catalog.h" #include "catalog.h"
#include "planner.h"
struct SSchJob;
typedef struct SSchedulerCfg { typedef struct SSchedulerCfg {
uint32_t maxJobNum; uint32_t maxJobNum;
...@@ -65,7 +67,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); ...@@ -65,7 +67,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return * @return
*/ */
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes); int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
/** /**
* Process the query job, generated according to the query physical plan. * Process the query job, generated according to the query physical plan.
...@@ -73,7 +75,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void ...@@ -73,7 +75,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return * @return
*/ */
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob); int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob);
/** /**
* Fetch query result from the remote query executor * Fetch query result from the remote query executor
...@@ -81,7 +83,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, ...@@ -81,7 +83,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
* @param data * @param data
* @return * @return
*/ */
int32_t scheduleFetchRows(void *pJob, void **data); int32_t scheduleFetchRows(struct SSchJob *pJob, void **data);
/** /**
......
...@@ -113,6 +113,7 @@ typedef struct SRequestSendRecvBody { ...@@ -113,6 +113,7 @@ typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now tsem_t rspSem; // not used now
void* fp; void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
struct SSchJob *pQueryJob; // query job, created according to sql query DAG.
SDataBuf requestMsg; SDataBuf requestMsg;
SReqResultInfo resInfo; SReqResultInfo resInfo;
} SRequestSendRecvBody; } SRequestSendRecvBody;
...@@ -129,7 +130,7 @@ typedef struct SRequestObj { ...@@ -129,7 +130,7 @@ typedef struct SRequestObj {
char *msgBuf; char *msgBuf;
void *pInfo; // sql parse info, generated by parser module void *pInfo; // sql parse info, generated by parser module
int32_t code; int32_t code;
uint64_t affectedRows; uint64_t affectedRows; // todo remove it
SQueryExecMetric metric; SQueryExecMetric metric;
SRequestSendRecvBody body; SRequestSendRecvBody body;
} SRequestObj; } SRequestObj;
...@@ -161,12 +162,13 @@ int taos_options_imp(TSDB_OPTION option, const char *str); ...@@ -161,12 +162,13 @@ int taos_options_imp(TSDB_OPTION option, const char *str);
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads); void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp(); void initMsgHandleFp();
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
void *doFetchRow(SRequestObj* pRequest); void *doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
#ifdef __cplusplus #ifdef __cplusplus
......
#include "../../libs/scheduler/inc/schedulerInt.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#include "tdef.h" #include "tdef.h"
#include "tep.h" #include "tep.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -8,9 +12,6 @@ ...@@ -8,9 +12,6 @@
#include "tnote.h" #include "tnote.h"
#include "tpagedfile.h" #include "tpagedfile.h"
#include "tref.h" #include "tref.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#define CHECK_CODE_GOTO(expr, lable) \ #define CHECK_CODE_GOTO(expr, lable) \
do { \ do { \
...@@ -57,6 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i ...@@ -57,6 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
} }
static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo); static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
if (taos_init() != TSDB_CODE_SUCCESS) { if (taos_init() != TSDB_CODE_SUCCESS) {
...@@ -198,36 +200,48 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { ...@@ -198,36 +200,48 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) { int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
pRequest->type = pQueryNode->type; pRequest->type = pQueryNode->type;
SSchema *pSchema = NULL;
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &pResInfo->numOfCols, pRequest->requestId);
if (code != 0) { if (code != 0) {
return code; return code;
} }
if (pQueryNode->type == TSDB_SQL_SELECT) { if (pQueryNode->type == TSDB_SQL_SELECT) {
pResInfo->fields = calloc(1, sizeof(TAOS_FIELD)); SArray* pa = taosArrayGetP((*pDag)->pSubplans, 0);
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
pResInfo->fields[i].bytes = pSchema[i].bytes; SSubplan* pPlan = taosArrayGetP(pa, 0);
pResInfo->fields[i].type = pSchema[i].type; SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); setResSchemaInfo(pResInfo, pDataBlockSchema);
} pRequest->type = TDMT_VND_QUERY;
} }
return code; return code;
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) {
assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0);
pResInfo->numOfCols = pDataBlockSchema->numOfCols;
pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pDataBlockSchema->pSchema[0]));
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
SSchema* pSchema = &pDataBlockSchema->pSchema[i];
pResInfo->fields[i].bytes = pSchema->bytes;
pResInfo->fields[i].type = pSchema->type;
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
}
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res); int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// handle error and retry // handle error and retry
} else { } else {
if (*pJob != NULL) { if (pRequest->body.pQueryJob != NULL) {
scheduleFreeJob(*pJob); scheduleFreeJob(pRequest->body.pQueryJob);
} }
} }
...@@ -235,7 +249,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { ...@@ -235,7 +249,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
return res.code; return res.code;
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL /*todo appInfo.xxx*/, pDag, pJob); return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
} }
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
...@@ -312,7 +326,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -312,7 +326,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
SQueryNode *pQuery = NULL; SQueryNode *pQuery = NULL;
SQueryDag *pDag = NULL; SQueryDag *pDag = NULL;
void *pJob = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
...@@ -322,9 +335,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -322,9 +335,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
} else { } else {
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return); CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag), _return);
pRequest->code = terrno; pRequest->code = terrno;
return pRequest;
} }
_return: _return:
...@@ -333,6 +345,7 @@ _return: ...@@ -333,6 +345,7 @@ _return:
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) { if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
pRequest->code = terrno; pRequest->code = terrno;
} }
return pRequest; return pRequest;
} }
...@@ -531,7 +544,10 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -531,7 +544,10 @@ void* doFetchRow(SRequestObj* pRequest) {
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
if (pRequest->type == TDMT_MND_SHOW) { if (pRequest->type == TDMT_VND_QUERY) {
pRequest->type = TDMT_VND_FETCH;
scheduleFetchRows(pRequest->body.pQueryJob, &pRequest->body.resInfo.pData);
} else if (pRequest->type == TDMT_MND_SHOW) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE; pRequest->type = TDMT_MND_SHOW_RETRIEVE;
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) { } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
......
...@@ -568,7 +568,7 @@ TEST(testCase, projection_query_tables) { ...@@ -568,7 +568,7 @@ TEST(testCase, projection_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// pRes = taos_query(pConn, "create table m1 (ts timestamp, k int) tags(a int)"); // pRes = taos_query(pConn, "create table m1 (ts timestamp, k int) tags(a int)");
// taos_free_result(pRes); taos_free_result(pRes);
// //
// pRes = taos_query(pConn, "create table tu using m1 tags(1)"); // pRes = taos_query(pConn, "create table tu using m1 tags(1)");
// taos_free_result(pRes); // taos_free_result(pRes);
......
...@@ -89,7 +89,7 @@ static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) { ...@@ -89,7 +89,7 @@ static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) { static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
dataBlockSchema->numOfCols = pPlanNode->numOfExpr; dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfCols); dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
if (NULL == dataBlockSchema->pSchema) { if (NULL == dataBlockSchema->pSchema) {
return false; return false;
} }
...@@ -306,8 +306,6 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { ...@@ -306,8 +306,6 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TABLESCAN: case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode); node = createTableScanNode(pCxt, pPlanNode);
break; break;
case QNODE_PROJECT:
// node = create
case QNODE_MODIFY: case QNODE_MODIFY:
// Insert is not an operator in a physical plan. // Insert is not an operator in a physical plan.
break; break;
......
...@@ -56,7 +56,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { ...@@ -56,7 +56,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
tfree(pDag); tfree(pDag);
} }
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pSchema, uint32_t* numOfResCols, uint64_t requestId) { int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) {
SQueryPlanNode* pLogicPlan; SQueryPlanNode* pLogicPlan;
int32_t code = createQueryPlan(pNode, &pLogicPlan); int32_t code = createQueryPlan(pNode, &pLogicPlan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
...@@ -70,15 +70,6 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, ...@@ -70,15 +70,6 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
printf("%s\n", str); printf("%s\n", str);
} }
int32_t numOfOutput = pLogicPlan->numOfExpr;
*pSchema = calloc(numOfOutput, sizeof(SSchema));
*numOfResCols = numOfOutput;
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pLogicPlan->pExpr, i);
memcpy(&(*pSchema)[i], pExprInfo->pExpr->pSchema, sizeof(SSchema));
}
code = optimizeQueryPlan(pLogicPlan); code = optimizeQueryPlan(pLogicPlan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(pLogicPlan); destroyQueryPlan(pLogicPlan);
......
...@@ -65,7 +65,7 @@ protected: ...@@ -65,7 +65,7 @@ protected:
SSchema *schema = NULL; SSchema *schema = NULL;
uint32_t numOfOutput = 0; uint32_t numOfOutput = 0;
code = qCreateQueryDag(query, &dag, &schema, &numOfOutput, requestId); code = qCreateQueryDag(query, &dag, requestId);
dag_.reset(dag); dag_.reset(dag);
return code; return code;
} }
......
...@@ -58,7 +58,6 @@ typedef struct SSchLevel { ...@@ -58,7 +58,6 @@ typedef struct SSchLevel {
SArray *subTasks; // Element is SQueryTask SArray *subTasks; // Element is SQueryTask
} SSchLevel; } SSchLevel;
typedef struct SSchTask { typedef struct SSchTask {
uint64_t taskId; // task id uint64_t taskId; // task id
SRWLatch lock; // task lock SRWLatch lock; // task lock
......
...@@ -773,8 +773,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -773,8 +773,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break; break;
} }
break;
}
case TDMT_VND_RES_READY_RSP: { case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)msg; SResReadyRsp *rsp = (SResReadyRsp *)msg;
...@@ -1198,7 +1196,7 @@ void schDropJobAllTasks(SSchJob *pJob) { ...@@ -1198,7 +1196,7 @@ void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->failTasks); schDropTaskInHashList(pJob, pJob->failTasks);
} }
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
if (nodeList && taosArrayGetSize(nodeList) <= 0) { if (nodeList && taosArrayGetSize(nodeList) <= 0) {
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
} }
...@@ -1311,7 +1309,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { ...@@ -1311,7 +1309,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) { int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -1328,7 +1326,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void ...@@ -1328,7 +1326,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) { int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -1343,12 +1341,10 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, ...@@ -1343,12 +1341,10 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
} }
int32_t scheduleFetchRows(void *job, void **data) { int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
if (NULL == job || NULL == data) { if (NULL == pJob || NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SSchJob *pJob = job;
int32_t code = 0; int32_t code = 0;
int8_t status = SCH_GET_JOB_STATUS(pJob); int8_t status = SCH_GET_JOB_STATUS(pJob);
...@@ -1489,4 +1485,4 @@ void scheduleFreeJob(void *job) { ...@@ -1489,4 +1485,4 @@ void scheduleFreeJob(void *job) {
void schedulerDestroy(void) { void schedulerDestroy(void) {
if (schMgmt.jobs) { if (schMgmt.jobs) {
taosHashCleanup(sch taosHashCleanup(sch
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册