未验证 提交 938d6480 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #11378 from taosdata/feature/qnode

feat: add query node, i.e. qnode, for query only purpose
...@@ -22,7 +22,7 @@ typedef struct SExplainCtx SExplainCtx; ...@@ -22,7 +22,7 @@ typedef struct SExplainCtx SExplainCtx;
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp); int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp); int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp);
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs); int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int64_t startTs);
int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp); int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp); int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
void qExplainFreeCtx(SExplainCtx *pCtx); void qExplainFreeCtx(SExplainCtx *pCtx);
......
...@@ -84,10 +84,10 @@ typedef struct { ...@@ -84,10 +84,10 @@ typedef struct {
} SClientHbMgr; } SClientHbMgr;
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
int64_t start; // start timestamp int64_t start; // start timestamp, us
int64_t parsed; // start to parse int64_t parsed; // start to parse, us
int64_t send; // start to send to server int64_t send; // start to send to server, us
int64_t rsp; // receive response from server int64_t rsp; // receive response from server, us
} SQueryExecMetric; } SQueryExecMetric;
typedef struct SInstanceSummary { typedef struct SInstanceSummary {
......
...@@ -65,10 +65,10 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -65,10 +65,10 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t currentInst = atomic_sub_fetch_64(&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_64(&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampMs() - pRequest->metric.start; int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d", " ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst);
taosReleaseRef(clientConnRefPool, pTscObj->id); taosReleaseRef(clientConnRefPool, pTscObj->id);
} }
...@@ -151,7 +151,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty ...@@ -151,7 +151,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
pRequest->pDb = getDbOfConnection(pObj); pRequest->pDb = getDbOfConnection(pObj);
pRequest->requestId = generateRequestId(); pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampMs(); pRequest->metric.start = taosGetTimestampUs();
pRequest->type = type; pRequest->type = type;
pRequest->pTscObj = pObj; pRequest->pTscObj = pObj;
......
...@@ -435,11 +435,11 @@ static int32_t hbCreateThread() { ...@@ -435,11 +435,11 @@ static int32_t hbCreateThread() {
taosThreadAttrInit(&thAttr); taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
// if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) { if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
// terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
// return -1; return -1;
// } }
// taosThreadAttrDestroy(&thAttr); taosThreadAttrDestroy(&thAttr);
return 0; return 0;
} }
...@@ -500,8 +500,6 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { ...@@ -500,8 +500,6 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
} }
void appHbMgrCleanup(void) { void appHbMgrCleanup(void) {
taosThreadMutexLock(&clientHbMgr.lock);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
...@@ -510,8 +508,6 @@ void appHbMgrCleanup(void) { ...@@ -510,8 +508,6 @@ void appHbMgrCleanup(void) {
taosHashCleanup(pTarget->connInfo); taosHashCleanup(pTarget->connInfo);
pTarget->connInfo = NULL; pTarget->connInfo = NULL;
} }
taosThreadMutexUnlock(&clientHbMgr.lock);
} }
int hbMgrInit() { int hbMgrInit() {
...@@ -532,7 +528,6 @@ int hbMgrInit() { ...@@ -532,7 +528,6 @@ int hbMgrInit() {
} }
void hbMgrCleanUp() { void hbMgrCleanUp() {
#if 0
hbStopThread(); hbStopThread();
// destroy all appHbMgr // destroy all appHbMgr
...@@ -545,7 +540,6 @@ void hbMgrCleanUp() { ...@@ -545,7 +540,6 @@ void hbMgrCleanUp() {
taosThreadMutexUnlock(&clientHbMgr.lock); taosThreadMutexUnlock(&clientHbMgr.lock);
clientHbMgr.appHbMgrs = NULL; clientHbMgr.appHbMgrs = NULL;
#endif
} }
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
......
...@@ -520,7 +520,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -520,7 +520,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
assert(pRequest->self == pSendInfo->requestObjRefId); assert(pRequest->self == pSendInfo->requestObjRefId);
pRequest->metric.rsp = taosGetTimestampMs(); pRequest->metric.rsp = taosGetTimestampUs();
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
if (pEpSet) { if (pEpSet) {
...@@ -536,10 +536,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -536,10 +536,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) { if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self, tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId);
} else { } else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self, tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId);
} }
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
......
...@@ -1960,7 +1960,7 @@ int32_t tDeserializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pR ...@@ -1960,7 +1960,7 @@ int32_t tDeserializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pR
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1; if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SUseDbBatchRsp)); pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SUseDbRsp));
if (pRsp->pArray == NULL) { if (pRsp->pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
......
...@@ -44,6 +44,9 @@ extern "C" { ...@@ -44,6 +44,9 @@ extern "C" {
#define EXPLAIN_OUTPUT_FORMAT "Output: " #define EXPLAIN_OUTPUT_FORMAT "Output: "
#define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c" #define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
#define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64 #define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
#define EXPLAIN_RATIO_TIME_FORMAT "Ratio: %f"
#define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms"
#define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms"
//append area //append area
#define EXPLAIN_LEFT_PARENTHESIS_FORMAT " (" #define EXPLAIN_LEFT_PARENTHESIS_FORMAT " ("
...@@ -108,16 +111,19 @@ typedef struct SExplainCtx { ...@@ -108,16 +111,19 @@ typedef struct SExplainCtx {
#define EXPLAIN_ROW_NEW(level, ...) \ #define EXPLAIN_ROW_NEW(level, ...) \
do { \ do { \
if (isVerboseLine) { \ if (isVerboseLine) { \
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s", (level) * 2 + 3, ""); \ tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE, "%*s", (level) * 2 + 3, ""); \
} else { \ } else { \
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s%s", (level) * 2, "", "-> "); \ tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE, "%*s%s", (level) * 2, "", "-> "); \
} \ } \
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \ tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE - tlen, __VA_ARGS__); \
} while (0) } while (0)
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__) #define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE - tlen, __VA_ARGS__)
#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0) #define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0)
#define EXPLAIN_SUM_ROW_NEW(...) tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE, __VA_ARGS__)
#define EXPLAIN_SUM_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; } while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -52,16 +52,14 @@ void qExplainFreeCtx(SExplainCtx *pCtx) { ...@@ -52,16 +52,14 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {
void *pIter = taosHashIterate(pCtx->groupHash, NULL); void *pIter = taosHashIterate(pCtx->groupHash, NULL);
while (pIter) { while (pIter) {
SExplainGroup *group = (SExplainGroup *)pIter; SExplainGroup *group = (SExplainGroup *)pIter;
if (NULL == group->nodeExecInfo) { if (group->nodeExecInfo) {
continue; int32_t num = taosArrayGetSize(group->nodeExecInfo);
for (int32_t i = 0; i < num; ++i) {
SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i);
taosMemoryFreeClear(rsp->subplanInfo);
}
} }
int32_t num = taosArrayGetSize(group->nodeExecInfo);
for (int32_t i = 0; i < num; ++i) {
SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i);
taosMemoryFreeClear(rsp->subplanInfo);
}
pIter = taosHashIterate(pCtx->groupHash, pIter); pIter = taosHashIterate(pCtx->groupHash, pIter);
} }
} }
...@@ -896,9 +894,33 @@ _return: ...@@ -896,9 +894,33 @@ _return:
QRY_RET(code); QRY_RET(code);
} }
int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
if (EXPLAIN_MODE_ANALYZE != pCtx->mode) {
return TSDB_CODE_SUCCESS;
}
int32_t tlen = 0;
char *tbuf = pCtx->tbuf;
EXPLAIN_SUM_ROW_NEW(EXPLAIN_RATIO_TIME_FORMAT, pCtx->ratio);
EXPLAIN_SUM_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(pCtx, tbuf, tlen, 0));
EXPLAIN_SUM_ROW_NEW(EXPLAIN_PLANNING_TIME_FORMAT, (double)(pCtx->jobStartTs - pCtx->reqStartTs) / 1000.0);
EXPLAIN_SUM_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(pCtx, tbuf, tlen, 0));
EXPLAIN_SUM_ROW_NEW(EXPLAIN_EXEC_TIME_FORMAT, (double)(pCtx->jobDoneTs - pCtx->jobStartTs) / 1000.0);
EXPLAIN_SUM_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(pCtx, tbuf, tlen, 0));
return TSDB_CODE_SUCCESS;
}
int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) { int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0)); QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0));
QRY_ERR_RET(qExplainAppendPlanRows(pCtx));
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp)); QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));
...@@ -979,18 +1001,18 @@ _return: ...@@ -979,18 +1001,18 @@ _return:
QRY_RET(code); QRY_RET(code);
} }
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs) { int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int64_t startTs) {
QRY_ERR_RET(qExplainPrepareCtx(pDag, pCtx)); QRY_ERR_RET(qExplainPrepareCtx(pDag, pCtx));
(*pCtx)->reqStartTs = startTs; (*pCtx)->reqStartTs = startTs;
(*pCtx)->jobStartTs = taosGetTimestampMs(); (*pCtx)->jobStartTs = taosGetTimestampUs();
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) { int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
int32_t code = 0; int32_t code = 0;
pCtx->jobDoneTs = taosGetTimestampMs(); pCtx->jobDoneTs = taosGetTimestampUs();
atomic_store_8((int8_t *)&pCtx->execDone, true); atomic_store_8((int8_t *)&pCtx->execDone, true);
......
...@@ -207,7 +207,9 @@ SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) { ...@@ -207,7 +207,9 @@ SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
CHECK_RAW_EXPR_NODE(pNode); CHECK_RAW_EXPR_NODE(pNode);
SRawExprNode* pRawExpr = (SRawExprNode*)pNode; SRawExprNode* pRawExpr = (SRawExprNode*)pNode;
SNode* pExpr = pRawExpr->pNode; SNode* pExpr = pRawExpr->pNode;
strncpy(((SExprNode*)pExpr)->aliasName, pRawExpr->p, pRawExpr->n); if (nodesIsExprNode(pExpr)) {
strncpy(((SExprNode*)pExpr)->aliasName, pRawExpr->p, pRawExpr->n);
}
taosMemoryFreeClear(pNode); taosMemoryFreeClear(pNode);
return pExpr; return pExpr;
} }
...@@ -498,8 +500,9 @@ SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* p ...@@ -498,8 +500,9 @@ SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* p
if (NULL == pNode || !pCxt->valid) { if (NULL == pNode || !pCxt->valid) {
return pNode; return pNode;
} }
uint32_t maxLen = sizeof(((SExprNode*)pNode)->aliasName); int32_t len = TMIN(sizeof(((SExprNode*)pNode)->aliasName) - 1, pAlias->n);
strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, pAlias->n > maxLen ? maxLen : pAlias->n); strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, len);
((SExprNode*)pNode)->aliasName[len] = '\0';
return pNode; return pNode;
} }
......
...@@ -79,6 +79,7 @@ typedef struct SQWConnInfo { ...@@ -79,6 +79,7 @@ typedef struct SQWConnInfo {
typedef struct SQWMsg { typedef struct SQWMsg {
void *node; void *node;
int32_t code;
char *msg; char *msg;
int32_t msgLen; int32_t msgLen;
SQWConnInfo connInfo; SQWConnInfo connInfo;
......
...@@ -536,6 +536,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF) { ...@@ -536,6 +536,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF) {
QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS())); QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
QW_TASK_DLOG_E("task is dropped");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1239,8 +1241,10 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1239,8 +1241,10 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
} else if (ctx->phase > 0) { } else if (ctx->phase > 0) {
qwBuildAndSendDropRsp(&qwMsg->connInfo, code); if (0 == qwMsg->code) {
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
rsped = true; rsped = true;
...@@ -1273,7 +1277,7 @@ _return: ...@@ -1273,7 +1277,7 @@ _return:
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
if (TSDB_CODE_SUCCESS != code) { if ((TSDB_CODE_SUCCESS != code) && (0 == qwMsg->code)) {
qwBuildAndSendDropRsp(&qwMsg->connInfo, code); qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }
......
...@@ -549,7 +549,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -549,7 +549,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId; uint64_t tId = msg->taskId;
int64_t rId = msg->refId; int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
./test.sh -f tsim/query/interval-offset.sim ./test.sh -f tsim/query/interval-offset.sim
./test.sh -f tsim/query/scalarFunction.sim ./test.sh -f tsim/query/scalarFunction.sim
./test.sh -f tsim/query/charScalarFunction.sim ./test.sh -f tsim/query/charScalarFunction.sim
./test.sh -f tsim/query/explain.sim
./test.sh -f tsim/query/session.sim ./test.sh -f tsim/query/session.sim
# ---- qnode # ---- qnode
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
print ========= start dnode1 as master
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
print ======== step1
sql create database db1 vgroups 3;
sql use db1;
sql show databases;
sql create stable st1 (ts timestamp, f1 int, f2 binary(200)) tags(t1 int);
sql create stable st2 (ts timestamp, f1 int, f2 binary(200)) tags(t1 int);
sql create table tb1 using st1 tags(1);
sql insert into tb1 values (now, 1, "Hash Join (cost=230.47..713.98 rows=101 width=488) (actual time=0.711..7.427 rows=100 loops=1)");
sql create table tb2 using st1 tags(2);
sql insert into tb2 values (now, 2, "Seq Scan on tenk2 t2 (cost=0.00..445.00 rows=10000 width=244) (actual time=0.007..2.583 rows=10000 loops=1)");
sql create table tb3 using st1 tags(3);
sql insert into tb3 values (now, 3, "Hash (cost=229.20..229.20 rows=101 width=244) (actual time=0.659..0.659 rows=100 loops=1)");
sql create table tb4 using st1 tags(4);
sql insert into tb4 values (now, 4, "Bitmap Heap Scan on tenk1 t1 (cost=5.07..229.20 rows=101 width=244) (actual time=0.080..0.526 rows=100 loops=1)");
sql create table tb1 using st2 tags(1);
sql insert into tb1 values (now, 1, "Hash Join (cost=230.47..713.98 rows=101 width=488) (actual time=0.711..7.427 rows=100 loops=1)");
sql create table tb2 using st2 tags(2);
sql insert into tb2 values (now, 2, "Seq Scan on tenk2 t2 (cost=0.00..445.00 rows=10000 width=244) (actual time=0.007..2.583 rows=10000 loops=1)");
sql create table tb3 using st2 tags(3);
sql insert into tb3 values (now, 3, "Hash (cost=229.20..229.20 rows=101 width=244) (actual time=0.659..0.659 rows=100 loops=1)");
sql create table tb4 using st2 tags(4);
sql insert into tb4 values (now, 4, "Bitmap Heap Scan on tenk1 t1 (cost=5.07..229.20 rows=101 width=244) (actual time=0.080..0.526 rows=100 loops=1)");
print ======== step2
sql explain select * from st1 where -2;
sql explain select ts from tb1;
sql explain select * from st1;
sql explain select * from st1 order by ts;
sql explain select * from information_schema.user_stables;
sql explain select count(*),sum(f1) from tb1;
sql explain select count(*),sum(f1) from st1;
sql explain select count(*),sum(f1) from st1 group by f1;
sql explain select count(f1) from tb1 interval(1s, 2d) sliding(3s) fill(prev);
sql explain select min(f1) from st1 interval(1m, 2a) sliding(3n);
print ======== step3
sql explain verbose true select * from st1 where -2;
sql explain verbose true select ts from tb1 where f1 > 0;
sql explain verbose true select * from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
sql explain verbose true select * from information_schema.user_stables where db_name='db2';
sql explain verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
print ======== step4
sql explain analyze select ts from st1 where -2;
sql explain analyze select ts from tb1;
sql explain analyze select ts from st1;
sql explain analyze select ts from st1;
sql explain analyze select ts from st1 order by ts;
sql explain analyze select * from information_schema.user_stables;
sql explain analyze select count(*),sum(f1) from tb1;
sql explain analyze select count(*),sum(f1) from st1;
sql explain analyze select count(*),sum(f1) from st1 group by f1;
sql explain analyze select count(f1) from tb1 interval(1s, 2d) sliding(3s) fill(prev);
sql explain analyze select min(f1) from st1 interval(3n, 2a) sliding(1n);
print ======== step5
sql explain analyze verbose true select ts from st1 where -2;
sql explain analyze verbose true select ts from tb1;
sql explain analyze verbose true select ts from st1;
sql explain analyze verbose true select ts from st1;
sql explain analyze verbose true select ts from st1 order by ts;
sql explain analyze verbose true select * from information_schema.user_stables;
sql explain analyze verbose true select count(*),sum(f1) from tb1;
sql explain analyze verbose true select count(*),sum(f1) from st1;
sql explain analyze verbose true select count(*),sum(f1) from st1 group by f1;
sql explain analyze verbose true select count(f1) from tb1 interval(1s, 2d) sliding(3s) fill(prev);
sql explain analyze verbose true select ts from tb1 where f1 > 0;
sql explain analyze verbose true select f1 from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
sql explain analyze verbose true select * from information_schema.user_stables where db_name='db2';
sql explain analyze verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
sql explain analyze verbose true select min(f1) from st1 interval(3n, 2a) sliding(1n);
sql explain analyze verbose true select * from (select min(f1),count(*) a from st1 where f1 > 0) where a < 0;
#not pass case
#sql explain verbose true select count(*),sum(f1) as aa from tb1 where (f1 > 0 or f1 < -1) and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00' order by aa;
#sql explain verbose true select * from st1 where (f1 > 0 or f1 < -1) and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00' order by ts;
#sql explain verbose true select count(*),sum(f1) from st1 where (f1 > 0 or f1 < -1) and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00' order by ts;
#sql explain verbose true select count(f1) from tb1 where (f1 > 0 or f1 < -1) and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00' interval(1s, 2d) sliding(3s) order by ts;
#sql explain verbose true select min(f1) from st1 where (f1 > 0 or f1 < -1) and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00' interval(1m, 2a) sliding(3n) fill(linear) order by ts;
#sql explain select max(f1) from tb1 SESSION(ts, 1s);
#sql explain select max(f1) from st1 SESSION(ts, 1s);
#sql explain select * from tb1, tb2 where tb1.ts=tb2.ts;
#sql explain select * from st1, st2 where tb1.ts=tb2.ts;
#sql explain analyze verbose true select sum(a+b) from (select _rowts, min(f1) b,count(*) a from st1 where f1 > 0 interval(1a)) where a < 0 interval(1s);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册