未验证 提交 548cc2c3 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21701 from taosdata/feat/TD-24499

feat: support fill history with subquery
...@@ -1975,6 +1975,7 @@ typedef struct { ...@@ -1975,6 +1975,7 @@ typedef struct {
SArray* fillNullCols; // array of SColLocation SArray* fillNullCols; // array of SColLocation
int64_t deleteMark; int64_t deleteMark;
int8_t igUpdate; int8_t igUpdate;
int64_t lastTs;
} SCMCreateStreamReq; } SCMCreateStreamReq;
typedef struct { typedef struct {
...@@ -2497,6 +2498,7 @@ typedef struct { ...@@ -2497,6 +2498,7 @@ typedef struct {
int64_t stime; // timestamp precision ms int64_t stime; // timestamp precision ms
int64_t reqRid; int64_t reqRid;
bool stableQuery; bool stableQuery;
bool isSubQuery;
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
int32_t subPlanNum; int32_t subPlanNum;
SArray* subDesc; // SArray<SQuerySubDesc> SArray* subDesc; // SArray<SQuerySubDesc>
......
...@@ -87,6 +87,7 @@ typedef struct SCatalogReq { ...@@ -87,6 +87,7 @@ typedef struct SCatalogReq {
bool dNodeRequired; // valid dnode bool dNodeRequired; // valid dnode
bool svrVerRequired; bool svrVerRequired;
bool forceUpdate; bool forceUpdate;
bool cloned;
} SCatalogReq; } SCatalogReq;
typedef struct SMetaRes { typedef struct SMetaRes {
......
...@@ -233,6 +233,7 @@ bool fmIsGroupKeyFunc(int32_t funcId); ...@@ -233,6 +233,7 @@ bool fmIsGroupKeyFunc(int32_t funcId);
bool fmIsBlockDistFunc(int32_t funcId); bool fmIsBlockDistFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType); void getLastCacheDataType(SDataType* pType);
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
......
...@@ -425,16 +425,18 @@ typedef struct SStreamOptions { ...@@ -425,16 +425,18 @@ typedef struct SStreamOptions {
} SStreamOptions; } SStreamOptions;
typedef struct SCreateStreamStmt { typedef struct SCreateStreamStmt {
ENodeType type; ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN]; char streamName[TSDB_TABLE_NAME_LEN];
char targetDbName[TSDB_DB_NAME_LEN]; char targetDbName[TSDB_DB_NAME_LEN];
char targetTabName[TSDB_TABLE_NAME_LEN]; char targetTabName[TSDB_TABLE_NAME_LEN];
bool ignoreExists; bool ignoreExists;
SStreamOptions* pOptions; SStreamOptions* pOptions;
SNode* pQuery; SNode* pQuery;
SNodeList* pTags; SNode* pPrevQuery;
SNode* pSubtable; SNodeList* pTags;
SNodeList* pCols; SNode* pSubtable;
SNodeList* pCols;
SCMCreateStreamReq* pReq;
} SCreateStreamStmt; } SCreateStreamStmt;
typedef struct SDropStreamStmt { typedef struct SDropStreamStmt {
......
...@@ -617,6 +617,7 @@ typedef struct SQueryPlan { ...@@ -617,6 +617,7 @@ typedef struct SQueryPlan {
int32_t numOfSubplans; int32_t numOfSubplans;
SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0. SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0.
SExplainInfo explainInfo; SExplainInfo explainInfo;
void* pPostPlan;
} SQueryPlan; } SQueryPlan;
const char* dataOrderStr(EDataOrderLevel order); const char* dataOrderStr(EDataOrderLevel order);
......
...@@ -441,7 +441,9 @@ typedef struct SQuery { ...@@ -441,7 +441,9 @@ typedef struct SQuery {
EQueryExecStage execStage; EQueryExecStage execStage;
EQueryExecMode execMode; EQueryExecMode execMode;
bool haveResultSet; bool haveResultSet;
SNode* pPrevRoot;
SNode* pRoot; SNode* pRoot;
SNode* pPostRoot;
int32_t numOfResCols; int32_t numOfResCols;
SSchema* pResSchema; SSchema* pResSchema;
int8_t precision; int8_t precision;
......
...@@ -74,6 +74,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata ...@@ -74,6 +74,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
const struct SMetaData* pMetaData, SQuery* pQuery); const struct SMetaData* pMetaData, SQuery* pQuery);
int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData, int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData,
SQuery* pQuery); SQuery* pQuery);
int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow);
void qDestroyParseContext(SParseContext* pCxt); void qDestroyParseContext(SParseContext* pCxt);
......
...@@ -52,6 +52,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo ...@@ -52,6 +52,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
// @groupId id of a group of datasource subplans of this @pSubplan // @groupId id of a group of datasource subplans of this @pSubplan
// @pSource one execution location of this group of datasource subplans // @pSource one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource); int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
int32_t qContinuePlanPostQuery(void *pPostPlan);
void qClearSubplanExecutionNode(SSubplan* pSubplan); void qClearSubplanExecutionNode(SSubplan* pSubplan);
......
...@@ -227,6 +227,12 @@ typedef struct { ...@@ -227,6 +227,12 @@ typedef struct {
STaosxRsp rsp; STaosxRsp rsp;
} SMqTaosxRspObj; } SMqTaosxRspObj;
typedef struct SReqRelInfo {
uint64_t userRefId;
uint64_t prevRefId;
uint64_t nextRefId;
} SReqRelInfo;
typedef struct SRequestObj { typedef struct SRequestObj {
int8_t resType; // query or tmq int8_t resType; // query or tmq
uint64_t requestId; uint64_t requestId;
...@@ -250,10 +256,14 @@ typedef struct SRequestObj { ...@@ -250,10 +256,14 @@ typedef struct SRequestObj {
bool validateOnly; // todo refactor bool validateOnly; // todo refactor
bool killed; bool killed;
bool inRetry; bool inRetry;
bool isSubReq;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
int64_t allocatorRefId; int64_t allocatorRefId;
SQuery* pQuery; SQuery* pQuery;
void* pPostPlan;
SReqRelInfo relation;
void* pWrapper;
} SRequestObj; } SRequestObj;
typedef struct SSyncQueryParam { typedef struct SSyncQueryParam {
...@@ -279,6 +289,7 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, ...@@ -279,6 +289,7 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly); void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int64_t reqid); int64_t reqid);
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param);
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols); int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
...@@ -368,6 +379,7 @@ typedef struct SSqlCallbackWrapper { ...@@ -368,6 +379,7 @@ typedef struct SSqlCallbackWrapper {
SParseContext* pParseCtx; SParseContext* pParseCtx;
SCatalogReq* pCatalogReq; SCatalogReq* pCatalogReq;
SRequestObj* pRequest; SRequestObj* pRequest;
void* pPlanInfo;
} SSqlCallbackWrapper; } SSqlCallbackWrapper;
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res); SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
...@@ -382,6 +394,12 @@ int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog); ...@@ -382,6 +394,12 @@ int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
bool qnodeRequired(SRequestObj* pRequest); bool qnodeRequired(SRequestObj* pRequest);
void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest); void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest);
void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper); void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper);
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code);
void restartAsyncQuery(SRequestObj *pRequest, int32_t code);
int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest);
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce);
void returnToUser(SRequestObj* pRequest);
void stopAllQueries(SRequestObj *pRequest);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -358,6 +358,49 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri ...@@ -358,6 +358,49 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
void destroySubRequests(SRequestObj *pRequest) {
int32_t reqIdx = -1;
SRequestObj *pReqList[16] = {NULL};
uint64_t tmpRefId = 0;
if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
return;
}
SRequestObj* pTmp = pRequest;
while (pTmp->relation.prevRefId) {
tmpRefId = pTmp->relation.prevRefId;
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self,
tmpRefId, pTmp->requestId);
break;
}
}
for (int32_t i = reqIdx; i >= 0; i--) {
removeRequest(pReqList[i]->self);
}
tmpRefId = pRequest->relation.nextRefId;
while (tmpRefId) {
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
tmpRefId = pTmp->relation.nextRefId;
removeRequest(pTmp->self);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
}
void doDestroyRequest(void *p) { void doDestroyRequest(void *p) {
if (NULL == p) { if (NULL == p) {
return; return;
...@@ -368,10 +411,14 @@ void doDestroyRequest(void *p) { ...@@ -368,10 +411,14 @@ void doDestroyRequest(void *p) {
uint64_t reqId = pRequest->requestId; uint64_t reqId = pRequest->requestId;
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
destroySubRequests(pRequest);
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
destorySqlCallbackWrapper(pRequest->pWrapper);
taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->pDb); taosMemoryFreeClear(pRequest->pDb);
...@@ -412,6 +459,63 @@ void destroyRequest(SRequestObj *pRequest) { ...@@ -412,6 +459,63 @@ void destroyRequest(SRequestObj *pRequest) {
removeRequest(pRequest->self); removeRequest(pRequest->self);
} }
void taosStopQueryImpl(SRequestObj *pRequest) {
pRequest->killed = true;
// It is not a query, no need to stop.
if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) {
tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
return;
}
schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
tscDebug("request %" PRIx64 " killed", pRequest->requestId);
}
void stopAllQueries(SRequestObj *pRequest) {
int32_t reqIdx = -1;
SRequestObj *pReqList[16] = {NULL};
uint64_t tmpRefId = 0;
if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
return;
}
SRequestObj* pTmp = pRequest;
while (pTmp->relation.prevRefId) {
tmpRefId = pTmp->relation.prevRefId;
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self,
tmpRefId, pTmp->requestId);
break;
}
}
for (int32_t i = reqIdx; i >= 0; i--) {
taosStopQueryImpl(pReqList[i]);
}
taosStopQueryImpl(pRequest);
tmpRefId = pRequest->relation.nextRefId;
while (tmpRefId) {
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
tmpRefId = pTmp->relation.nextRefId;
taosStopQueryImpl(pTmp);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
}
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
static void *tscCrashReportThreadFp(void *param) { static void *tscCrashReportThreadFp(void *param) {
......
...@@ -464,6 +464,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { ...@@ -464,6 +464,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
desc.useconds = now - pRequest->metric.start; desc.useconds = now - pRequest->metric.start;
desc.reqRid = pRequest->self; desc.reqRid = pRequest->self;
desc.stableQuery = pRequest->stableQuery; desc.stableQuery = pRequest->stableQuery;
desc.isSubQuery = pRequest->isSubReq;
taosGetFqdn(desc.fqdn); taosGetFqdn(desc.fqdn);
desc.subPlanNum = pRequest->body.subplanNum; desc.subPlanNum = pRequest->body.subplanNum;
......
...@@ -237,6 +237,17 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, ...@@ -237,6 +237,17 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest) {
int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
if (TSDB_CODE_SUCCESS == code) {
pRequest->relation.prevRefId = (*pNewRequest)->self;
(*pNewRequest)->relation.nextRefId = pRequest->self;
(*pNewRequest)->relation.userRefId = pRequest->self;
(*pNewRequest)->isSubReq = true;
}
return code;
}
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) { int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
...@@ -878,6 +889,81 @@ static bool incompletaFileParsing(SNode* pStmt) { ...@@ -878,6 +889,81 @@ static bool incompletaFileParsing(SNode* pStmt) {
return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing; return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
} }
void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) {
SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
int64_t analyseStart = taosGetTimestampUs();
code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row);
pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
}
if (TSDB_CODE_SUCCESS == code) {
code = qContinuePlanPostQuery(pRequest->pPostPlan);
}
nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
handleQueryAnslyseRes(pWrapper, NULL, code);
}
void returnToUser(SRequestObj* pRequest) {
if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
// return to client
pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
return;
}
SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
if (pUserReq) {
pUserReq->code = pRequest->code;
// return to client
pUserReq->body.queryFp(pUserReq->body.param, pUserReq, pUserReq->code);
releaseRequest(pRequest->relation.userRefId);
return;
} else {
tscError("0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
pRequest->relation.userRefId, pRequest->requestId);
}
}
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
SRequestObj* pRequest = (SRequestObj*)res;
if (pRequest->code) {
returnToUser(pRequest);
return;
}
TAOS_ROW row = NULL;
if (rowNum > 0) {
row = taos_fetch_row(res); // for single row only now
}
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
if (pNextReq) {
continuePostSubQuery(pNextReq, row);
releaseRequest(pRequest->relation.nextRefId);
} else {
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
pRequest->relation.nextRefId, pRequest->requestId);
}
}
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
SRequestObj* pRequest = pWrapper->pRequest;
if (TD_RES_QUERY(pRequest)) {
taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
return;
}
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
if (pNextReq) {
continuePostSubQuery(pNextReq, NULL);
releaseRequest(pRequest->relation.nextRefId);
} else {
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
pRequest->relation.nextRefId, pRequest->requestId);
}
}
// todo refacto the error code mgmt // todo refacto the error code mgmt
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
SSqlCallbackWrapper* pWrapper = param; SSqlCallbackWrapper* pWrapper = param;
...@@ -912,12 +998,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { ...@@ -912,12 +998,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) { if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self,
tstrerror(code), pRequest->retry, pRequest->requestId); tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code; restartAsyncQuery(pRequest, code);
schedulerFreeJob(&pRequest->body.queryJob, 0);
qDestroyQuery(pRequest->pQuery);
pRequest->pQuery = NULL;
destorySqlCallbackWrapper(pWrapper);
doAsyncQuery(pRequest, true);
return; return;
} }
...@@ -938,10 +1019,15 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { ...@@ -938,10 +1019,15 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
return; return;
} }
destorySqlCallbackWrapper(pWrapper); if (pRequest->relation.nextRefId) {
handlePostSubQuery(pWrapper);
} else {
destorySqlCallbackWrapper(pWrapper);
pRequest->pWrapper = NULL;
// return to client // return to client
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
} }
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) { SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
...@@ -1049,6 +1135,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat ...@@ -1049,6 +1135,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
pRequest->requestId); pRequest->requestId);
} else { } else {
pRequest->body.subplanNum = pDag->numOfSubplans; pRequest->body.subplanNum = pDag->numOfSubplans;
TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
} }
pRequest->metric.execStart = taosGetTimestampUs(); pRequest->metric.execStart = taosGetTimestampUs();
...@@ -1084,6 +1171,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat ...@@ -1084,6 +1171,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId); pRequest->requestId);
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
pRequest->pWrapper = NULL;
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno; pRequest->code = terrno;
} }
...@@ -1103,6 +1191,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM ...@@ -1103,6 +1191,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
pRequest->body.execMode = pQuery->execMode; pRequest->body.execMode = pQuery->execMode;
if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) { if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
pRequest->pWrapper = NULL;
} }
if (pQuery->pRoot && !pRequest->inRetry) { if (pQuery->pRoot && !pRequest->inRetry) {
...@@ -2402,3 +2491,90 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, ...@@ -2402,3 +2491,90 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
return pRequest; return pRequest;
} }
static void fetchCallback(void *pResult, void *param, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
tstrerror(code), pRequest->requestId);
pResultInfo->pData = pResult;
pResultInfo->numOfRows = 0;
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
taosMemoryFreeClear(pResultInfo->pData);
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
return;
}
if (pRequest->code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pResultInfo->pData);
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
return;
}
pRequest->code =
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->code = code;
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
} else {
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
pRequest->requestId);
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
}
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
}
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param) {
pRequest->body.fetchFp = fp;
pRequest->body.param = param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// this query has no results or error exists, return directly
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
// all data has returned to App already, no need to try again
if (pResultInfo->completed) {
// it is a local executed query, no need to do async fetch
if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
if (pResultInfo->localResultFetched) {
pResultInfo->numOfRows = 0;
pResultInfo->current = 0;
} else {
pResultInfo->localResultFetched = true;
}
} else {
pResultInfo->numOfRows = 0;
}
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
SSchedulerReq req = {
.syncReq = false,
.fetchFp = fetchCallback,
.cbParam = pRequest,
};
schedulerFetchRows(pRequest->body.queryJob, &req);
}
...@@ -563,22 +563,13 @@ int taos_select_db(TAOS *taos, const char *db) { ...@@ -563,22 +563,13 @@ int taos_select_db(TAOS *taos, const char *db) {
return code; return code;
} }
void taos_stop_query(TAOS_RES *res) { void taos_stop_query(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
return; return;
} }
SRequestObj *pRequest = (SRequestObj *)res; stopAllQueries((SRequestObj*)res);
pRequest->killed = true;
// It is not a query, no need to stop.
if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) {
tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
return;
}
schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
tscDebug("request %" PRIx64 " killed", pRequest->requestId);
} }
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
...@@ -774,8 +765,13 @@ static void destoryCatalogReq(SCatalogReq *pCatalogReq) { ...@@ -774,8 +765,13 @@ static void destoryCatalogReq(SCatalogReq *pCatalogReq) {
taosArrayDestroy(pCatalogReq->pDbVgroup); taosArrayDestroy(pCatalogReq->pDbVgroup);
taosArrayDestroy(pCatalogReq->pDbCfg); taosArrayDestroy(pCatalogReq->pDbCfg);
taosArrayDestroy(pCatalogReq->pDbInfo); taosArrayDestroy(pCatalogReq->pDbInfo);
taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq); if (pCatalogReq->cloned) {
taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq); taosArrayDestroy(pCatalogReq->pTableMeta);
taosArrayDestroy(pCatalogReq->pTableHash);
} else {
taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
}
taosArrayDestroy(pCatalogReq->pUdf); taosArrayDestroy(pCatalogReq->pUdf);
taosArrayDestroy(pCatalogReq->pIndex); taosArrayDestroy(pCatalogReq->pIndex);
taosArrayDestroy(pCatalogReq->pUser); taosArrayDestroy(pCatalogReq->pUser);
...@@ -794,26 +790,108 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) { ...@@ -794,26 +790,108 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) {
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
} }
void destroyCtxInRequest(SRequestObj* pRequest) {
schedulerFreeJob(&pRequest->body.queryJob, 0);
qDestroyQuery(pRequest->pQuery);
pRequest->pQuery = NULL;
destorySqlCallbackWrapper(pRequest->pWrapper);
pRequest->pWrapper = NULL;
}
static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) { static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) {
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param;
SRequestObj *pRequest = pWrapper->pRequest; SRequestObj *pRequest = pWrapper->pRequest;
SQuery *pQuery = pRequest->pQuery; SQuery *pQuery = pRequest->pQuery;
qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
int64_t analyseStart = taosGetTimestampUs(); int64_t analyseStart = taosGetTimestampUs();
pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart; pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart;
qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
if (code == TSDB_CODE_SUCCESS) { if (TSDB_CODE_SUCCESS == code) {
code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
}
pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
}
int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) {
int32_t code = TSDB_CODE_SUCCESS;
SCatalogReq* pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq));
if (pTarget == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
pTarget->pDbVgroup = taosArrayDup(pSrc->pDbVgroup, NULL);
pTarget->pDbCfg = taosArrayDup(pSrc->pDbCfg, NULL);
pTarget->pDbInfo = taosArrayDup(pSrc->pDbInfo, NULL);
pTarget->pTableMeta = taosArrayDup(pSrc->pTableMeta, NULL);
pTarget->pTableHash = taosArrayDup(pSrc->pTableHash, NULL);
pTarget->pUdf = taosArrayDup(pSrc->pUdf, NULL);
pTarget->pIndex = taosArrayDup(pSrc->pIndex, NULL);
pTarget->pUser = taosArrayDup(pSrc->pUser, NULL);
pTarget->pTableIndex = taosArrayDup(pSrc->pTableIndex, NULL);
pTarget->pTableCfg = taosArrayDup(pSrc->pTableCfg, NULL);
pTarget->pTableTag = taosArrayDup(pSrc->pTableTag, NULL);
pTarget->qNodeRequired = pSrc->qNodeRequired;
pTarget->dNodeRequired = pSrc->dNodeRequired;
pTarget->svrVerRequired = pSrc->svrVerRequired;
pTarget->forceUpdate = pSrc->forceUpdate;
pTarget->cloned = true;
*ppTarget = pTarget;
}
return code;
}
void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) {
SRequestObj* pNewRequest = NULL;
SSqlCallbackWrapper* pNewWrapper = NULL;
int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest);
if (code) {
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
return;
}
pNewRequest->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pNewRequest->pQuery) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
pNewRequest->pQuery->pRoot = pRoot;
pRoot = NULL;
pNewRequest->pQuery->execStage = QUERY_EXEC_STAGE_ANALYSE;
}
if (TSDB_CODE_SUCCESS == code) {
code = prepareAndParseSqlSyntax(&pNewWrapper, pNewRequest, false);
}
if (TSDB_CODE_SUCCESS == code) {
code = cloneCatalogReq(&pNewWrapper->pCatalogReq, pWrapper->pCatalogReq);
}
doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code);
nodesDestroyNode(pRoot);
}
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) {
SRequestObj *pRequest = pWrapper->pRequest;
SQuery *pQuery = pRequest->pQuery;
if (code == TSDB_CODE_SUCCESS && pQuery->pPrevRoot) {
SNode* prevRoot = pQuery->pPrevRoot;
pQuery->pPrevRoot = NULL;
handleSubQueryFromAnalyse(pWrapper, pResultMeta, prevRoot);
return;
}
if (code == TSDB_CODE_SUCCESS) {
pRequest->stableQuery = pQuery->stableQuery; pRequest->stableQuery = pQuery->stableQuery;
if (pQuery->pRoot) { if (pQuery->pRoot) {
pRequest->stmtType = pQuery->pRoot->type; pRequest->stmtType = pQuery->pRoot->type;
} }
}
pRequest->metric.analyseCostUs = taosGetTimestampUs() - analyseStart;
if (code == TSDB_CODE_SUCCESS) {
if (pQuery->haveResultSet) { if (pQuery->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols);
setResPrecision(&pRequest->body.resInfo, pQuery->precision); setResPrecision(&pRequest->body.resInfo, pQuery->precision);
...@@ -826,14 +904,14 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t ...@@ -826,14 +904,14 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
launchAsyncQuery(pRequest, pQuery, pResultMeta, pWrapper); launchAsyncQuery(pRequest, pQuery, pResultMeta, pWrapper);
} else { } else {
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
pRequest->pWrapper = NULL;
qDestroyQuery(pRequest->pQuery); qDestroyQuery(pRequest->pQuery);
pRequest->pQuery = NULL; pRequest->pQuery = NULL;
if (NEED_CLIENT_HANDLE_ERROR(code)) { if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code; restartAsyncQuery(pRequest, code);
doAsyncQuery(pRequest, true);
return; return;
} }
...@@ -841,7 +919,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t ...@@ -841,7 +919,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self, tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self,
tstrerror(code), pRequest->requestId); tstrerror(code), pRequest->requestId);
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); returnToUser(pRequest);
} }
} }
...@@ -904,6 +982,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c ...@@ -904,6 +982,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code, tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code,
tstrerror(code), pWrapper->pRequest->requestId); tstrerror(code), pWrapper->pRequest->requestId);
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
pRequest->pWrapper = NULL;
terrno = code; terrno = code;
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
...@@ -920,6 +999,7 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest) ...@@ -920,6 +999,7 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest)
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code, tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code,
tstrerror(code), pWrapper->pRequest->requestId); tstrerror(code), pWrapper->pRequest->requestId);
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
pRequest->pWrapper = NULL;
terrno = code; terrno = code;
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
...@@ -967,27 +1047,16 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { ...@@ -967,27 +1047,16 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) {
int32_t code = TSDB_CODE_SUCCESS;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SSqlCallbackWrapper *pWrapper = NULL; SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
int32_t code = TSDB_CODE_SUCCESS; if (pWrapper == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { } else {
code = pRequest->prevCode; pWrapper->pRequest = pRequest;
terrno = code; pRequest->pWrapper = pWrapper;
pRequest->code = code; *ppWrapper = pWrapper;
tscDebug("call sync query cb with code: %s", tstrerror(code));
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
return;
}
if (TSDB_CODE_SUCCESS == code) {
pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
if (pWrapper == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
pWrapper->pRequest = pRequest;
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -999,7 +1068,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -999,7 +1068,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pWrapper->pParseCtx->pCatalog); code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pWrapper->pParseCtx->pCatalog);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code && NULL == pRequest->pQuery) {
int64_t syntaxStart = taosGetTimestampUs(); int64_t syntaxStart = taosGetTimestampUs();
pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq)); pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq));
...@@ -1014,6 +1083,27 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -1014,6 +1083,27 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
pRequest->metric.parseCostUs += taosGetTimestampUs() - syntaxStart; pRequest->metric.parseCostUs += taosGetTimestampUs() - syntaxStart;
} }
return code;
}
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SSqlCallbackWrapper *pWrapper = NULL;
int32_t code = TSDB_CODE_SUCCESS;
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
code = pRequest->prevCode;
terrno = code;
pRequest->code = code;
tscDebug("call sync query cb with code: %s", tstrerror(code));
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
return;
}
if (TSDB_CODE_SUCCESS == code) {
code = prepareAndParseSqlSyntax(&pWrapper, pRequest, updateMetaForce);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pRequest->stmtType = pRequest->pQuery->pRoot->type; pRequest->stmtType = pRequest->pQuery->pRoot->type;
code = phaseAsyncQuery(pWrapper); code = phaseAsyncQuery(pWrapper);
...@@ -1023,6 +1113,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -1023,6 +1113,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId); pRequest->requestId);
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
pRequest->pWrapper = NULL;
qDestroyQuery(pRequest->pQuery); qDestroyQuery(pRequest->pQuery);
pRequest->pQuery = NULL; pRequest->pQuery = NULL;
...@@ -1040,48 +1131,57 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -1040,48 +1131,57 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
} }
} }
static void fetchCallback(void *pResult, void *param, int32_t code) { void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)param; int32_t reqIdx = 0;
SRequestObj *pReqList[16] = {NULL};
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SRequestObj *pUserReq = NULL;
pReqList[0] = pRequest;
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, uint64_t tmpRefId = 0;
tstrerror(code), pRequest->requestId); SRequestObj* pTmp = pRequest;
while (pTmp->relation.prevRefId) {
pResultInfo->pData = pResult; tmpRefId = pTmp->relation.prevRefId;
pResultInfo->numOfRows = 0; pTmp = acquireRequest(tmpRefId);
if (pTmp) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self,
tmpRefId, pTmp->requestId);
break;
}
}
if (code != TSDB_CODE_SUCCESS) { tmpRefId = pRequest->relation.nextRefId;
pRequest->code = code; while (tmpRefId) {
taosMemoryFreeClear(pResultInfo->pData); pTmp = acquireRequest(tmpRefId);
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); if (pTmp) {
return; tmpRefId = pTmp->relation.nextRefId;
removeRequest(pTmp->self);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
break;
}
} }
if (pRequest->code != TSDB_CODE_SUCCESS) { for (int32_t i = reqIdx; i >= 0; i--) {
taosMemoryFreeClear(pResultInfo->pData); destroyCtxInRequest(pReqList[i]);
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); if (pReqList[i]->relation.userRefId == pReqList[i]->self || 0 == pReqList[i]->relation.userRefId) {
return; pUserReq = pReqList[i];
} else {
removeRequest(pReqList[i]->self);
}
} }
pRequest->code = if (pUserReq) {
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true); pUserReq->prevCode = code;
if (pRequest->code != TSDB_CODE_SUCCESS) { memset(&pUserReq->relation, 0, sizeof(pUserReq->relation));
pResultInfo->numOfRows = 0;
pRequest->code = code;
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
} else { } else {
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, tscError("user req is missing");
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, return;
pRequest->requestId);
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
} }
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); doAsyncQuery(pUserReq, true);
} }
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
...@@ -1095,43 +1195,8 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ...@@ -1095,43 +1195,8 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
} }
SRequestObj *pRequest = res; SRequestObj *pRequest = res;
pRequest->body.fetchFp = fp;
pRequest->body.param = param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// this query has no results or error exists, return directly
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
// all data has returned to App already, no need to try again
if (pResultInfo->completed) {
// it is a local executed query, no need to do async fetch
if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
if (pResultInfo->localResultFetched) {
pResultInfo->numOfRows = 0;
pResultInfo->current = 0;
} else {
pResultInfo->localResultFetched = true;
}
} else {
pResultInfo->numOfRows = 0;
}
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
SSchedulerReq req = {
.syncReq = false,
.fetchFp = fetchCallback,
.cbParam = pRequest,
};
schedulerFetchRows(pRequest->body.queryJob, &req); taosAsyncFetchImpl(pRequest, fp, param);
} }
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
......
...@@ -388,6 +388,7 @@ static const SSysDbTableSchema querySchema[] = { ...@@ -388,6 +388,7 @@ static const SSysDbTableSchema querySchema[] = {
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false}, {.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false},
{.name = "sub_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false},
{.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
......
...@@ -224,6 +224,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR ...@@ -224,6 +224,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; if (tEncodeI64(pEncoder, desc->stime) < 0) return -1;
if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1;
if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1; if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1;
if (tEncodeI8(pEncoder, desc->isSubQuery) < 0) return -1;
if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1;
if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1;
...@@ -291,6 +292,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) ...@@ -291,6 +292,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1;
if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery) < 0) return -1; if (tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t *)&desc.isSubQuery) < 0) return -1;
if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1;
...@@ -6167,6 +6169,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS ...@@ -6167,6 +6169,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
} }
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1; if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1; if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -6252,6 +6255,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea ...@@ -6252,6 +6255,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1; if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
...@@ -6318,6 +6322,9 @@ int32_t tDeserializeSMRecoverStreamReq(void *buf, int32_t bufLen, SMRecoverStrea ...@@ -6318,6 +6322,9 @@ int32_t tDeserializeSMRecoverStreamReq(void *buf, int32_t bufLen, SMRecoverStrea
} }
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
if (NULL == pReq) {
return;
}
taosArrayDestroy(pReq->pTags); taosArrayDestroy(pReq->pTags);
taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->ast); taosMemoryFreeClear(pReq->ast);
......
...@@ -834,6 +834,9 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -834,6 +834,9 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->isSubQuery, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);
......
...@@ -344,7 +344,7 @@ static int32_t getFuncInfo(SFunctionNode* pFunc) { ...@@ -344,7 +344,7 @@ static int32_t getFuncInfo(SFunctionNode* pFunc) {
return fmGetFuncInfo(pFunc, msg, sizeof(msg)); return fmGetFuncInfo(pFunc, msg, sizeof(msg));
} }
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) { SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) { if (NULL == pFunc) {
return NULL; return NULL;
......
...@@ -953,6 +953,8 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -953,6 +953,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pQuery); nodesDestroyNode(pStmt->pQuery);
nodesDestroyList(pStmt->pTags); nodesDestroyList(pStmt->pTags);
nodesDestroyNode(pStmt->pSubtable); nodesDestroyNode(pStmt->pSubtable);
tFreeSCMCreateStreamReq(pStmt->pReq);
taosMemoryFreeClear(pStmt->pReq);
break; break;
} }
case QUERY_NODE_DROP_STREAM_STMT: // no pointer field case QUERY_NODE_DROP_STREAM_STMT: // no pointer field
...@@ -1052,6 +1054,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -1052,6 +1054,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_QUERY: { case QUERY_NODE_QUERY: {
SQuery* pQuery = (SQuery*)pNode; SQuery* pQuery = (SQuery*)pNode;
nodesDestroyNode(pQuery->pRoot); nodesDestroyNode(pQuery->pRoot);
nodesDestroyNode(pQuery->pPostRoot);
taosMemoryFreeClear(pQuery->pResSchema); taosMemoryFreeClear(pQuery->pResSchema);
if (NULL != pQuery->pCmdMsg) { if (NULL != pQuery->pCmdMsg) {
taosMemoryFreeClear(pQuery->pCmdMsg->pMsg); taosMemoryFreeClear(pQuery->pCmdMsg->pMsg);
......
...@@ -34,6 +34,7 @@ int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* ...@@ -34,6 +34,7 @@ int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache*
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache); int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -384,6 +384,10 @@ static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateS ...@@ -384,6 +384,10 @@ static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateS
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery); code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
} }
if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
code = reserveDbCfgForLastRow(pCxt, pSelect->pFromTable);
}
return code; return code;
} }
......
...@@ -53,6 +53,8 @@ typedef struct STranslateContext { ...@@ -53,6 +53,8 @@ typedef struct STranslateContext {
bool createStream; bool createStream;
bool stableQuery; bool stableQuery;
bool showRewrite; bool showRewrite;
SNode* pPrevRoot;
SNode* pPostRoot;
} STranslateContext; } STranslateContext;
typedef struct SBuildTopicContext { typedef struct SBuildTopicContext {
...@@ -276,6 +278,10 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = { ...@@ -276,6 +278,10 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode); static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode); static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode);
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal); static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal);
static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt);
static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery);
static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery);
static int32_t setRefreshMate(STranslateContext* pCxt, SQuery* pQuery);
static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; } static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; }
...@@ -6763,6 +6769,54 @@ static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStream ...@@ -6763,6 +6769,54 @@ static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStream
return code; return code;
} }
static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery) {
SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == col) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(col->tableAlias, pTable);
strcpy(col->colName, pMeta->schema[0].name);
SNodeList* pParamterList = nodesMakeList();
if (NULL == pParamterList) {
nodesDestroyNode((SNode *)col);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = nodesListStrictAppend(pParamterList, (SNode *)col);
if (code) {
nodesDestroyNode((SNode *)col);
nodesDestroyList(pParamterList);
return code;
}
SNode* pFunc = (SNode*)createFunction("last", pParamterList);
if (NULL == pFunc) {
nodesDestroyList(pParamterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
SNodeList* pProjectionList = nodesMakeList();
if (NULL == pProjectionList) {
nodesDestroyList(pParamterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
code = nodesListStrictAppend(pProjectionList, pFunc);
if (code) {
nodesDestroyNode(pFunc);
nodesDestroyList(pProjectionList);
return code;
}
code = createSimpleSelectStmtFromProjList(pDb, pTable, pProjectionList, (SSelectStmt **)pQuery);
if (code) {
nodesDestroyList(pProjectionList);
return code;
}
return code;
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pCxt->createStream = true; pCxt->createStream = true;
STableMeta* pMeta = NULL; STableMeta* pMeta = NULL;
...@@ -6789,6 +6843,18 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt ...@@ -6789,6 +6843,18 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB); getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL); code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
} }
if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) {
SRealTableNode* pTable = (SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable);
code = createLastTsSelectStmt(pTable->table.dbName, pTable->table.tableName, pTable->pMeta, &pStmt->pPrevQuery);
/*
if (TSDB_CODE_SUCCESS == code) {
STranslateContext cxt = {0};
int32_t code = initTranslateContext(pCxt->pParseCxt, pCxt->pMetaCache, &cxt);
code = translateQuery(&cxt, pStmt->pPrevQuery);
destroyTranslateContext(&cxt);
}
*/
}
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
return code; return code;
} }
...@@ -6855,13 +6921,86 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* ...@@ -6855,13 +6921,86 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
code = buildCreateStreamReq(pCxt, pStmt, &createReq); code = buildCreateStreamReq(pCxt, pStmt, &createReq);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq); if (NULL == pStmt->pPrevQuery) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq);
} else {
pStmt->pReq = taosMemoryMalloc(sizeof(createReq));
if (NULL == pStmt->pReq) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
memcpy(pStmt->pReq, &createReq, sizeof(createReq));
memset(&createReq, 0, sizeof(createReq));
TSWAP(pCxt->pPrevRoot, pStmt->pPrevQuery);
}
}
} }
tFreeSCMCreateStreamReq(&createReq); tFreeSCMCreateStreamReq(&createReq);
return code; return code;
} }
int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) {
int32_t code = TSDB_CODE_SUCCESS;
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) {
return code;
}
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow)) {
return code;
}
SIntervalWindowNode* pWindow = (SIntervalWindowNode*)pSelect->pWindow;
pInterval->interval = ((SValueNode*)pWindow->pInterval)->datum.i;
pInterval->intervalUnit = ((SValueNode*)pWindow->pInterval)->unit;
pInterval->offset = (NULL != pWindow->pOffset ? ((SValueNode*)pWindow->pOffset)->datum.i : 0);
pInterval->sliding = (NULL != pWindow->pSliding ? ((SValueNode*)pWindow->pSliding)->datum.i : pInterval->interval);
pInterval->slidingUnit =
(NULL != pWindow->pSliding ? ((SValueNode*)pWindow->pSliding)->unit : pInterval->intervalUnit);
pInterval->precision = ((SColumnNode*)pWindow->pCol)->node.resType.precision;
return code;
}
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow) {
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot;
STranslateContext cxt = {0};
SInterval interval = {0};
int64_t lastTs = 0;
int32_t code = initTranslateContext(pParseCxt, NULL, &cxt);
if (TSDB_CODE_SUCCESS == code) {
code = buildIntervalForCreateStream(pStmt, &interval);
}
if (TSDB_CODE_SUCCESS == code) {
if (pResRow && pResRow[0]) {
lastTs = *(int64_t*)pResRow[0];
} else if (interval.interval > 0) {
lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision);
} else {
lastTs = taosGetTimestampMs();
}
}
if (TSDB_CODE_SUCCESS == code) {
if (interval.interval > 0) {
pStmt->pReq->lastTs = taosTimeTruncate(lastTs, &interval);
} else {
pStmt->pReq->lastTs = lastTs;
}
code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = setQuery(&cxt, pQuery);
}
setRefreshMate(&cxt, pQuery);
destroyTranslateContext(&cxt);
tFreeSCMCreateStreamReq(pStmt->pReq);
taosMemoryFreeClear(pStmt->pReq);
return code;
}
static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pStmt) { static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pStmt) {
SMDropStreamReq dropReq = {0}; SMDropStreamReq dropReq = {0};
SName name; SName name;
...@@ -7542,8 +7681,7 @@ static SNodeList* createProjectCols(int32_t ncols, const char* const pCols[]) { ...@@ -7542,8 +7681,7 @@ static SNodeList* createProjectCols(int32_t ncols, const char* const pCols[]) {
return pProjections; return pProjections;
} }
static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, int32_t numOfProjs, static int32_t createSimpleSelectStmtImpl(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt) {
const char* const pProjCol[], SSelectStmt** pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT); SSelectStmt* pSelect = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
if (NULL == pSelect) { if (NULL == pSelect) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -7559,27 +7697,38 @@ static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, int32 ...@@ -7559,27 +7697,38 @@ static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, int32
snprintf(pRealTable->table.tableName, sizeof(pRealTable->table.tableName), "%s", pTable); snprintf(pRealTable->table.tableName, sizeof(pRealTable->table.tableName), "%s", pTable);
snprintf(pRealTable->table.tableAlias, sizeof(pRealTable->table.tableAlias), "%s", pTable); snprintf(pRealTable->table.tableAlias, sizeof(pRealTable->table.tableAlias), "%s", pTable);
pSelect->pFromTable = (SNode*)pRealTable; pSelect->pFromTable = (SNode*)pRealTable;
pSelect->pProjectionList = pProjectionList;
*pStmt = pSelect;
return TSDB_CODE_SUCCESS;
}
static int32_t createSimpleSelectStmtFromCols(const char* pDb, const char* pTable, int32_t numOfProjs,
const char* const pProjCol[], SSelectStmt** pStmt) {
SNodeList* pProjectionList = NULL;
if (numOfProjs >= 0) { if (numOfProjs >= 0) {
pSelect->pProjectionList = createProjectCols(numOfProjs, pProjCol); pProjectionList = createProjectCols(numOfProjs, pProjCol);
if (NULL == pSelect->pProjectionList) { if (NULL == pProjectionList) {
nodesDestroyNode((SNode*)pSelect);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
*pStmt = pSelect; return createSimpleSelectStmtImpl(pDb, pTable, pProjectionList, pStmt);
}
return TSDB_CODE_SUCCESS; static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt) {
return createSimpleSelectStmtImpl(pDb, pTable, pProjectionList, pStmt);
} }
static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt) { static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt) {
const SSysTableShowAdapter* pShow = &sysTableShowAdapter[showType - SYSTABLE_SHOW_TYPE_OFFSET]; const SSysTableShowAdapter* pShow = &sysTableShowAdapter[showType - SYSTABLE_SHOW_TYPE_OFFSET];
return createSimpleSelectStmt(pShow->pDbName, pShow->pTableName, pShow->numOfShowCols, pShow->pShowCols, pStmt); return createSimpleSelectStmtFromCols(pShow->pDbName, pShow->pTableName, pShow->numOfShowCols, pShow->pShowCols, pStmt);
} }
static int32_t createSelectStmtForShowTableDist(SShowTableDistributedStmt* pStmt, SSelectStmt** pOutput) { static int32_t createSelectStmtForShowTableDist(SShowTableDistributedStmt* pStmt, SSelectStmt** pOutput) {
return createSimpleSelectStmt(pStmt->dbName, pStmt->tableName, 0, NULL, pOutput); return createSimpleSelectStmtFromCols(pStmt->dbName, pStmt->tableName, 0, NULL, pOutput);
} }
static int32_t createOperatorNode(EOperatorType opType, const char* pColName, SNode* pRight, SNode** pOp) { static int32_t createOperatorNode(EOperatorType opType, const char* pColName, SNode* pRight, SNode** pOp) {
...@@ -7713,7 +7862,7 @@ static int32_t createShowTableTagsProjections(SNodeList** pProjections, SNodeLis ...@@ -7713,7 +7862,7 @@ static int32_t createShowTableTagsProjections(SNodeList** pProjections, SNodeLis
static int32_t rewriteShowStableTags(STranslateContext* pCxt, SQuery* pQuery) { static int32_t rewriteShowStableTags(STranslateContext* pCxt, SQuery* pQuery) {
SShowTableTagsStmt* pShow = (SShowTableTagsStmt*)pQuery->pRoot; SShowTableTagsStmt* pShow = (SShowTableTagsStmt*)pQuery->pRoot;
SSelectStmt* pSelect = NULL; SSelectStmt* pSelect = NULL;
int32_t code = createSimpleSelectStmt(((SValueNode*)pShow->pDbName)->literal, ((SValueNode*)pShow->pTbName)->literal, int32_t code = createSimpleSelectStmtFromCols(((SValueNode*)pShow->pDbName)->literal, ((SValueNode*)pShow->pTbName)->literal,
-1, NULL, &pSelect); -1, NULL, &pSelect);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createShowTableTagsProjections(&pSelect->pProjectionList, &pShow->pTags); code = createShowTableTagsProjections(&pSelect->pProjectionList, &pShow->pTags);
...@@ -9038,6 +9187,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -9038,6 +9187,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
} }
break; break;
default: default:
pQuery->haveResultSet = false;
pQuery->execMode = QUERY_EXEC_MODE_RPC; pQuery->execMode = QUERY_EXEC_MODE_RPC;
if (NULL != pCxt->pCmdMsg) { if (NULL != pCxt->pCmdMsg) {
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg); TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg);
...@@ -9072,6 +9222,10 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMe ...@@ -9072,6 +9222,10 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMe
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(&cxt, pQuery->pRoot); code = translateQuery(&cxt, pQuery->pRoot);
} }
if (TSDB_CODE_SUCCESS == code && (cxt.pPrevRoot || cxt.pPostRoot)) {
pQuery->pPrevRoot = cxt.pPrevRoot;
pQuery->pPostRoot = cxt.pPostRoot;
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setQuery(&cxt, pQuery); code = setQuery(&cxt, pQuery);
} }
......
...@@ -204,7 +204,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata ...@@ -204,7 +204,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
const struct SMetaData* pMetaData, SQuery* pQuery) { const struct SMetaData* pMetaData, SQuery* pQuery) {
SParseMetaCache metaCache = {0}; SParseMetaCache metaCache = {0};
int32_t code = nodesAcquireAllocator(pCxt->allocatorId); int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code && pCatalogReq) {
code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache); code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -221,6 +221,19 @@ int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, ...@@ -221,6 +221,19 @@ int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq,
return parseInsertSql(pCxt, &pQuery, pCatalogReq, pMetaData); return parseInsertSql(pCxt, &pQuery, pCatalogReq, pMetaData);
} }
int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pQuery->pRoot)) {
case QUERY_NODE_CREATE_STREAM_STMT:
code = translatePostCreateStream(pCxt, pQuery, pResRow);
break;
default:
break;
}
return code;
}
void qDestroyParseContext(SParseContext* pCxt) { void qDestroyParseContext(SParseContext* pCxt) {
if (NULL == pCxt) { if (NULL == pCxt) {
return; return;
......
...@@ -885,12 +885,12 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -885,12 +885,12 @@ TEST_F(ParserInitialCTest, createStream) {
setCreateStreamReq( setCreateStreamReq(
"s1", "test", "s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 ignore " "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 0 ignore "
"update 1 into st3 as select count(*) from t1 interval(10s)", "update 1 into st3 as select count(*) from t1 interval(10s)",
"st3", 1); "st3", 1);
setStreamOptions(STREAM_CREATE_STABLE_TRUE, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, setStreamOptions(STREAM_CREATE_STABLE_TRUE, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND,
10 * MILLISECOND_PER_SECOND, 0, 1, 1); 10 * MILLISECOND_PER_SECOND, 0, 0, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 IGNORE " run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 0 IGNORE "
"UPDATE 1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)"); "UPDATE 1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
......
...@@ -97,6 +97,12 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown ...@@ -97,6 +97,12 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qContinuePlanPostQuery(void *pPostPlan) {
//TODO
return TSDB_CODE_SUCCESS;
}
int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId); planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId);
return setSubplanExecutionNode(subplan->pNode, groupId, pSource); return setSubplanExecutionNode(subplan->pNode, groupId, pSource);
......
...@@ -21,9 +21,7 @@ class TDTestCase: ...@@ -21,9 +21,7 @@ class TDTestCase:
tdSql.execute("create table db.stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t int)") tdSql.execute("create table db.stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t int)")
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)") tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
tdSql.query("select count(*) from information_schema.ins_columns") tdSql.execute("select count(*) from information_schema.ins_columns")
# enterprise version: 295, community version: 285
tdSql.checkData(0, 0, 295)
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'") tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
tdSql.checkRows(14) tdSql.checkRows(14)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册