提交 460036a0 编写于 作者: D dapan1121

feature/qnode

上级 2b5c4788
......@@ -977,10 +977,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
//QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
//QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
queryRsped = true;
//queryRsped = true;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
......@@ -994,10 +994,10 @@ _return:
input.code = code;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (!queryRsped) {
qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
//if (!queryRsped) {
// qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
//}
QW_RET(TSDB_CODE_SUCCESS);
}
......
......@@ -687,11 +687,15 @@ int32_t filterGetRangeRes(void* h, SFilterRange *ra) {
SFilterRangeNode* r = ctx->rs;
while (r) {
FILTER_COPY_RA(ra, &r->ra);
if (num) {
ra->e = r->ra.e;
ra->eflag = r->ra.eflag;
} else {
FILTER_COPY_RA(ra, &r->ra);
}
++num;
r = r->next;
++ra;
}
if (num == 0) {
......@@ -3314,8 +3318,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t
int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
SFilterInfo *info = NULL;
int32_t filterGetTimeRangeImpl(SFilterInfo *info, STimeWindow *win, bool *isStrict) {
SFilterRange ra = {0};
SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
......@@ -3369,13 +3372,14 @@ int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
*win = TSWINDOW_INITIALIZER;
} else {
filterGetRangeNum(prev, &num);
if (num > 1) {
qError("only one time range accepted, num:%d", num);
FLT_ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION);
}
FLT_CHK_JMP(num < 1);
if (num > 1) {
*isStrict = false;
qDebug("more than one time range, num:%d", num);
}
SFilterRange tra;
filterGetRangeRes(prev, &tra);
win->skey = tra.s;
......@@ -3401,6 +3405,30 @@ _return:
}
int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
SFilterInfo *info = NULL;
int32_t code = 0;
*isStrict = true;
FLT_ERR_RET(filterInitFromNode(pNode, &info, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP));
if (info->scalarMode) {
*win = TSWINDOW_INITIALIZER;
*isStrict = false;
goto _return;
}
FLT_ERR_JRET(filterGetTimeRangeImpl(info, win, isStrict));
_return:
filterFreeInfo(info);
FLT_RET(code);
}
int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar) {
if (FILTER_EMPTY_RES(info) || FILTER_ALL_RES(info)) {
return TSDB_CODE_SUCCESS;
......
......@@ -241,6 +241,7 @@ TEST(timerangeTest, greater) {
bool isStrict = false;
int32_t code = filterGetTimeRange(opNode1, &win, &isStrict);
ASSERT_EQ(code, 0);
ASSERT_EQ(isStrict, true);
ASSERT_EQ(win.skey, tsmall);
ASSERT_EQ(win.ekey, INT64_MAX);
//filterFreeInfo(filter);
......@@ -270,6 +271,7 @@ TEST(timerangeTest, greater_and_lower) {
STimeWindow win = {0};
bool isStrict = false;
int32_t code = filterGetTimeRange(logicNode, &win, &isStrict);
ASSERT_EQ(isStrict, true);
ASSERT_EQ(code, 0);
ASSERT_EQ(win.skey, tsmall);
ASSERT_EQ(win.ekey, tbig);
......@@ -277,6 +279,56 @@ TEST(timerangeTest, greater_and_lower) {
nodesDestroyNode(logicNode);
}
TEST(timerangeTest, greater_and_lower_not_strict) {
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL;
bool eRes[5] = {false, false, true, true, true};
SScalarParam res = {0};
int64_t tsmall1 = 222, tbig1 = 333;
int64_t tsmall2 = 444, tbig2 = 555;
SNode *list[2] = {0};
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall1);
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig1);
flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
list[0] = opNode1;
list[1] = opNode2;
flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_AND, list, 2);
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall2);
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig2);
flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
list[0] = opNode1;
list[1] = opNode2;
flttMakeLogicNode(&logicNode2, LOGIC_COND_TYPE_AND, list, 2);
list[0] = logicNode1;
list[1] = logicNode2;
flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_OR, list, 2);
//SFilterInfo *filter = NULL;
//int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
//ASSERT_EQ(code, 0);
STimeWindow win = {0};
bool isStrict = false;
int32_t code = filterGetTimeRange(logicNode1, &win, &isStrict);
ASSERT_EQ(isStrict, false);
ASSERT_EQ(code, 0);
ASSERT_EQ(win.skey, tsmall1);
ASSERT_EQ(win.ekey, tbig2);
//filterFreeInfo(filter);
nodesDestroyNode(logicNode1);
}
TEST(columnTest, smallint_column_greater_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5]= {1, 2, 3, 4, 5};
......
......@@ -1235,6 +1235,22 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS;
}
*pTask = *task;
return TSDB_CODE_SUCCESS;
}
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
......@@ -1247,19 +1263,21 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
}
int32_t s = taosHashGetSize(pJob->execTasks);
if (s <= 0) {
SCH_JOB_ELOG("empty execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
if (NULL == pTask) {
if (TDMT_VND_EXPLAIN_RSP == msgType) {
schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
} else {
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
if (NULL == pTask) {
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
pTask = *task;
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册