未验证 提交 63e78e3e 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #13749 from taosdata/feature/3.0_wxy

fix: handle the memory leak of parser
...@@ -1494,9 +1494,9 @@ typedef struct { ...@@ -1494,9 +1494,9 @@ typedef struct {
int32_t code; int32_t code;
} STaskDropRsp; } STaskDropRsp;
#define STREAM_TRIGGER_AT_ONCE 1 #define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2 #define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3 #define STREAM_TRIGGER_MAX_DELAY 3
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
...@@ -1506,6 +1506,7 @@ typedef struct { ...@@ -1506,6 +1506,7 @@ typedef struct {
char* sql; char* sql;
char* ast; char* ast;
int8_t triggerType; int8_t triggerType;
int64_t maxDelay;
int64_t watermark; int64_t watermark;
} SCMCreateStreamReq; } SCMCreateStreamReq;
......
...@@ -184,76 +184,77 @@ ...@@ -184,76 +184,77 @@
#define TK_TRIGGER 166 #define TK_TRIGGER 166
#define TK_AT_ONCE 167 #define TK_AT_ONCE 167
#define TK_WINDOW_CLOSE 168 #define TK_WINDOW_CLOSE 168
#define TK_WATERMARK 169 #define TK_MAX_DELAY 169
#define TK_KILL 170 #define TK_WATERMARK 170
#define TK_CONNECTION 171 #define TK_KILL 171
#define TK_TRANSACTION 172 #define TK_CONNECTION 172
#define TK_BALANCE 173 #define TK_TRANSACTION 173
#define TK_VGROUP 174 #define TK_BALANCE 174
#define TK_MERGE 175 #define TK_VGROUP 175
#define TK_REDISTRIBUTE 176 #define TK_MERGE 176
#define TK_SPLIT 177 #define TK_REDISTRIBUTE 177
#define TK_SYNCDB 178 #define TK_SPLIT 178
#define TK_DELETE 179 #define TK_SYNCDB 179
#define TK_NULL 180 #define TK_DELETE 180
#define TK_NK_QUESTION 181 #define TK_NULL 181
#define TK_NK_ARROW 182 #define TK_NK_QUESTION 182
#define TK_ROWTS 183 #define TK_NK_ARROW 183
#define TK_TBNAME 184 #define TK_ROWTS 184
#define TK_QSTARTTS 185 #define TK_TBNAME 185
#define TK_QENDTS 186 #define TK_QSTARTTS 186
#define TK_WSTARTTS 187 #define TK_QENDTS 187
#define TK_WENDTS 188 #define TK_WSTARTTS 188
#define TK_WDURATION 189 #define TK_WENDTS 189
#define TK_CAST 190 #define TK_WDURATION 190
#define TK_NOW 191 #define TK_CAST 191
#define TK_TODAY 192 #define TK_NOW 192
#define TK_TIMEZONE 193 #define TK_TODAY 193
#define TK_COUNT 194 #define TK_TIMEZONE 194
#define TK_FIRST 195 #define TK_COUNT 195
#define TK_LAST 196 #define TK_FIRST 196
#define TK_LAST_ROW 197 #define TK_LAST 197
#define TK_BETWEEN 198 #define TK_LAST_ROW 198
#define TK_IS 199 #define TK_BETWEEN 199
#define TK_NK_LT 200 #define TK_IS 200
#define TK_NK_GT 201 #define TK_NK_LT 201
#define TK_NK_LE 202 #define TK_NK_GT 202
#define TK_NK_GE 203 #define TK_NK_LE 203
#define TK_NK_NE 204 #define TK_NK_GE 204
#define TK_MATCH 205 #define TK_NK_NE 205
#define TK_NMATCH 206 #define TK_MATCH 206
#define TK_CONTAINS 207 #define TK_NMATCH 207
#define TK_JOIN 208 #define TK_CONTAINS 208
#define TK_INNER 209 #define TK_JOIN 209
#define TK_SELECT 210 #define TK_INNER 210
#define TK_DISTINCT 211 #define TK_SELECT 211
#define TK_WHERE 212 #define TK_DISTINCT 212
#define TK_PARTITION 213 #define TK_WHERE 213
#define TK_BY 214 #define TK_PARTITION 214
#define TK_SESSION 215 #define TK_BY 215
#define TK_STATE_WINDOW 216 #define TK_SESSION 216
#define TK_SLIDING 217 #define TK_STATE_WINDOW 217
#define TK_FILL 218 #define TK_SLIDING 218
#define TK_VALUE 219 #define TK_FILL 219
#define TK_NONE 220 #define TK_VALUE 220
#define TK_PREV 221 #define TK_NONE 221
#define TK_LINEAR 222 #define TK_PREV 222
#define TK_NEXT 223 #define TK_LINEAR 223
#define TK_HAVING 224 #define TK_NEXT 224
#define TK_ORDER 225 #define TK_HAVING 225
#define TK_SLIMIT 226 #define TK_ORDER 226
#define TK_SOFFSET 227 #define TK_SLIMIT 227
#define TK_LIMIT 228 #define TK_SOFFSET 228
#define TK_OFFSET 229 #define TK_LIMIT 229
#define TK_ASC 230 #define TK_OFFSET 230
#define TK_NULLS 231 #define TK_ASC 231
#define TK_ID 232 #define TK_NULLS 232
#define TK_NK_BITNOT 233 #define TK_ID 233
#define TK_INSERT 234 #define TK_NK_BITNOT 234
#define TK_VALUES 235 #define TK_INSERT 235
#define TK_IMPORT 236 #define TK_VALUES 236
#define TK_NK_SEMI 237 #define TK_IMPORT 237
#define TK_FILE 238 #define TK_NK_SEMI 238
#define TK_FILE 239
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301
......
...@@ -175,6 +175,7 @@ bool fmIsRepeatScanFunc(int32_t funcId); ...@@ -175,6 +175,7 @@ bool fmIsRepeatScanFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId); bool fmIsUserDefinedFunc(int32_t funcId);
bool fmIsDistExecFunc(int32_t funcId); bool fmIsDistExecFunc(int32_t funcId);
bool fmIsForbidFillFunc(int32_t funcId); bool fmIsForbidFillFunc(int32_t funcId);
bool fmIsForbidStreamFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
......
...@@ -289,6 +289,7 @@ typedef struct SKillStmt { ...@@ -289,6 +289,7 @@ typedef struct SKillStmt {
typedef struct SStreamOptions { typedef struct SStreamOptions {
ENodeType type; ENodeType type;
int8_t triggerType; int8_t triggerType;
SNode* pDelay;
SNode* pWatermark; SNode* pWatermark;
} SStreamOptions; } SStreamOptions;
......
...@@ -252,22 +252,20 @@ typedef struct SNodeList { ...@@ -252,22 +252,20 @@ typedef struct SNodeList {
SListCell* pTail; SListCell* pTail;
} SNodeList; } SNodeList;
#define SNodeptr void* SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode);
SNodeptr nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNodeptr pNode);
SNodeList* nodesMakeList(); SNodeList* nodesMakeList();
int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode); int32_t nodesListAppend(SNodeList* pList, SNode* pNode);
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode); int32_t nodesListStrictAppend(SNodeList* pList, SNode* pNode);
int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode); int32_t nodesListMakeAppend(SNodeList** pList, SNode* pNode);
int32_t nodesListMakeStrictAppend(SNodeList** pList, SNodeptr pNode); int32_t nodesListMakeStrictAppend(SNodeList** pList, SNode* pNode);
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc); int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc);
int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc); int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc);
int32_t nodesListPushFront(SNodeList* pList, SNodeptr pNode); int32_t nodesListPushFront(SNodeList* pList, SNode* pNode);
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell); SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc); void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc);
SNodeptr nodesListGetNode(SNodeList* pList, int32_t index); SNode* nodesListGetNode(SNodeList* pList, int32_t index);
void nodesDestroyList(SNodeList* pList); void nodesDestroyList(SNodeList* pList);
// Only clear the linked list structure, without releasing the elements inside // Only clear the linked list structure, without releasing the elements inside
void nodesClearList(SNodeList* pList); void nodesClearList(SNodeList* pList);
...@@ -275,9 +273,9 @@ void nodesClearList(SNodeList* pList); ...@@ -275,9 +273,9 @@ void nodesClearList(SNodeList* pList);
typedef enum EDealRes { DEAL_RES_CONTINUE = 1, DEAL_RES_IGNORE_CHILD, DEAL_RES_ERROR, DEAL_RES_END } EDealRes; typedef enum EDealRes { DEAL_RES_CONTINUE = 1, DEAL_RES_IGNORE_CHILD, DEAL_RES_ERROR, DEAL_RES_END } EDealRes;
typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext); typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext);
void nodesWalkExpr(SNodeptr pNode, FNodeWalker walker, void* pContext); void nodesWalkExpr(SNode* pNode, FNodeWalker walker, void* pContext);
void nodesWalkExprs(SNodeList* pList, FNodeWalker walker, void* pContext); void nodesWalkExprs(SNodeList* pList, FNodeWalker walker, void* pContext);
void nodesWalkExprPostOrder(SNodeptr pNode, FNodeWalker walker, void* pContext); void nodesWalkExprPostOrder(SNode* pNode, FNodeWalker walker, void* pContext);
void nodesWalkExprsPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext); void nodesWalkExprsPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext);
typedef EDealRes (*FNodeRewriter)(SNode** pNode, void* pContext); typedef EDealRes (*FNodeRewriter)(SNode** pNode, void* pContext);
...@@ -286,13 +284,13 @@ void nodesRewriteExprs(SNodeList* pList, FNodeRewriter rewriter, void* pContext) ...@@ -286,13 +284,13 @@ void nodesRewriteExprs(SNodeList* pList, FNodeRewriter rewriter, void* pContext)
void nodesRewriteExprPostOrder(SNode** pNode, FNodeRewriter rewriter, void* pContext); void nodesRewriteExprPostOrder(SNode** pNode, FNodeRewriter rewriter, void* pContext);
void nodesRewriteExprsPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext); void nodesRewriteExprsPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext);
bool nodesEqualNode(const SNodeptr a, const SNodeptr b); bool nodesEqualNode(const SNode* a, const SNode* b);
SNodeptr nodesCloneNode(const SNodeptr pNode); SNode* nodesCloneNode(const SNode* pNode);
SNodeList* nodesCloneList(const SNodeList* pList); SNodeList* nodesCloneList(const SNodeList* pList);
const char* nodesNodeName(ENodeType type); const char* nodesNodeName(ENodeType type);
int32_t nodesNodeToString(const SNodeptr pNode, bool format, char** pStr, int32_t* pLen); int32_t nodesNodeToString(const SNode* pNode, bool format, char** pStr, int32_t* pLen);
int32_t nodesStringToNode(const char* pStr, SNode** pNode); int32_t nodesStringToNode(const char* pStr, SNode** pNode);
int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen); int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen);
......
...@@ -336,22 +336,21 @@ typedef enum EQueryExecMode { ...@@ -336,22 +336,21 @@ typedef enum EQueryExecMode {
} EQueryExecMode; } EQueryExecMode;
typedef struct SQuery { typedef struct SQuery {
ENodeType type; ENodeType type;
EQueryExecMode execMode; EQueryExecMode execMode;
bool haveResultSet; bool haveResultSet;
SNode* pRoot; SNode* pRoot;
int32_t numOfResCols; int32_t numOfResCols;
SSchema* pResSchema; SSchema* pResSchema;
int8_t precision; int8_t precision;
SCmdMsgInfo* pCmdMsg; SCmdMsgInfo* pCmdMsg;
int32_t msgType; int32_t msgType;
SArray* pTableList; SArray* pTableList;
SArray* pDbList; SArray* pDbList;
bool showRewrite; bool showRewrite;
int32_t placeholderNum; int32_t placeholderNum;
SArray* pPlaceholderValues; SArray* pPlaceholderValues;
SNode* pPrepareRoot; SNode* pPrepareRoot;
struct SParseMetaCache* pMetaCache;
} SQuery; } SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
......
...@@ -698,7 +698,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { ...@@ -698,7 +698,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
return NULL; return NULL;
} }
taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq); // taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
taosThreadMutexLock(&clientHbMgr.lock); taosThreadMutexLock(&clientHbMgr.lock);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
......
...@@ -233,9 +233,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -233,9 +233,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
return pRequest->pTscObj->pAppInfo;
}
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL; SRetrieveTableRsp* pRsp = NULL;
...@@ -259,7 +257,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -259,7 +257,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
} }
pRequest->body.queryFp(pRequest->body.param, pRequest, 0); pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
// pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); // pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
} }
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
...@@ -323,7 +321,7 @@ int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) { ...@@ -323,7 +321,7 @@ int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad)); *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
...@@ -401,21 +399,19 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { ...@@ -401,21 +399,19 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
tsem_init(&schdRspSem, 0, 0); tsem_init(&schdRspSem, 0, 0);
SQueryResult res = {.code = 0, .numOfRows = 0}; SQueryResult res = {.code = 0, .numOfRows = 0};
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self .requestObjRefId = pRequest->self};
}; SSchedulerReq req = {.pConn = &conn,
SSchedulerReq req = {.pConn = &conn, .pNodeList = pNodeList,
.pNodeList = pNodeList, .pDag = pDag,
.pDag = pDag, .sql = pRequest->sqlstr,
.sql = pRequest->sqlstr, .startTs = pRequest->metric.start,
.startTs = pRequest->metric.start, .fp = schdExecCallback,
.fp = schdExecCallback, .cbParam = &res};
.cbParam = &res
}; int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
pRequest->body.resInfo.execRes = res.res; pRequest->body.resInfo.execRes = res.res;
...@@ -455,21 +451,19 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod ...@@ -455,21 +451,19 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {0}; SQueryResult res = {0};
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self .requestObjRefId = pRequest->self};
}; SSchedulerReq req = {.pConn = &conn,
SSchedulerReq req = {.pConn = &conn, .pNodeList = pNodeList,
.pNodeList = pNodeList, .pDag = pDag,
.pDag = pDag, .sql = pRequest->sqlstr,
.sql = pRequest->sqlstr, .startTs = pRequest->metric.start,
.startTs = pRequest->metric.start, .fp = NULL,
.fp = NULL, .cbParam = NULL};
.cbParam = NULL
}; int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res);
int32_t code = schedulerExecJob(&req,&pRequest->body.queryJob, &res);
pRequest->body.resInfo.execRes = res.res; pRequest->body.resInfo.execRes = res.res;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -482,7 +476,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -482,7 +476,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code; return pRequest->code;
} }
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows; pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.queryJob != 0) { if (pRequest->body.queryJob != 0) {
...@@ -495,9 +490,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -495,9 +490,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code; return pRequest->code;
} }
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) { int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
int32_t code = 0; int32_t code = 0;
SArray* pArray = NULL; SArray* pArray = NULL;
SSubmitRsp* pRsp = (SSubmitRsp*)res; SSubmitRsp* pRsp = (SSubmitRsp*)res;
if (pRsp->nBlocks <= 0) { if (pRsp->nBlocks <= 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -519,7 +514,7 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog ...@@ -519,7 +514,7 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog
taosArrayPush(pArray, &tbSver); taosArrayPush(pArray, &tbSver);
} }
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = *epset}; .mgmtEps = *epset};
...@@ -532,7 +527,7 @@ _return: ...@@ -532,7 +527,7 @@ _return:
return code; return code;
} }
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) { int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
int32_t code = 0; int32_t code = 0;
SArray* pArray = NULL; SArray* pArray = NULL;
SArray* pTbArray = (SArray*)res; SArray* pTbArray = (SArray*)res;
...@@ -553,7 +548,7 @@ int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, ...@@ -553,7 +548,7 @@ int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog,
taosArrayPush(pArray, &tbSver); taosArrayPush(pArray, &tbSver);
} }
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = *epset}; .mgmtEps = *epset};
...@@ -575,15 +570,15 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { ...@@ -575,15 +570,15 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
SAppInstInfo* pAppInfo = getAppInfo(pRequest); SAppInstInfo* pAppInfo = getAppInfo(pRequest);
int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog); int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
if (code) { if (code) {
return code; return code;
} }
SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp); SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp);
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
switch (pRes->msgType) { switch (pRes->msgType) {
...@@ -601,8 +596,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { ...@@ -601,8 +596,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
break; break;
} }
default: default:
tscError("0x%"PRIx64", invalid exec result for request type %d, reqId:0x%"PRIx64, pRequest->self, tscError("0x%" PRIx64 ", invalid exec result for request type %d, reqId:0x%" PRIx64, pRequest->self,
pRequest->type, pRequest->requestId); pRequest->type, pRequest->requestId);
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
} }
...@@ -610,13 +605,13 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { ...@@ -610,13 +605,13 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
} }
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*) param; SRequestObj* pRequest = (SRequestObj*)param;
pRequest->code = code; pRequest->code = code;
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%"PRIx64" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code), tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->retry, pRequest->requestId); pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code; pRequest->prevCode = code;
doAsyncQuery(pRequest, true); doAsyncQuery(pRequest, true);
return; return;
...@@ -624,7 +619,7 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { ...@@ -624,7 +619,7 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = handleQueryExecRsp(pRequest); code = handleQueryExecRsp(pRequest);
ASSERT(pRequest->code == TSDB_CODE_SUCCESS); ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
pRequest->code = code; pRequest->code = code;
} }
...@@ -727,35 +722,34 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -727,35 +722,34 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList); code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
if (code) { if (code) {
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SRequestConnInfo conn = {.pTrans = pAppInfo->pTransporter, SRequestConnInfo conn = {
.requestId = pRequest->requestId, .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
.requestObjRefId = pRequest->self
};
SSchedulerReq req = {.pConn = &conn, SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList, .pNodeList = pNodeList,
.pDag = pRequest->body.pDag, .pDag = pRequest->body.pDag,
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.fp = schedulerExecCb, .fp = schedulerExecCb,
.cbParam = pRequest .cbParam = pRequest};
};
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob); code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
} else { } else {
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
//todo not to be released here // todo not to be released here
taosArrayDestroy(pNodeList); taosArrayDestroy(pNodeList);
break; break;
} }
case QUERY_EXEC_MODE_EMPTY_RESULT: case QUERY_EXEC_MODE_EMPTY_RESULT:
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
pRequest->body.queryFp(pRequest->body.param, pRequest, 0); pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
break; break;
default: default:
...@@ -786,7 +780,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { ...@@ -786,7 +780,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
return code; return code;
} }
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
...@@ -989,23 +983,22 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, ...@@ -989,23 +983,22 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
switch (pSendInfo->target.type) { switch (pSendInfo->target.type) {
case TARGET_TYPE_MNODE: case TARGET_TYPE_MNODE:
if (NULL == pTscObj) { if (NULL == pTscObj) {
tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64, tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId); TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
return; return;
} }
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet; SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse]; SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &pEpSet->eps[pEpSet->inUse]; SEp* pNewEp = &pEpSet->eps[pEpSet->inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port, pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
break; break;
case TARGET_TYPE_VNODE: { case TARGET_TYPE_VNODE: {
if (NULL == pTscObj) { if (NULL == pTscObj) {
tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64, tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId); TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
return; return;
} }
...@@ -1415,14 +1408,14 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 ...@@ -1415,14 +1408,14 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
p += sizeof(uint64_t); p += sizeof(uint64_t);
// check fields // check fields
for(int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
int16_t type = *(int16_t*) p; int16_t type = *(int16_t*)p;
p += sizeof(int16_t); p += sizeof(int16_t);
int32_t bytes = *(int32_t*) p; int32_t bytes = *(int32_t*)p;
p += sizeof(int32_t); p += sizeof(int32_t);
// ASSERT(type == pFields[i].type && bytes == pFields[i].bytes); // ASSERT(type == pFields[i].type && bytes == pFields[i].bytes);
} }
int32_t* colLength = (int32_t*)p; int32_t* colLength = (int32_t*)p;
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
static int32_t sentinel = TSC_VAR_NOT_RELEASE; static int32_t sentinel = TSC_VAR_NOT_RELEASE;
static int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt); static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt);
int taos_options(TSDB_OPTION option, const void *arg, ...) { int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0; static int32_t lock = 0;
...@@ -177,8 +177,8 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -177,8 +177,8 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return pResInfo->userFields; return pResInfo->userFields;
} }
static void syncQueryFn(void* param, void* res, int32_t code) { static void syncQueryFn(void *param, void *res, int32_t code) {
SSyncQueryParam* pParam = param; SSyncQueryParam *pParam = param;
pParam->pRequest = res; pParam->pRequest = res;
pParam->pRequest->code = code; pParam->pRequest->code = code;
...@@ -190,10 +190,10 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { ...@@ -190,10 +190,10 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
return NULL; return NULL;
} }
STscObj* pTscObj = (STscObj*)taos; STscObj *pTscObj = (STscObj *)taos;
#if SYNC_ON_TOP_OF_ASYNC #if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0); tsem_init(&param->sem, 0, 0);
taos_query_a(pTscObj, sql, syncQueryFn, param); taos_query_a(pTscObj, sql, syncQueryFn, param);
...@@ -606,16 +606,30 @@ const char *taos_get_server_info(TAOS *taos) { ...@@ -606,16 +606,30 @@ const char *taos_get_server_info(TAOS *taos) {
} }
typedef struct SqlParseWrapper { typedef struct SqlParseWrapper {
SParseContext* pCtx; SParseContext *pCtx;
SCatalogReq catalogReq; SCatalogReq catalogReq;
SRequestObj* pRequest; SRequestObj *pRequest;
SQuery* pQuery; SQuery *pQuery;
} SqlParseWrapper; } SqlParseWrapper;
void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
SqlParseWrapper *pWrapper = (SqlParseWrapper*) param; taosArrayDestroy(pWrapper->catalogReq.pDbVgroup);
SQuery* pQuery = pWrapper->pQuery; taosArrayDestroy(pWrapper->catalogReq.pDbCfg);
SRequestObj* pRequest = pWrapper->pRequest; taosArrayDestroy(pWrapper->catalogReq.pDbInfo);
taosArrayDestroy(pWrapper->catalogReq.pTableMeta);
taosArrayDestroy(pWrapper->catalogReq.pTableHash);
taosArrayDestroy(pWrapper->catalogReq.pUdf);
taosArrayDestroy(pWrapper->catalogReq.pIndex);
taosArrayDestroy(pWrapper->catalogReq.pUser);
taosArrayDestroy(pWrapper->catalogReq.pTableIndex);
taosMemoryFree(pWrapper->pCtx);
taosMemoryFree(pWrapper);
}
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
...@@ -630,22 +644,22 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { ...@@ -630,22 +644,22 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) {
TSWAP(pRequest->dbList, (pQuery)->pDbList); TSWAP(pRequest->dbList, (pQuery)->pDbList);
TSWAP(pRequest->tableList, (pQuery)->pTableList); TSWAP(pRequest->tableList, (pQuery)->pTableList);
taosMemoryFree(pWrapper); destorySqlParseWrapper(pWrapper);
launchAsyncQuery(pRequest, pQuery); launchAsyncQuery(pRequest, pQuery);
} else { } else {
destorySqlParseWrapper(pWrapper);
tscDebug("error happens, code:%d", code); tscDebug("error happens, code:%d", code);
if (NEED_CLIENT_HANDLE_ERROR(code)) { if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%"PRIx64" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code), tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->retry, pRequest->requestId); pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code; pRequest->prevCode = code;
doAsyncQuery(pRequest, true); doAsyncQuery(pRequest, true);
return; return;
} }
// return to app directly // return to app directly
taosMemoryFree(pWrapper); tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self,
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), tstrerror(code), pRequest->requestId);
pRequest->requestId);
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
...@@ -670,7 +684,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param ...@@ -670,7 +684,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
} }
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
int32_t code = buildRequest(taos, sql, sqlLen, &pRequest); int32_t code = buildRequest(taos, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
fp(param, NULL, terrno); fp(param, NULL, terrno);
...@@ -678,11 +692,11 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param ...@@ -678,11 +692,11 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
} }
pRequest->body.queryFp = fp; pRequest->body.queryFp = fp;
pRequest->body.param = param; pRequest->body.param = param;
doAsyncQuery(pRequest, false); doAsyncQuery(pRequest, false);
} }
int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) { int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
const STscObj *pTscObj = pRequest->pTscObj; const STscObj *pTscObj = pRequest->pTscObj;
*pCxt = taosMemoryCalloc(1, sizeof(SParseContext)); *pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
...@@ -690,28 +704,30 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) { ...@@ -690,28 +704,30 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
**pCxt = (SParseContext){.requestId = pRequest->requestId, **pCxt = (SParseContext){
.requestRid = pRequest->self, .requestId = pRequest->requestId,
.acctId = pTscObj->acctId, .requestRid = pRequest->self,
.db = pRequest->pDb, .acctId = pTscObj->acctId,
.topicQuery = false, .db = pRequest->pDb,
.pSql = pRequest->sqlstr, .topicQuery = false,
.sqlLen = pRequest->sqlLen, .pSql = pRequest->sqlstr,
.pMsg = pRequest->msgBuf, .sqlLen = pRequest->sqlLen,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, .pMsg = pRequest->msgBuf,
.pTransporter = pTscObj->pAppInfo->pTransporter, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pStmtCb = NULL, .pTransporter = pTscObj->pAppInfo->pTransporter,
.pUser = pTscObj->user, .pStmtCb = NULL,
.schemalessType = pTscObj->schemalessType, .pUser = pTscObj->user,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .schemalessType = pTscObj->schemalessType,
.async = true,}; .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.async = true,
};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) { void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SParseContext* pCxt = NULL; SParseContext *pCxt = NULL;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
int32_t code = 0; int32_t code = 0;
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
code = pRequest->prevCode; code = pRequest->prevCode;
...@@ -748,28 +764,29 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) { ...@@ -748,28 +764,29 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
pWrapper->pRequest = pRequest; pWrapper->pRequest = pRequest;
pWrapper->catalogReq = catalogReq; pWrapper->catalogReq = catalogReq;
SRequestConnInfo conn = {.pTrans = pCxt->pTransporter, SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
.requestId = pCxt->requestId, .requestId = pCxt->requestId,
.requestObjRefId = pCxt->requestRid, .requestObjRefId = pCxt->requestRid,
.mgmtEps = pCxt->mgmtEpSet}; .mgmtEps = pCxt->mgmtEpSet};
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, pRequest->requestId, code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, pRequest->requestId, &catalogReq, retrieveMetaCallback, pWrapper,
&catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); &pRequest->body.queryJob);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return; return;
} }
_error: _error:
tscError("0x%"PRIx64" error happens, code:%d - %s, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId);
terrno = code; terrno = code;
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
static void fetchCallback(void* pResult, void* param, int32_t code) { static void fetchCallback(void *pResult, void *param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*) param; SRequestObj *pRequest = (SRequestObj *)param;
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->pData = pResult; pResultInfo->pData = pResult;
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
...@@ -785,7 +802,8 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { ...@@ -785,7 +802,8 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
return; return;
} }
pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, false); pRequest->code =
setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, false);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
pRequest->code = code; pRequest->code = code;
...@@ -801,7 +819,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { ...@@ -801,7 +819,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
} }
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT (res != NULL && fp != NULL); ASSERT(res != NULL && fp != NULL);
SRequestObj *pRequest = res; SRequestObj *pRequest = res;
pRequest->body.fetchFp = fp; pRequest->body.fetchFp = fp;
...@@ -825,7 +843,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ...@@ -825,7 +843,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest); schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
} }
void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) { void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL); ASSERT(res != NULL && fp != NULL);
SRequestObj *pRequest = res; SRequestObj *pRequest = res;
...@@ -838,9 +856,9 @@ void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) { ...@@ -838,9 +856,9 @@ void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) {
taos_fetch_rows_a(res, fp, param); taos_fetch_rows_a(res, fp, param);
} }
const void* taos_get_raw_block(TAOS_RES* res) { const void *taos_get_raw_block(TAOS_RES *res) {
ASSERT(res != NULL); ASSERT(res != NULL);
SRequestObj* pRequest = res; SRequestObj *pRequest = res;
return pRequest->body.resInfo.pData; return pRequest->body.resInfo.pData;
} }
...@@ -924,26 +942,25 @@ int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) { ...@@ -924,26 +942,25 @@ int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
return stmtSetTbTags(stmt, tags); return stmtSetTbTags(stmt, tags);
} }
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); } int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); }
int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) { int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields) {
if (stmt == NULL || NULL == fieldNum) { if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__); tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
return stmtGetTagFields(stmt, fieldNum, fields); return stmtGetTagFields(stmt, fieldNum, fields);
} }
int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) { int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields) {
if (stmt == NULL || NULL == fieldNum) { if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__); tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
return stmtGetColFields(stmt, fieldNum, fields); return stmtGetColFields(stmt, fieldNum, fields);
} }
...@@ -1102,4 +1119,3 @@ int taos_stmt_close(TAOS_STMT *stmt) { ...@@ -1102,4 +1119,3 @@ int taos_stmt_close(TAOS_STMT *stmt) {
return stmtClose(stmt); return stmtClose(stmt);
} }
此差异已折叠。
...@@ -692,6 +692,7 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq ...@@ -692,6 +692,7 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
void tFreeSMAltertbReq(SMAlterStbReq *pReq) { void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
taosArrayDestroy(pReq->pFields); taosArrayDestroy(pReq->pFields);
pReq->pFields = NULL; pReq->pFields = NULL;
taosMemoryFreeClear(pReq->comment);
} }
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) { int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
...@@ -4064,6 +4065,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS ...@@ -4064,6 +4065,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1; if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1;
if (tEncodeI64(&encoder, pReq->maxDelay) < 0) return -1;
if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1; if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
...@@ -4090,6 +4092,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea ...@@ -4090,6 +4092,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeI32(&decoder, &astLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1; if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->maxDelay) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->watermark) < 0) return -1; if (tDecodeI64(&decoder, &pReq->watermark) < 0) return -1;
if (sqlLen > 0) { if (sqlLen > 0) {
......
...@@ -77,7 +77,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64 ...@@ -77,7 +77,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END; goto END;
} }
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
int32_t opNum = LIST_LENGTH(inner->pNodeList); int32_t opNum = LIST_LENGTH(inner->pNodeList);
if (opNum != 1) { if (opNum != 1) {
...@@ -85,7 +85,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64 ...@@ -85,7 +85,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
goto END; goto END;
} }
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
if (qSubPlanToString(plan, pDst, pDstLen) < 0) { if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END; goto END;
...@@ -93,7 +93,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64 ...@@ -93,7 +93,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
END: END:
if (pAst) nodesDestroyNode(pAst); if (pAst) nodesDestroyNode(pAst);
if (pPlan) nodesDestroyNode(pPlan); if (pPlan) nodesDestroyNode((SNode*)pPlan);
return terrno; return terrno;
} }
...@@ -378,8 +378,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -378,8 +378,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*)); SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskInnerLevel); taosArrayPush(pStream->tasks, &taskInnerLevel);
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
pFinalTask = tNewSStreamTask(pStream->uid); pFinalTask = tNewSStreamTask(pStream->uid);
...@@ -407,8 +407,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -407,8 +407,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*)); SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskSourceLevel); taosArrayPush(pStream->tasks, &taskSourceLevel);
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 1); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
void* pIter = NULL; void* pIter = NULL;
...@@ -449,9 +449,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -449,9 +449,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
ASSERT(LIST_LENGTH(inner->pNodeList) == 1); ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
void* pIter = NULL; void* pIter = NULL;
...@@ -509,7 +509,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -509,7 +509,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
return -1; return -1;
} }
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
int32_t opNum = LIST_LENGTH(inner->pNodeList); int32_t opNum = LIST_LENGTH(inner->pNodeList);
if (opNum != 1) { if (opNum != 1) {
...@@ -517,7 +517,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -517,7 +517,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY; terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
return -1; return -1;
} }
plan = nodesListGetNode(inner->pNodeList, 0); plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
} }
ASSERT(pSub->unassignedVgs); ASSERT(pSub->unassignedVgs);
......
...@@ -235,10 +235,10 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 ...@@ -235,10 +235,10 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pPlan, false, pStr, NULL); code = nodesNodeToString((SNode*)pPlan, false, pStr, NULL);
} }
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyNode(pPlan); nodesDestroyNode((SNode*)pPlan);
terrno = code; terrno = code;
return code; return code;
} }
......
...@@ -387,7 +387,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -387,7 +387,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
return -1; return -1;
} }
if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) { if (nodesNodeToString((SNode*)pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql); taosMemoryFree(topicObj.sql);
......
...@@ -211,7 +211,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo ...@@ -211,7 +211,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
SExplainResNode *pResNode = NULL; SExplainResNode *pResNode = NULL;
FOREACH(node, pPhysiChildren) { FOREACH(node, pPhysiChildren) {
QRY_ERR_RET(qExplainGenerateResNode((SPhysiNode *)node, group, &pResNode)); QRY_ERR_RET(qExplainGenerateResNode((SPhysiNode *)node, group, &pResNode));
QRY_ERR_RET(nodesListAppend(*pChildren, pResNode)); QRY_ERR_RET(nodesListAppend(*pChildren, (SNode*)pResNode));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -715,7 +715,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -715,7 +715,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW(level + 1, "Sort Key: "); EXPLAIN_ROW_NEW(level + 1, "Sort Key: ");
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pSortNode->pSortKeys); ++i) { for (int32_t i = 0; i < LIST_LENGTH(pSortNode->pSortKeys); ++i) {
SOrderByExprNode *ptn = nodesListGetNode(pSortNode->pSortKeys, i); SOrderByExprNode *ptn = (SOrderByExprNode*)nodesListGetNode(pSortNode->pSortKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
} }
} }
...@@ -1039,7 +1039,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -1039,7 +1039,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW(level + 1, "Merge Key: "); EXPLAIN_ROW_NEW(level + 1, "Merge Key: ");
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
SOrderByExprNode *ptn = nodesListGetNode(pMergeNode->pMergeKeys, i); SOrderByExprNode *ptn = (SOrderByExprNode*)nodesListGetNode(pMergeNode->pMergeKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
} }
} }
...@@ -1078,7 +1078,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -1078,7 +1078,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT);
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
SOrderByExprNode *ptn = nodesListGetNode(pMergeNode->pMergeKeys, i); SOrderByExprNode *ptn = (SOrderByExprNode*)nodesListGetNode(pMergeNode->pMergeKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
} }
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
......
...@@ -202,7 +202,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { ...@@ -202,7 +202,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {{0}}; SColumnInfoData idata = {{0}};
SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i); SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
// if (!pDescNode->output) { // todo disable it temporarily // if (!pDescNode->output) { // todo disable it temporarily
// continue; // continue;
// } // }
...@@ -2920,7 +2920,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* ...@@ -2920,7 +2920,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
} }
for (int32_t i = 0; i < numOfSources; ++i) { for (int32_t i = 0; i < numOfSources; ++i) {
SNodeListNode* pNode = nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i); SNodeListNode* pNode = (SNodeListNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
taosArrayPush(pInfo->pSources, pNode); taosArrayPush(pInfo->pSources, pNode);
} }
...@@ -4494,7 +4494,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* ...@@ -4494,7 +4494,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
if (NULL == res) { // todo handle error if (NULL == res) { // todo handle error
} else { } else {
res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
nodesListAppend(pFuncNode->pParameterList, res); nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
} }
} }
#endif #endif
......
...@@ -1378,7 +1378,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1378,7 +1378,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pChildOp); taosMemoryFreeClear(pChildOp);
} }
} }
nodesDestroyNode(pInfo->pPhyNode); nodesDestroyNode((SNode*)pInfo->pPhyNode);
} }
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
...@@ -2157,7 +2157,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2157,7 +2157,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS; pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->pPhyNode = nodesCloneNode(pPhyNode); pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
pOperator->name = "StreamFinalIntervalOperator"; pOperator->name = "StreamFinalIntervalOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
......
...@@ -42,6 +42,7 @@ extern "C" { ...@@ -42,6 +42,7 @@ extern "C" {
#define FUNC_MGT_SELECT_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(13) #define FUNC_MGT_SELECT_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(13)
#define FUNC_MGT_REPEAT_SCAN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(14) #define FUNC_MGT_REPEAT_SCAN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(14)
#define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15) #define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15)
#define FUNC_MGT_FORBID_STREAM_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(16)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......
...@@ -1274,7 +1274,7 @@ void static addTimezoneParam(SNodeList* pList) { ...@@ -1274,7 +1274,7 @@ void static addTimezoneParam(SNodeList* pList) {
varDataSetLen(pVal->datum.p, len); varDataSetLen(pVal->datum.p, len);
strncpy(varDataVal(pVal->datum.p), pVal->literal, len); strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
nodesListAppend(pList, pVal); nodesListAppend(pList, (SNode*)pVal);
} }
static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
...@@ -1363,7 +1363,7 @@ static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len) ...@@ -1363,7 +1363,7 @@ static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
} }
SExprNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); SExprNode* pPara = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_VALUE != nodeType(pPara) || (!IS_VAR_DATA_TYPE(pPara->resType.type))) { if (QUERY_NODE_VALUE != nodeType(pPara) || (!IS_VAR_DATA_TYPE(pPara->resType.type))) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
......
...@@ -161,6 +161,8 @@ bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; } ...@@ -161,6 +161,8 @@ bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; }
bool fmIsForbidFillFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_FILL_FUNC); } bool fmIsForbidFillFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_FILL_FUNC); }
bool fmIsForbidStreamFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_STREAM_FUNC); }
void fmFuncMgtDestroy() { void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable; void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) { if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
...@@ -239,7 +241,7 @@ static int32_t getFuncInfo(SFunctionNode* pFunc) { ...@@ -239,7 +241,7 @@ static int32_t getFuncInfo(SFunctionNode* pFunc) {
} }
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) { static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION); SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) { if (NULL == pFunc) {
return NULL; return NULL;
} }
...@@ -247,14 +249,14 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis ...@@ -247,14 +249,14 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis
pFunc->pParameterList = pParameterList; pFunc->pParameterList = pParameterList;
if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) { if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) {
pFunc->pParameterList = NULL; pFunc->pParameterList = NULL;
nodesDestroyNode(pFunc); nodesDestroyNode((SNode*)pFunc);
return NULL; return NULL;
} }
return pFunc; return pFunc;
} }
static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) { static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == pCol) {
return NULL; return NULL;
} }
...@@ -291,7 +293,7 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod ...@@ -291,7 +293,7 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc, static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
SFunctionNode** pMergeFunc) { SFunctionNode** pMergeFunc) {
SNodeList* pParameterList = NULL; SNodeList* pParameterList = NULL;
nodesListMakeStrictAppend(&pParameterList, createColumnByFunc(pPartialFunc)); nodesListMakeStrictAppend(&pParameterList, (SNode*)createColumnByFunc(pPartialFunc));
*pMergeFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList); *pMergeFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList);
if (NULL == *pMergeFunc) { if (NULL == *pMergeFunc) {
nodesDestroyList(pParameterList); nodesDestroyList(pParameterList);
...@@ -316,8 +318,8 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc ...@@ -316,8 +318,8 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(*pPartialFunc); nodesDestroyNode((SNode*)*pPartialFunc);
nodesDestroyNode(*pMergeFunc); nodesDestroyNode((SNode*)*pMergeFunc);
} }
return code; return code;
......
...@@ -40,6 +40,10 @@ ...@@ -40,6 +40,10 @@
break; \ break; \
} \ } \
(pDst)->fldname = strdup((pSrc)->fldname); \ (pDst)->fldname = strdup((pSrc)->fldname); \
if (NULL == (pDst)->fldname) { \
nodesDestroyNode((SNode*)pDst); \
return NULL; \
} \
} while (0) } while (0)
#define CLONE_NODE_FIELD(fldname) \ #define CLONE_NODE_FIELD(fldname) \
...@@ -49,11 +53,23 @@ ...@@ -49,11 +53,23 @@
} \ } \
(pDst)->fldname = nodesCloneNode((pSrc)->fldname); \ (pDst)->fldname = nodesCloneNode((pSrc)->fldname); \
if (NULL == (pDst)->fldname) { \ if (NULL == (pDst)->fldname) { \
nodesDestroyNode((SNode*)(pDst)); \ nodesDestroyNode((SNode*)pDst); \
return NULL; \ return NULL; \
} \ } \
} while (0) } while (0)
#define CLONE_NODE_FIELD_EX(fldname, nodePtrType) \
do { \
if (NULL == (pSrc)->fldname) { \
break; \
} \
(pDst)->fldname = (nodePtrType)nodesCloneNode((SNode*)(pSrc)->fldname); \
if (NULL == (pDst)->fldname) { \
nodesDestroyNode((SNode*)pDst); \
return NULL; \
} \
} while (0)
#define CLONE_NODE_LIST_FIELD(fldname) \ #define CLONE_NODE_LIST_FIELD(fldname) \
do { \ do { \
if (NULL == (pSrc)->fldname) { \ if (NULL == (pSrc)->fldname) { \
...@@ -61,7 +77,7 @@ ...@@ -61,7 +77,7 @@
} \ } \
(pDst)->fldname = nodesCloneList((pSrc)->fldname); \ (pDst)->fldname = nodesCloneList((pSrc)->fldname); \
if (NULL == (pDst)->fldname) { \ if (NULL == (pDst)->fldname) { \
nodesDestroyNode((SNode*)(pDst)); \ nodesDestroyNode((SNode*)pDst); \
return NULL; \ return NULL; \
} \ } \
} while (0) } while (0)
...@@ -73,7 +89,7 @@ ...@@ -73,7 +89,7 @@
} \ } \
(pDst)->fldname = cloneFunc((pSrc)->fldname); \ (pDst)->fldname = cloneFunc((pSrc)->fldname); \
if (NULL == (pDst)->fldname) { \ if (NULL == (pDst)->fldname) { \
nodesDestroyNode((SNode*)(pDst)); \ nodesDestroyNode((SNode*)pDst); \
return NULL; \ return NULL; \
} \ } \
} while (0) } while (0)
...@@ -81,6 +97,7 @@ ...@@ -81,6 +97,7 @@
#define COPY_BASE_OBJECT_FIELD(fldname, copyFunc) \ #define COPY_BASE_OBJECT_FIELD(fldname, copyFunc) \
do { \ do { \
if (NULL == copyFunc(&((pSrc)->fldname), &((pDst)->fldname))) { \ if (NULL == copyFunc(&((pSrc)->fldname), &((pDst)->fldname))) { \
nodesDestroyNode((SNode*)pDst); \
return NULL; \ return NULL; \
} \ } \
} while (0) } while (0)
...@@ -147,7 +164,7 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) { ...@@ -147,7 +164,7 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
int32_t len = varDataTLen(pSrc->datum.p) + 1; int32_t len = varDataTLen(pSrc->datum.p) + 1;
pDst->datum.p = taosMemoryCalloc(1, len); pDst->datum.p = taosMemoryCalloc(1, len);
if (NULL == pDst->datum.p) { if (NULL == pDst->datum.p) {
nodesDestroyNode(pDst); nodesDestroyNode((SNode*)pDst);
return NULL; return NULL;
} }
memcpy(pDst->datum.p, pSrc->datum.p, len); memcpy(pDst->datum.p, pSrc->datum.p, len);
...@@ -275,8 +292,8 @@ static SNode* stateWindowNodeCopy(const SStateWindowNode* pSrc, SStateWindowNode ...@@ -275,8 +292,8 @@ static SNode* stateWindowNodeCopy(const SStateWindowNode* pSrc, SStateWindowNode
} }
static SNode* sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) { static SNode* sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) {
CLONE_NODE_FIELD(pCol); CLONE_NODE_FIELD_EX(pCol, SColumnNode*);
CLONE_NODE_FIELD(pGap); CLONE_NODE_FIELD_EX(pGap, SValueNode*);
return (SNode*)pDst; return (SNode*)pDst;
} }
...@@ -442,7 +459,7 @@ static SNode* logicIndefRowsFuncCopy(const SIndefRowsFuncLogicNode* pSrc, SIndef ...@@ -442,7 +459,7 @@ static SNode* logicIndefRowsFuncCopy(const SIndefRowsFuncLogicNode* pSrc, SIndef
static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) { static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) {
COPY_OBJECT_FIELD(id, sizeof(SSubplanId)); COPY_OBJECT_FIELD(id, sizeof(SSubplanId));
CLONE_NODE_FIELD(pNode); CLONE_NODE_FIELD_EX(pNode, SLogicNode*);
COPY_SCALAR_FIELD(subplanType); COPY_SCALAR_FIELD(subplanType);
COPY_SCALAR_FIELD(level); COPY_SCALAR_FIELD(level);
COPY_SCALAR_FIELD(splitFlag); COPY_SCALAR_FIELD(splitFlag);
...@@ -450,7 +467,7 @@ static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) { ...@@ -450,7 +467,7 @@ static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) {
} }
static SNode* physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) { static SNode* physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) {
CLONE_NODE_FIELD(pOutputDataBlockDesc); CLONE_NODE_FIELD_EX(pOutputDataBlockDesc, SDataBlockDescNode*);
CLONE_NODE_FIELD(pConditions); CLONE_NODE_FIELD(pConditions);
CLONE_NODE_LIST_FIELD(pChildren); CLONE_NODE_LIST_FIELD(pChildren);
return (SNode*)pDst; return (SNode*)pDst;
...@@ -555,8 +572,8 @@ static SNode* selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) { ...@@ -555,8 +572,8 @@ static SNode* selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) {
CLONE_NODE_LIST_FIELD(pGroupByList); CLONE_NODE_LIST_FIELD(pGroupByList);
CLONE_NODE_FIELD(pHaving); CLONE_NODE_FIELD(pHaving);
CLONE_NODE_LIST_FIELD(pOrderByList); CLONE_NODE_LIST_FIELD(pOrderByList);
CLONE_NODE_FIELD(pLimit); CLONE_NODE_FIELD_EX(pLimit, SLimitNode*);
CLONE_NODE_FIELD(pLimit); CLONE_NODE_FIELD_EX(pLimit, SLimitNode*);
COPY_CHAR_ARRAY_FIELD(stmtName); COPY_CHAR_ARRAY_FIELD(stmtName);
COPY_SCALAR_FIELD(precision); COPY_SCALAR_FIELD(precision);
COPY_SCALAR_FIELD(isEmptyResult); COPY_SCALAR_FIELD(isEmptyResult);
...@@ -566,7 +583,7 @@ static SNode* selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) { ...@@ -566,7 +583,7 @@ static SNode* selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) {
return (SNode*)pDst; return (SNode*)pDst;
} }
SNodeptr nodesCloneNode(const SNodeptr pNode) { SNode* nodesCloneNode(const SNode* pNode) {
if (NULL == pNode) { if (NULL == pNode) {
return NULL; return NULL;
} }
......
...@@ -4205,7 +4205,7 @@ static int32_t jsonToNodeObject(const SJson* pJson, const char* pName, SNode** p ...@@ -4205,7 +4205,7 @@ static int32_t jsonToNodeObject(const SJson* pJson, const char* pName, SNode** p
return makeNodeByJson(pJsonNode, pNode); return makeNodeByJson(pJsonNode, pNode);
} }
int32_t nodesNodeToString(const SNodeptr pNode, bool format, char** pStr, int32_t* pLen) { int32_t nodesNodeToString(const SNode* pNode, bool format, char** pStr, int32_t* pLen) {
if (NULL == pNode || NULL == pStr) { if (NULL == pNode || NULL == pStr) {
terrno = TSDB_CODE_FAILED; terrno = TSDB_CODE_FAILED;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
......
...@@ -137,7 +137,7 @@ static bool functionNodeEqual(const SFunctionNode* a, const SFunctionNode* b) { ...@@ -137,7 +137,7 @@ static bool functionNodeEqual(const SFunctionNode* a, const SFunctionNode* b) {
return true; return true;
} }
bool nodesEqualNode(const SNodeptr a, const SNodeptr b) { bool nodesEqualNode(const SNode* a, const SNode* b) {
if (a == b) { if (a == b) {
return true; return true;
} }
......
...@@ -168,7 +168,7 @@ static EDealRes walkExprs(SNodeList* pNodeList, ETraversalOrder order, FNodeWalk ...@@ -168,7 +168,7 @@ static EDealRes walkExprs(SNodeList* pNodeList, ETraversalOrder order, FNodeWalk
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
void nodesWalkExpr(SNodeptr pNode, FNodeWalker walker, void* pContext) { void nodesWalkExpr(SNode* pNode, FNodeWalker walker, void* pContext) {
(void)walkExpr(pNode, TRAVERSAL_PREORDER, walker, pContext); (void)walkExpr(pNode, TRAVERSAL_PREORDER, walker, pContext);
} }
...@@ -176,7 +176,7 @@ void nodesWalkExprs(SNodeList* pNodeList, FNodeWalker walker, void* pContext) { ...@@ -176,7 +176,7 @@ void nodesWalkExprs(SNodeList* pNodeList, FNodeWalker walker, void* pContext) {
(void)walkExprs(pNodeList, TRAVERSAL_PREORDER, walker, pContext); (void)walkExprs(pNodeList, TRAVERSAL_PREORDER, walker, pContext);
} }
void nodesWalkExprPostOrder(SNodeptr pNode, FNodeWalker walker, void* pContext) { void nodesWalkExprPostOrder(SNode* pNode, FNodeWalker walker, void* pContext) {
(void)walkExpr(pNode, TRAVERSAL_POSTORDER, walker, pContext); (void)walkExpr(pNode, TRAVERSAL_POSTORDER, walker, pContext);
} }
......
...@@ -30,7 +30,7 @@ static SNode* makeNode(ENodeType type, size_t size) { ...@@ -30,7 +30,7 @@ static SNode* makeNode(ENodeType type, size_t size) {
return p; return p;
} }
SNodeptr nodesMakeNode(ENodeType type) { SNode* nodesMakeNode(ENodeType type) {
switch (type) { switch (type) {
case QUERY_NODE_COLUMN: case QUERY_NODE_COLUMN:
return makeNode(type, sizeof(SColumnNode)); return makeNode(type, sizeof(SColumnNode));
...@@ -215,6 +215,8 @@ SNodeptr nodesMakeNode(ENodeType type) { ...@@ -215,6 +215,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SKillStmt)); return makeNode(type, sizeof(SKillStmt));
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
return makeNode(type, sizeof(SDeleteStmt)); return makeNode(type, sizeof(SDeleteStmt));
case QUERY_NODE_QUERY:
return makeNode(type, sizeof(SQuery));
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
return makeNode(type, sizeof(SScanLogicNode)); return makeNode(type, sizeof(SScanLogicNode));
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
...@@ -325,7 +327,7 @@ static void destroyLogicNode(SLogicNode* pNode) { ...@@ -325,7 +327,7 @@ static void destroyLogicNode(SLogicNode* pNode) {
static void destroyPhysiNode(SPhysiNode* pNode) { static void destroyPhysiNode(SPhysiNode* pNode) {
nodesDestroyList(pNode->pChildren); nodesDestroyList(pNode->pChildren);
nodesDestroyNode(pNode->pConditions); nodesDestroyNode(pNode->pConditions);
nodesDestroyNode(pNode->pOutputDataBlockDesc); nodesDestroyNode((SNode*)pNode->pOutputDataBlockDesc);
} }
static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) { static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) {
...@@ -340,9 +342,9 @@ static void destroyScanPhysiNode(SScanPhysiNode* pNode) { ...@@ -340,9 +342,9 @@ static void destroyScanPhysiNode(SScanPhysiNode* pNode) {
nodesDestroyList(pNode->pScanCols); nodesDestroyList(pNode->pScanCols);
} }
static void destroyDataSinkNode(SDataSinkNode* pNode) { nodesDestroyNode(pNode->pInputDataBlockDesc); } static void destroyDataSinkNode(SDataSinkNode* pNode) { nodesDestroyNode((SNode*)pNode->pInputDataBlockDesc); }
void nodesDestroyNode(SNodeptr pNode) { void nodesDestroyNode(SNode* pNode) {
if (NULL == pNode) { if (NULL == pNode) {
return; return;
} }
...@@ -399,8 +401,8 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -399,8 +401,8 @@ void nodesDestroyNode(SNodeptr pNode) {
break; break;
case QUERY_NODE_SESSION_WINDOW: { case QUERY_NODE_SESSION_WINDOW: {
SSessionWindowNode* pSession = (SSessionWindowNode*)pNode; SSessionWindowNode* pSession = (SSessionWindowNode*)pNode;
nodesDestroyNode(pSession->pCol); nodesDestroyNode((SNode*)pSession->pCol);
nodesDestroyNode(pSession->pGap); nodesDestroyNode((SNode*)pSession->pGap);
break; break;
} }
case QUERY_NODE_INTERVAL_WINDOW: { case QUERY_NODE_INTERVAL_WINDOW: {
...@@ -436,7 +438,7 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -436,7 +438,7 @@ void nodesDestroyNode(SNodeptr pNode) {
break; break;
case QUERY_NODE_DATABASE_OPTIONS: { case QUERY_NODE_DATABASE_OPTIONS: {
SDatabaseOptions* pOptions = (SDatabaseOptions*)pNode; SDatabaseOptions* pOptions = (SDatabaseOptions*)pNode;
nodesDestroyNode(pOptions->pDaysPerFile); nodesDestroyNode((SNode*)pOptions->pDaysPerFile);
nodesDestroyList(pOptions->pKeep); nodesDestroyList(pOptions->pKeep);
nodesDestroyList(pOptions->pRetentions); nodesDestroyList(pOptions->pRetentions);
break; break;
...@@ -455,6 +457,13 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -455,6 +457,13 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyNode(pOptions->pSliding); nodesDestroyNode(pOptions->pSliding);
break; break;
} }
case QUERY_NODE_EXPLAIN_OPTIONS: // no pointer field
break;
case QUERY_NODE_STREAM_OPTIONS:
nodesDestroyNode(((SStreamOptions*)pNode)->pWatermark);
break;
case QUERY_NODE_LEFT_VALUE: // no pointer field
break;
case QUERY_NODE_SET_OPERATOR: { case QUERY_NODE_SET_OPERATOR: {
SSetOperator* pStmt = (SSetOperator*)pNode; SSetOperator* pStmt = (SSetOperator*)pNode;
nodesDestroyNode(pStmt->pLeft); nodesDestroyNode(pStmt->pLeft);
...@@ -473,26 +482,26 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -473,26 +482,26 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyList(pStmt->pGroupByList); nodesDestroyList(pStmt->pGroupByList);
nodesDestroyNode(pStmt->pHaving); nodesDestroyNode(pStmt->pHaving);
nodesDestroyList(pStmt->pOrderByList); nodesDestroyList(pStmt->pOrderByList);
nodesDestroyNode(pStmt->pLimit); nodesDestroyNode((SNode*)pStmt->pLimit);
nodesDestroyNode(pStmt->pSlimit); nodesDestroyNode((SNode*)pStmt->pSlimit);
break; break;
} }
case QUERY_NODE_VNODE_MODIF_STMT: case QUERY_NODE_VNODE_MODIF_STMT:
destroyVgDataBlockArray(((SVnodeModifOpStmt*)pNode)->pDataBlocks); destroyVgDataBlockArray(((SVnodeModifOpStmt*)pNode)->pDataBlocks);
break; break;
case QUERY_NODE_CREATE_DATABASE_STMT: case QUERY_NODE_CREATE_DATABASE_STMT:
nodesDestroyNode(((SCreateDatabaseStmt*)pNode)->pOptions); nodesDestroyNode((SNode*)((SCreateDatabaseStmt*)pNode)->pOptions);
break; break;
case QUERY_NODE_DROP_DATABASE_STMT: // no pointer field case QUERY_NODE_DROP_DATABASE_STMT: // no pointer field
break; break;
case QUERY_NODE_ALTER_DATABASE_STMT: case QUERY_NODE_ALTER_DATABASE_STMT:
nodesDestroyNode(((SAlterDatabaseStmt*)pNode)->pOptions); nodesDestroyNode((SNode*)((SAlterDatabaseStmt*)pNode)->pOptions);
break; break;
case QUERY_NODE_CREATE_TABLE_STMT: { case QUERY_NODE_CREATE_TABLE_STMT: {
SCreateTableStmt* pStmt = (SCreateTableStmt*)pNode; SCreateTableStmt* pStmt = (SCreateTableStmt*)pNode;
nodesDestroyList(pStmt->pCols); nodesDestroyList(pStmt->pCols);
nodesDestroyList(pStmt->pTags); nodesDestroyList(pStmt->pTags);
nodesDestroyNode(pStmt->pOptions); nodesDestroyNode((SNode*)pStmt->pOptions);
break; break;
} }
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE: { case QUERY_NODE_CREATE_SUBTABLE_CLAUSE: {
...@@ -507,14 +516,14 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -507,14 +516,14 @@ void nodesDestroyNode(SNodeptr pNode) {
case QUERY_NODE_DROP_TABLE_CLAUSE: // no pointer field case QUERY_NODE_DROP_TABLE_CLAUSE: // no pointer field
break; break;
case QUERY_NODE_DROP_TABLE_STMT: case QUERY_NODE_DROP_TABLE_STMT:
nodesDestroyNode(((SDropTableStmt*)pNode)->pTables); nodesDestroyList(((SDropTableStmt*)pNode)->pTables);
break; break;
case QUERY_NODE_DROP_SUPER_TABLE_STMT: // no pointer field case QUERY_NODE_DROP_SUPER_TABLE_STMT: // no pointer field
break; break;
case QUERY_NODE_ALTER_TABLE_STMT: { case QUERY_NODE_ALTER_TABLE_STMT: {
SAlterTableStmt* pStmt = (SAlterTableStmt*)pNode; SAlterTableStmt* pStmt = (SAlterTableStmt*)pNode;
nodesDestroyNode(pStmt->pOptions); nodesDestroyNode((SNode*)pStmt->pOptions);
nodesDestroyNode(pStmt->pVal); nodesDestroyNode((SNode*)pStmt->pVal);
break; break;
} }
case QUERY_NODE_CREATE_USER_STMT: // no pointer field case QUERY_NODE_CREATE_USER_STMT: // no pointer field
...@@ -527,37 +536,107 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -527,37 +536,107 @@ void nodesDestroyNode(SNodeptr pNode) {
break; break;
case QUERY_NODE_CREATE_INDEX_STMT: { case QUERY_NODE_CREATE_INDEX_STMT: {
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode; SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode;
nodesDestroyNode(pStmt->pOptions); nodesDestroyNode((SNode*)pStmt->pOptions);
nodesDestroyList(pStmt->pCols); nodesDestroyList(pStmt->pCols);
break; break;
} }
case QUERY_NODE_DROP_INDEX_STMT: // no pointer field case QUERY_NODE_DROP_INDEX_STMT: // no pointer field
case QUERY_NODE_CREATE_QNODE_STMT: // no pointer field case QUERY_NODE_CREATE_QNODE_STMT: // no pointer field
case QUERY_NODE_DROP_QNODE_STMT: // no pointer field case QUERY_NODE_DROP_QNODE_STMT: // no pointer field
case QUERY_NODE_CREATE_BNODE_STMT: // no pointer field
case QUERY_NODE_DROP_BNODE_STMT: // no pointer field
case QUERY_NODE_CREATE_SNODE_STMT: // no pointer field
case QUERY_NODE_DROP_SNODE_STMT: // no pointer field
case QUERY_NODE_CREATE_MNODE_STMT: // no pointer field
case QUERY_NODE_DROP_MNODE_STMT: // no pointer field
break; break;
case QUERY_NODE_CREATE_TOPIC_STMT: case QUERY_NODE_CREATE_TOPIC_STMT:
nodesDestroyNode(((SCreateTopicStmt*)pNode)->pQuery); nodesDestroyNode(((SCreateTopicStmt*)pNode)->pQuery);
break; break;
case QUERY_NODE_DROP_TOPIC_STMT: // no pointer field case QUERY_NODE_DROP_TOPIC_STMT: // no pointer field
case QUERY_NODE_DROP_CGROUP_STMT: // no pointer field
case QUERY_NODE_ALTER_LOCAL_STMT: // no pointer field case QUERY_NODE_ALTER_LOCAL_STMT: // no pointer field
break; break;
case QUERY_NODE_SHOW_DATABASES_STMT: case QUERY_NODE_EXPLAIN_STMT: {
case QUERY_NODE_SHOW_TABLES_STMT: SExplainStmt* pStmt = (SExplainStmt*)pNode;
case QUERY_NODE_SHOW_STABLES_STMT: nodesDestroyNode((SNode*)pStmt->pOptions);
case QUERY_NODE_SHOW_USERS_STMT: nodesDestroyNode(pStmt->pQuery);
break;
}
case QUERY_NODE_DESCRIBE_STMT:
taosMemoryFree(((SDescribeStmt*)pNode)->pMeta);
break;
case QUERY_NODE_RESET_QUERY_CACHE_STMT: // no pointer field
case QUERY_NODE_COMPACT_STMT: // no pointer field
case QUERY_NODE_CREATE_FUNCTION_STMT: // no pointer field
case QUERY_NODE_DROP_FUNCTION_STMT: // no pointer field
break;
case QUERY_NODE_CREATE_STREAM_STMT: {
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pNode;
nodesDestroyNode((SNode*)pStmt->pOptions);
nodesDestroyNode(pStmt->pQuery);
break;
}
case QUERY_NODE_DROP_STREAM_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field
case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field
break;
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
nodesDestroyList(((SRedistributeVgroupStmt*)pNode)->pDnodes);
break;
case QUERY_NODE_SPLIT_VGROUP_STMT: // no pointer field
case QUERY_NODE_SYNCDB_STMT: // no pointer field
case QUERY_NODE_GRANT_STMT: // no pointer field
case QUERY_NODE_REVOKE_STMT: // no pointer field
break;
case QUERY_NODE_SHOW_DNODES_STMT: case QUERY_NODE_SHOW_DNODES_STMT:
case QUERY_NODE_SHOW_VGROUPS_STMT:
case QUERY_NODE_SHOW_MNODES_STMT: case QUERY_NODE_SHOW_MNODES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT: case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_FUNCTIONS_STMT: case QUERY_NODE_SHOW_FUNCTIONS_STMT:
case QUERY_NODE_SHOW_INDEXES_STMT: case QUERY_NODE_SHOW_INDEXES_STMT:
case QUERY_NODE_SHOW_STREAMS_STMT: { case QUERY_NODE_SHOW_STABLES_STMT:
case QUERY_NODE_SHOW_STREAMS_STMT:
case QUERY_NODE_SHOW_TABLES_STMT:
case QUERY_NODE_SHOW_USERS_STMT:
case QUERY_NODE_SHOW_LICENCE_STMT:
case QUERY_NODE_SHOW_VGROUPS_STMT:
case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_VNODES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
case QUERY_NODE_SHOW_SCORES_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: {
SShowStmt* pStmt = (SShowStmt*)pNode; SShowStmt* pStmt = (SShowStmt*)pNode;
nodesDestroyNode(pStmt->pDbName); nodesDestroyNode(pStmt->pDbName);
nodesDestroyNode(pStmt->pTbNamePattern); nodesDestroyNode(pStmt->pTbNamePattern);
break; break;
} }
case QUERY_NODE_KILL_CONNECTION_STMT: // no pointer field
case QUERY_NODE_KILL_QUERY_STMT: // no pointer field
case QUERY_NODE_KILL_TRANSACTION_STMT: // no pointer field
break;
case QUERY_NODE_DELETE_STMT: {
SDeleteStmt* pStmt = (SDeleteStmt*)pNode;
nodesDestroyNode(pStmt->pFromTable);
nodesDestroyNode(pStmt->pWhere);
nodesDestroyNode(pStmt->pCountFunc);
nodesDestroyNode(pStmt->pTagIndexCond);
break;
}
case QUERY_NODE_QUERY: { case QUERY_NODE_QUERY: {
SQuery* pQuery = (SQuery*)pNode; SQuery* pQuery = (SQuery*)pNode;
nodesDestroyNode(pQuery->pRoot); nodesDestroyNode(pQuery->pRoot);
...@@ -606,6 +685,13 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -606,6 +685,13 @@ void nodesDestroyNode(SNodeptr pNode) {
case QUERY_NODE_LOGIC_PLAN_EXCHANGE: case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
destroyLogicNode((SLogicNode*)pNode); destroyLogicNode((SLogicNode*)pNode);
break; break;
case QUERY_NODE_LOGIC_PLAN_MERGE: {
SMergeLogicNode* pLogicNode = (SMergeLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyList(pLogicNode->pMergeKeys);
nodesDestroyList(pLogicNode->pInputs);
break;
}
case QUERY_NODE_LOGIC_PLAN_WINDOW: { case QUERY_NODE_LOGIC_PLAN_WINDOW: {
SWindowLogicNode* pLogicNode = (SWindowLogicNode*)pNode; SWindowLogicNode* pLogicNode = (SWindowLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode); destroyLogicNode((SLogicNode*)pLogicNode);
...@@ -613,6 +699,13 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -613,6 +699,13 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyNode(pLogicNode->pTspk); nodesDestroyNode(pLogicNode->pTspk);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_FILL: {
SFillLogicNode* pLogicNode = (SFillLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyNode(pLogicNode->pWStartTs);
nodesDestroyNode(pLogicNode->pValues);
break;
}
case QUERY_NODE_LOGIC_PLAN_SORT: { case QUERY_NODE_LOGIC_PLAN_SORT: {
SSortLogicNode* pLogicNode = (SSortLogicNode*)pNode; SSortLogicNode* pLogicNode = (SSortLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode); destroyLogicNode((SLogicNode*)pLogicNode);
...@@ -625,10 +718,16 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -625,10 +718,16 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyList(pLogicNode->pPartitionKeys); nodesDestroyList(pLogicNode->pPartitionKeys);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: {
SIndefRowsFuncLogicNode* pLogicNode = (SIndefRowsFuncLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyList(pLogicNode->pVectorFuncs);
break;
}
case QUERY_NODE_LOGIC_SUBPLAN: { case QUERY_NODE_LOGIC_SUBPLAN: {
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode; SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
nodesDestroyList(pSubplan->pChildren); nodesDestroyList(pSubplan->pChildren);
nodesDestroyNode(pSubplan->pNode); nodesDestroyNode((SNode*)pSubplan->pNode);
nodesClearList(pSubplan->pParents); nodesClearList(pSubplan->pParents);
taosMemoryFreeClear(pSubplan->pVgroupList); taosMemoryFreeClear(pSubplan->pVgroupList);
break; break;
...@@ -637,17 +736,9 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -637,17 +736,9 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyList(((SQueryLogicPlan*)pNode)->pTopSubplans); nodesDestroyList(((SQueryLogicPlan*)pNode)->pTopSubplans);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
destroyScanPhysiNode((SScanPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
destroyScanPhysiNode((SScanPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
destroyScanPhysiNode((SScanPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
destroyScanPhysiNode((SScanPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
destroyScanPhysiNode((SScanPhysiNode*)pNode); destroyScanPhysiNode((SScanPhysiNode*)pNode);
break; break;
...@@ -678,21 +769,62 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -678,21 +769,62 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyList(pPhyNode->pSrcEndPoints); nodesDestroyList(pPhyNode->pSrcEndPoints);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_MERGE: {
SMergePhysiNode* pPhyNode = (SMergePhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pMergeKeys);
nodesDestroyList(pPhyNode->pTargets);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SORT: { case QUERY_NODE_PHYSICAL_PLAN_SORT: {
SSortPhysiNode* pPhyNode = (SSortPhysiNode*)pNode; SSortPhysiNode* pPhyNode = (SSortPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode); destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pExprs); nodesDestroyList(pPhyNode->pExprs);
nodesDestroyNode(pPhyNode->pSortKeys); nodesDestroyList(pPhyNode->pSortKeys);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_FILL: {
SFillPhysiNode* pPhyNode = (SFillPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pWStartTs);
nodesDestroyNode(pPhyNode->pValues);
nodesDestroyList(pPhyNode->pTargets);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
SStateWinodwPhysiNode* pPhyNode = (SStateWinodwPhysiNode*)pNode;
destroyWinodwPhysiNode((SWinodwPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pStateKey);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
SPartitionPhysiNode* pPhyNode = (SPartitionPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pExprs);
nodesDestroyList(pPhyNode->pPartitionKeys);
nodesDestroyList(pPhyNode->pTargets);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pExprs);
nodesDestroyList(pPhyNode->pVectorFuncs);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
destroyDataSinkNode((SDataSinkNode*)pNode); destroyDataSinkNode((SDataSinkNode*)pNode);
break; break;
...@@ -702,11 +834,17 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -702,11 +834,17 @@ void nodesDestroyNode(SNodeptr pNode) {
taosMemoryFreeClear(pSink->pData); taosMemoryFreeClear(pSink->pData);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
SDataDeleterNode* pSink = (SDataDeleterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
nodesDestroyNode(pSink->pAffectedRows);
break;
}
case QUERY_NODE_PHYSICAL_SUBPLAN: { case QUERY_NODE_PHYSICAL_SUBPLAN: {
SSubplan* pSubplan = (SSubplan*)pNode; SSubplan* pSubplan = (SSubplan*)pNode;
nodesDestroyList(pSubplan->pChildren); nodesDestroyList(pSubplan->pChildren);
nodesDestroyNode(pSubplan->pNode); nodesDestroyNode((SNode*)pSubplan->pNode);
nodesDestroyNode(pSubplan->pDataSink); nodesDestroyNode((SNode*)pSubplan->pDataSink);
nodesClearList(pSubplan->pParents); nodesClearList(pSubplan->pParents);
break; break;
} }
...@@ -744,7 +882,7 @@ SNodeList* nodesMakeList() { ...@@ -744,7 +882,7 @@ SNodeList* nodesMakeList() {
return p; return p;
} }
int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) { int32_t nodesListAppend(SNodeList* pList, SNode* pNode) {
if (NULL == pList || NULL == pNode) { if (NULL == pList || NULL == pNode) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -766,7 +904,7 @@ int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) { ...@@ -766,7 +904,7 @@ int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode) { int32_t nodesListStrictAppend(SNodeList* pList, SNode* pNode) {
if (NULL == pNode) { if (NULL == pNode) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -778,7 +916,7 @@ int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode) { ...@@ -778,7 +916,7 @@ int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode) {
return code; return code;
} }
int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode) { int32_t nodesListMakeAppend(SNodeList** pList, SNode* pNode) {
if (NULL == *pList) { if (NULL == *pList) {
*pList = nodesMakeList(); *pList = nodesMakeList();
if (NULL == *pList) { if (NULL == *pList) {
...@@ -789,7 +927,7 @@ int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode) { ...@@ -789,7 +927,7 @@ int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode) {
return nodesListAppend(*pList, pNode); return nodesListAppend(*pList, pNode);
} }
int32_t nodesListMakeStrictAppend(SNodeList** pList, SNodeptr pNode) { int32_t nodesListMakeStrictAppend(SNodeList** pList, SNode* pNode) {
if (NULL == *pList) { if (NULL == *pList) {
*pList = nodesMakeList(); *pList = nodesMakeList();
if (NULL == *pList) { if (NULL == *pList) {
...@@ -831,7 +969,7 @@ int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc) { ...@@ -831,7 +969,7 @@ int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc) {
return code; return code;
} }
int32_t nodesListPushFront(SNodeList* pList, SNodeptr pNode) { int32_t nodesListPushFront(SNodeList* pList, SNode* pNode) {
if (NULL == pList || NULL == pNode) { if (NULL == pList || NULL == pNode) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -886,7 +1024,7 @@ void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) { ...@@ -886,7 +1024,7 @@ void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) {
taosMemoryFreeClear(pSrc); taosMemoryFreeClear(pSrc);
} }
SNodeptr nodesListGetNode(SNodeList* pList, int32_t index) { SNode* nodesListGetNode(SNodeList* pList, int32_t index) {
SNode* node; SNode* node;
FOREACH(node, pList) { FOREACH(node, pList) {
if (0 == index--) { if (0 == index--) {
...@@ -1420,7 +1558,7 @@ int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) { ...@@ -1420,7 +1558,7 @@ int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) {
*pDst = nodesListGetNode(*pSrc, 0); *pDst = nodesListGetNode(*pSrc, 0);
nodesClearList(*pSrc); nodesClearList(*pSrc);
} else { } else {
SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (NULL == pLogicCond) { if (NULL == pLogicCond) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -27,12 +27,12 @@ extern "C" { ...@@ -27,12 +27,12 @@ extern "C" {
#define QUERY_SMA_OPTIMIZE_DISABLE 0 #define QUERY_SMA_OPTIMIZE_DISABLE 0
#define QUERY_SMA_OPTIMIZE_ENABLE 1 #define QUERY_SMA_OPTIMIZE_ENABLE 1
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery); int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache);
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery); int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery); int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery); int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery); int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
......
...@@ -87,6 +87,7 @@ int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, con ...@@ -87,6 +87,7 @@ int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, con
bool* pPass); bool* pPass);
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo); int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes); int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
void destoryParseMetaCache(SParseMetaCache* pMetaCache);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -461,6 +461,7 @@ into_opt(A) ::= INTO full_table_name(B). ...@@ -461,6 +461,7 @@ into_opt(A) ::= INTO full_table_name(B).
stream_options(A) ::= . { A = createStreamOptions(pCxt); } stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; } stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; } stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; } stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
/************************************************ kill connection/query ***********************************************/ /************************************************ kill connection/query ***********************************************/
......
...@@ -77,7 +77,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) { ...@@ -77,7 +77,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
abort_parse: abort_parse:
ParseFree(pParser, (FFree)taosMemoryFree); ParseFree(pParser, (FFree)taosMemoryFree);
if (TSDB_CODE_SUCCESS == cxt.errCode) { if (TSDB_CODE_SUCCESS == cxt.errCode) {
*pQuery = taosMemoryCalloc(1, sizeof(SQuery)); *pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == *pQuery) { if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -95,12 +95,6 @@ typedef struct SCollectMetaKeyCxt { ...@@ -95,12 +95,6 @@ typedef struct SCollectMetaKeyCxt {
SNode* pStmt; SNode* pStmt;
} SCollectMetaKeyCxt; } SCollectMetaKeyCxt;
static void destroyCollectMetaKeyCxt(SCollectMetaKeyCxt* pCxt) {
if (NULL != pCxt->pMetaCache) {
// TODO
}
}
typedef struct SCollectMetaKeyFromExprCxt { typedef struct SCollectMetaKeyFromExprCxt {
SCollectMetaKeyCxt* pComCxt; SCollectMetaKeyCxt* pComCxt;
int32_t errCode; int32_t errCode;
...@@ -463,16 +457,7 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { ...@@ -463,16 +457,7 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery) { int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
SCollectMetaKeyCxt cxt = { SCollectMetaKeyCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = pMetaCache, .pStmt = pQuery->pRoot};
.pParseCxt = pParseCxt, .pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache)), .pStmt = pQuery->pRoot}; return collectMetaKeyFromQuery(&cxt, pQuery->pRoot);
if (NULL == cxt.pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = collectMetaKeyFromQuery(&cxt, pQuery->pRoot);
if (TSDB_CODE_SUCCESS == code) {
TSWAP(pQuery->pMetaCache, cxt.pMetaCache);
}
destroyCollectMetaKeyCxt(&cxt);
return code;
} }
...@@ -105,7 +105,7 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { ...@@ -105,7 +105,7 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery) { int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
SAuthCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = pQuery->pMetaCache, .errCode = TSDB_CODE_SUCCESS}; SAuthCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = pMetaCache, .errCode = TSDB_CODE_SUCCESS};
return authQuery(&cxt, pQuery->pRoot); return authQuery(&cxt, pQuery->pRoot);
} }
...@@ -65,7 +65,7 @@ static bool isCondition(const SNode* pNode) { ...@@ -65,7 +65,7 @@ static bool isCondition(const SNode* pNode) {
} }
static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) { static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) {
SOperatorNode* pOp = nodesMakeNode(QUERY_NODE_OPERATOR); SOperatorNode* pOp = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
if (NULL == pOp) { if (NULL == pOp) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -1301,6 +1301,8 @@ static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) { ...@@ -1301,6 +1301,8 @@ static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
destroyCreateSubTbReq(&pCxt->createTblReq); destroyCreateSubTbReq(&pCxt->createTblReq);
} }
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }
static void destroyInsertParseContext(SInsertParseContext* pCxt) { static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyInsertParseContextForTable(pCxt); destroyInsertParseContextForTable(pCxt);
taosHashCleanup(pCxt->pVgroupsHashObj); taosHashCleanup(pCxt->pVgroupsHashObj);
...@@ -1458,7 +1460,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { ...@@ -1458,7 +1460,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// [(field1_name, ...)] // [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...]; // [...];
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache) {
SInsertParseContext context = { SInsertParseContext context = {
.pComCxt = pContext, .pComCxt = pContext,
.pSql = (char*)pContext->pSql, .pSql = (char*)pContext->pSql,
...@@ -1469,7 +1471,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { ...@@ -1469,7 +1471,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
.pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), .pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
.totalNum = 0, .totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT), .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
.pStmtCb = pContext->pStmtCb}; .pStmtCb = pContext->pStmtCb,
.pMetaCache = pMetaCache};
if (pContext->pStmtCb && *pQuery) { if (pContext->pStmtCb && *pQuery) {
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
...@@ -1484,18 +1487,17 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { ...@@ -1484,18 +1487,17 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
NULL == context.pTableNameHashObj || NULL == context.pDbFNameHashObj || NULL == context.pOutput) { NULL == context.pTableNameHashObj || NULL == context.pDbFNameHashObj || NULL == context.pOutput) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
taosHashSetFreeFp(context.pSubTableHashObj, destroySubTableHashElem);
if (pContext->pStmtCb) { if (pContext->pStmtCb) {
TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT); TSDB_QUERY_SET_TYPE(context.pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
} }
if (NULL == *pQuery) { if (NULL == *pQuery) {
*pQuery = taosMemoryCalloc(1, sizeof(SQuery)); *pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == *pQuery) { if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} else {
context.pMetaCache = (*pQuery)->pMetaCache;
} }
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false; (*pQuery)->haveResultSet = false;
...@@ -1694,24 +1696,20 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) { ...@@ -1694,24 +1696,20 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery) { int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache) {
SInsertParseSyntaxCxt context = {.pComCxt = pContext, SInsertParseSyntaxCxt context = {.pComCxt = pContext,
.pSql = (char*)pContext->pSql, .pSql = (char*)pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache))}; .pMetaCache = pMetaCache};
if (NULL == context.pMetaCache) { int32_t code = skipInsertInto(&context.pSql, &context.msg);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = skipInsertInto(&context.pSql, &context.msg);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBodySyntax(&context); code = parseInsertBodySyntax(&context);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pQuery = taosMemoryCalloc(1, sizeof(SQuery)); *pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == *pQuery) { if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
TSWAP((*pQuery)->pMetaCache, context.pMetaCache);
} }
return code; return code;
} }
......
...@@ -117,6 +117,7 @@ static SKeyword keywordTable[] = { ...@@ -117,6 +117,7 @@ static SKeyword keywordTable[] = {
{"LOCAL", TK_LOCAL}, {"LOCAL", TK_LOCAL},
{"MATCH", TK_MATCH}, {"MATCH", TK_MATCH},
{"MAXROWS", TK_MAXROWS}, {"MAXROWS", TK_MAXROWS},
{"MAX_DELAY", TK_MAX_DELAY},
{"MERGE", TK_MERGE}, {"MERGE", TK_MERGE},
{"MINROWS", TK_MINROWS}, {"MINROWS", TK_MINROWS},
{"MINUS", TK_MINUS}, {"MINUS", TK_MINUS},
......
此差异已折叠。
...@@ -34,8 +34,8 @@ bool qIsInsertSql(const char* pStr, size_t length) { ...@@ -34,8 +34,8 @@ bool qIsInsertSql(const char* pStr, size_t length) {
} while (1); } while (1);
} }
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) { static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
int32_t code = authenticate(pCxt, pQuery); int32_t code = authenticate(pCxt, pQuery, pMetaCache);
if (TSDB_CODE_SUCCESS == code && pQuery->placeholderNum > 0) { if (TSDB_CODE_SUCCESS == code && pQuery->placeholderNum > 0) {
TSWAP(pQuery->pPrepareRoot, pQuery->pRoot); TSWAP(pQuery->pPrepareRoot, pQuery->pRoot);
...@@ -43,7 +43,7 @@ static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) { ...@@ -43,7 +43,7 @@ static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) {
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translate(pCxt, pQuery); code = translate(pCxt, pQuery, pMetaCache);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = calculateConstant(pCxt, pQuery); code = calculateConstant(pCxt, pQuery);
...@@ -54,15 +54,15 @@ static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) { ...@@ -54,15 +54,15 @@ static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) {
static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) { static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = parse(pCxt, pQuery); int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = analyseSemantic(pCxt, *pQuery); code = analyseSemantic(pCxt, *pQuery, NULL);
} }
return code; return code;
} }
static int32_t parseSqlSyntax(SParseContext* pCxt, SQuery** pQuery) { static int32_t parseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) {
int32_t code = parse(pCxt, pQuery); int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKey(pCxt, *pQuery); code = collectMetaKey(pCxt, *pQuery, pMetaCache);
} }
return code; return code;
} }
...@@ -149,7 +149,7 @@ static void rewriteExprAlias(SNode* pRoot) { ...@@ -149,7 +149,7 @@ static void rewriteExprAlias(SNode* pRoot) {
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) { if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSql(pCxt, pQuery); code = parseInsertSql(pCxt, pQuery, NULL);
} else { } else {
code = parseSqlIntoAst(pCxt, pQuery); code = parseSqlIntoAst(pCxt, pQuery);
} }
...@@ -158,29 +158,38 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { ...@@ -158,29 +158,38 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
} }
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) { int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
int32_t code = TSDB_CODE_SUCCESS; SParseMetaCache metaCache = {0};
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) { if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSyntax(pCxt, pQuery); code = parseInsertSyntax(pCxt, pQuery, &metaCache);
} else { } else {
code = parseSqlSyntax(pCxt, pQuery); code = parseSqlSyntax(pCxt, pQuery, &metaCache);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq((*pQuery)->pMetaCache, pCatalogReq); code = buildCatalogReq(&metaCache, pCatalogReq);
} }
destoryParseMetaCache(&metaCache);
terrno = code; terrno = code;
return code; return code;
} }
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) { const struct SMetaData* pMetaData, SQuery* pQuery) {
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, pQuery->pMetaCache); SParseMetaCache metaCache = {0};
if (NULL == pQuery->pRoot) { int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache);
return parseInsertSql(pCxt, &pQuery); if (TSDB_CODE_SUCCESS == code) {
if (NULL == pQuery->pRoot) {
code = parseInsertSql(pCxt, &pQuery, &metaCache);
} else {
code = analyseSemantic(pCxt, pQuery, &metaCache);
}
} }
return analyseSemantic(pCxt, pQuery); destoryParseMetaCache(&metaCache);
terrno = code;
return code;
} }
void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode(pQueryNode); } void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode((SNode*)pQueryNode); }
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) { int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
return extractResultSchema(pRoot, numOfCols, pSchema); return extractResultSchema(pRoot, numOfCols, pSchema);
...@@ -226,10 +235,9 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx ...@@ -226,10 +235,9 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx
} }
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery) { int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery) {
int32_t code = translate(pCxt, pQuery); int32_t code = translate(pCxt, pQuery, NULL);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = calculateConstant(pCxt, pQuery); code = calculateConstant(pCxt, pQuery);
} }
return code; return code;
} }
此差异已折叠。
...@@ -45,6 +45,7 @@ class ParserEnv : public testing::Environment { ...@@ -45,6 +45,7 @@ class ParserEnv : public testing::Environment {
destroyMetaDataEnv(); destroyMetaDataEnv();
taosCleanupKeywordsTable(); taosCleanupKeywordsTable();
fmFuncMgtDestroy(); fmFuncMgtDestroy();
taosCloseLog();
} }
ParserEnv() {} ParserEnv() {}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册