未验证 提交 7280794c 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #9949 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
...@@ -254,6 +254,7 @@ typedef struct SMultiFunctionsDesc { ...@@ -254,6 +254,7 @@ typedef struct SMultiFunctionsDesc {
bool interpQuery; bool interpQuery;
bool distinct; bool distinct;
bool join; bool join;
bool continueQuery;
} SMultiFunctionsDesc; } SMultiFunctionsDesc;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME #error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
#endif #endif
OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(TableScan) OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(DataBlocksOptScan) OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableSeqScan) OP_ENUM_MACRO(TableSeqScan)
......
...@@ -394,6 +394,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, ...@@ -394,6 +394,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode;
pQueryStmtInfo->info.continueQuery = true;
// todo check for invalid sql statement and return with error code // todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return); CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return);
...@@ -403,6 +406,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, ...@@ -403,6 +406,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
goto _return; goto _return;
} }
printf("%s\n", pStr);
// The topic should be related to a database that the queried table is belonged to. // The topic should be related to a database that the queried table is belonged to.
SName name = {0}; SName name = {0};
char dbName[TSDB_DB_FNAME_LEN] = {0}; char dbName[TSDB_DB_FNAME_LEN] = {0};
......
...@@ -525,30 +525,30 @@ TEST(testCase, driverInit_Test) { ...@@ -525,30 +525,30 @@ TEST(testCase, driverInit_Test) {
// taosHashCleanup(phash); // taosHashCleanup(phash);
//} //}
// //
//TEST(testCase, create_topic_Test) { TEST(testCase, create_topic_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == nullptr); ASSERT_TRUE(pFields == nullptr);
//
// int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes); taos_free_result(pRes);
//
// char* sql = "select * from tu"; char* sql = "select * from tu";
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_free_result(pRes); taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
//
//TEST(testCase, insert_test) { //TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
...@@ -646,36 +646,36 @@ TEST(testCase, driverInit_Test) { ...@@ -646,36 +646,36 @@ TEST(testCase, driverInit_Test) {
// taos_close(pConn); // taos_close(pConn);
//} //}
TEST(testCase, agg_query_tables) { //TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
//
TAOS_RES* pRes = taos_query(pConn, "use dbv"); // TAOS_RES* pRes = taos_query(pConn, "use dbv");
taos_free_result(pRes); // taos_free_result(pRes);
//
pRes = taos_query(pConn, "create table tx using st tags(111111111111111)"); // pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(pRes)); // printf("failed to create table, reason:%s\n", taos_errstr(pRes));
} // }
taos_free_result(pRes); // taos_free_result(pRes);
//
pRes = taos_query(pConn, "select count(*) from tu"); // pRes = taos_query(pConn, "select count(*) from tu");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); // taos_free_result(pRes);
ASSERT_TRUE(false); // ASSERT_TRUE(false);
} // }
//
TAOS_ROW pRow = NULL; // TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); // TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes); // int32_t numOfFields = taos_num_fields(pRes);
//
char str[512] = {0}; // char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) { // while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields); // int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str); // printf("%s\n", str);
} // }
//
taos_free_result(pRes); // taos_free_result(pRes);
taos_close(pConn); // taos_close(pConn);
} //}
\ No newline at end of file \ No newline at end of file
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../../../../../include/libs/executor/executor.h"
#include "tqInt.h" #include "tqInt.h"
#include "tqMetaStore.h" #include "tqMetaStore.h"
...@@ -762,3 +763,32 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -762,3 +763,32 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
taosArrayPush(pArray, &colInfo); taosArrayPush(pArray, &colInfo);
return pArray; return pArray;
} }
static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
return NULL;
}
// print those info into log
pMsg->sId = be64toh(pMsg->sId);
pMsg->queryId = be64toh(pMsg->queryId);
pMsg->taskId = be64toh(pMsg->taskId);
pMsg->contentLen = ntohl(pMsg->contentLen);
struct SSubplan *plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
return NULL;
}
return pTaskInfo;
}
\ No newline at end of file
...@@ -253,12 +253,8 @@ typedef struct SExecTaskInfo { ...@@ -253,12 +253,8 @@ typedef struct SExecTaskInfo {
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t lock; // used to synchronize the rsp/query threads pthread_mutex_t lock; // used to synchronize the rsp/query threads
// tsem_t ready;
// int32_t dataReady; // denote if query result is ready or not
// void* rspContext; // response context
char *sql; // query sql string char *sql; // query sql string
jmp_buf env; // jmp_buf env; //
DataSinkHandle dsHandle;
struct SOperatorInfo *pRoot; struct SOperatorInfo *pRoot;
} SExecTaskInfo; } SExecTaskInfo;
...@@ -666,6 +662,6 @@ int32_t getMaximumIdleDurationSec(); ...@@ -666,6 +662,6 @@ int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
#endif // TDENGINE_EXECUTORIMPL_H #endif // TDENGINE_EXECUTORIMPL_H
...@@ -69,11 +69,11 @@ void freeParam(STaskParam *param) { ...@@ -69,11 +69,11 @@ void freeParam(STaskParam *param) {
tfree(param->prevResult); tfree(param->prevResult);
} }
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
assert(tsdb != NULL && pSubplan != NULL); assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -84,11 +84,9 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ ...@@ -84,11 +84,9 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
goto _error; goto _error;
} }
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle); code = dsCreateDataSinker(pSubplan->pDataSink, handle);
*handle = (*pTask)->dsHandle; _error:
_error:
// if failed to add ref for all tables in this query, abort current query // if failed to add ref for all tables in this query, abort current query
return code; return code;
} }
......
...@@ -7778,22 +7778,29 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, ...@@ -7778,22 +7778,29 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
return NULL; return NULL;
} }
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
tsdbReadHandleT tReaderHandle = NULL;
int32_t code = 0;
uint64_t queryId = pPlan->id.queryId; uint64_t queryId = pPlan->id.queryId;
SPhyNode* pPhyNode = pPlan->pNode; int32_t code = TSDB_CODE_SUCCESS;
*pTaskInfo = createExecTaskInfo(queryId); *pTaskInfo = createExecTaskInfo(queryId);
if (*pTaskInfo == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
}
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId); (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId);
if ((*pTaskInfo)->pRoot == NULL) { if ((*pTaskInfo)->pRoot == NULL) {
return terrno; code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
} }
return TSDB_CODE_SUCCESS; return code;
_complete:
tfree(*pTaskInfo);
terrno = code;
return code;
} }
/** /**
......
...@@ -28,19 +28,20 @@ extern "C" { ...@@ -28,19 +28,20 @@ extern "C" {
#define QNODE_TAGSCAN 1 #define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2 #define QNODE_TABLESCAN 2
#define QNODE_PROJECT 3 #define QNODE_STREAMSCAN 3
#define QNODE_AGGREGATE 4 #define QNODE_PROJECT 4
#define QNODE_GROUPBY 5 #define QNODE_AGGREGATE 5
#define QNODE_LIMIT 6 #define QNODE_GROUPBY 6
#define QNODE_JOIN 7 #define QNODE_LIMIT 7
#define QNODE_DISTINCT 8 #define QNODE_JOIN 8
#define QNODE_SORT 9 #define QNODE_DISTINCT 9
#define QNODE_UNION 10 #define QNODE_SORT 10
#define QNODE_TIMEWINDOW 11 #define QNODE_UNION 11
#define QNODE_SESSIONWINDOW 12 #define QNODE_TIMEWINDOW 12
#define QNODE_STATEWINDOW 13 #define QNODE_SESSIONWINDOW 13
#define QNODE_FILL 14 #define QNODE_STATEWINDOW 14
#define QNODE_MODIFY 15 #define QNODE_FILL 15
#define QNODE_MODIFY 16
typedef struct SQueryDistPlanNodeInfo { typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
......
...@@ -121,6 +121,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla ...@@ -121,6 +121,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
switch(type) { switch(type) {
case QNODE_TAGSCAN: case QNODE_TAGSCAN:
case QNODE_STREAMSCAN:
case QNODE_TABLESCAN: { case QNODE_TABLESCAN: {
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
...@@ -195,7 +196,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ ...@@ -195,7 +196,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ
return pNode; return pNode;
} }
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); SQueryPlanNode* pNode = NULL;
if (pQueryInfo->info.continueQuery) {
pNode = createQueryNode(QNODE_STREAMSCAN, "StreamScan", NULL, 0, NULL, 0, info);
} else {
pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
}
if (!pQueryInfo->info.projectionQuery) { if (!pQueryInfo->info.projectionQuery) {
SArray* p = pQueryInfo->exprList[0]; SArray* p = pQueryInfo->exprList[0];
...@@ -261,7 +267,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* ...@@ -261,7 +267,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
pNode->numOfExpr = num; pNode->numOfExpr = num;
pNode->pExpr = taosArrayInit(num, POINTER_BYTES); pNode->pExpr = taosArrayInit(num, POINTER_BYTES);
taosArrayAddAll(pNode->pExpr, p); taosArrayAddAll(pNode->pExpr, p);
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
} }
} }
...@@ -433,6 +438,7 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, ...@@ -433,6 +438,7 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
int32_t len = len1 + totalLen; int32_t len = len1 + totalLen;
switch(pQueryNode->info.type) { switch(pQueryNode->info.type) {
case QNODE_STREAMSCAN:
case QNODE_TABLESCAN: { case QNODE_TABLESCAN: {
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo; SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid); len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid);
...@@ -643,7 +649,6 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev ...@@ -643,7 +649,6 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev
int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) {
assert(pQueryNode); assert(pQueryNode);
*str = calloc(1, 4096); *str = calloc(1, 4096);
int32_t len = sprintf(*str, "===== logic plan =====\n"); int32_t len = sprintf(*str, "===== logic plan =====\n");
......
...@@ -290,7 +290,8 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { ...@@ -290,7 +290,8 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) { static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode); vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);
return createUserTableScanNode(pPlanNode, pTable, OP_TableScan); int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTable, type);
} }
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
...@@ -326,6 +327,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { ...@@ -326,6 +327,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TAGSCAN: case QNODE_TAGSCAN:
node = createTagScanNode(pPlanNode); node = createTagScanNode(pPlanNode);
break; break;
case QNODE_STREAMSCAN:
case QNODE_TABLESCAN: case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode); node = createTableScanNode(pCxt, pPlanNode);
break; break;
......
...@@ -829,6 +829,7 @@ static bool exchangeNodeFromJson(const cJSON* json, void* obj) { ...@@ -829,6 +829,7 @@ static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
static bool specificPhyNodeToJson(const void* obj, cJSON* json) { static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
const SPhyNode* phyNode = (const SPhyNode*)obj; const SPhyNode* phyNode = (const SPhyNode*)obj;
switch (phyNode->info.type) { switch (phyNode->info.type) {
case OP_StreamScan:
case OP_TableScan: case OP_TableScan:
case OP_DataBlocksOptScan: case OP_DataBlocksOptScan:
case OP_TableSeqScan: case OP_TableSeqScan:
......
...@@ -921,17 +921,17 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t ...@@ -921,17 +921,17 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
code = qStringToSubplan(qwMsg->msg, &plan); code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x", code); QW_TASK_ELOG("task string to subplan failed, code:%s", tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
if ((pTaskInfo && NULL == sinkHandle) || (NULL == pTaskInfo && sinkHandle)) { if (NULL == sinkHandle || NULL == pTaskInfo) {
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
......
...@@ -666,12 +666,12 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) ...@@ -666,12 +666,12 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
int32_t taskDone = 0; int32_t taskDone = 0;
int32_t code = 0; int32_t code = 0;
SCH_TASK_DLOG("taskOnFailure, code:%x", errCode); SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
if (!needRetry) { if (!needRetry) {
SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode); SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
code = schMoveTaskToFailList(pJob, pTask, &moved); code = schMoveTaskToFailList(pJob, pTask, &moved);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册