提交 d1e0327d 编写于 作者: X Xiaoyu Wang

fix: handle the memory leak of parser

上级 fc4b959a
......@@ -336,22 +336,21 @@ typedef enum EQueryExecMode {
} EQueryExecMode;
typedef struct SQuery {
ENodeType type;
EQueryExecMode execMode;
bool haveResultSet;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
int8_t precision;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
SArray* pTableList;
SArray* pDbList;
bool showRewrite;
int32_t placeholderNum;
SArray* pPlaceholderValues;
SNode* pPrepareRoot;
struct SParseMetaCache* pMetaCache;
ENodeType type;
EQueryExecMode execMode;
bool haveResultSet;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
int8_t precision;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
SArray* pTableList;
SArray* pDbList;
bool showRewrite;
int32_t placeholderNum;
SArray* pPlaceholderValues;
SNode* pPrepareRoot;
} SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
......
......@@ -30,7 +30,7 @@
#define TSC_VAR_RELEASED 0
static int32_t sentinel = TSC_VAR_NOT_RELEASE;
static int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt);
static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt);
int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0;
......@@ -177,8 +177,8 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return pResInfo->userFields;
}
static void syncQueryFn(void* param, void* res, int32_t code) {
SSyncQueryParam* pParam = param;
static void syncQueryFn(void *param, void *res, int32_t code) {
SSyncQueryParam *pParam = param;
pParam->pRequest = res;
pParam->pRequest->code = code;
......@@ -190,10 +190,10 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
return NULL;
}
STscObj* pTscObj = (STscObj*)taos;
STscObj *pTscObj = (STscObj *)taos;
#if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taos_query_a(pTscObj, sql, syncQueryFn, param);
......@@ -606,16 +606,30 @@ const char *taos_get_server_info(TAOS *taos) {
}
typedef struct SqlParseWrapper {
SParseContext* pCtx;
SParseContext *pCtx;
SCatalogReq catalogReq;
SRequestObj* pRequest;
SQuery* pQuery;
SRequestObj *pRequest;
SQuery *pQuery;
} SqlParseWrapper;
void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) {
SqlParseWrapper *pWrapper = (SqlParseWrapper*) param;
SQuery* pQuery = pWrapper->pQuery;
SRequestObj* pRequest = pWrapper->pRequest;
static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
taosArrayDestroy(pWrapper->catalogReq.pDbVgroup);
taosArrayDestroy(pWrapper->catalogReq.pDbCfg);
taosArrayDestroy(pWrapper->catalogReq.pDbInfo);
taosArrayDestroy(pWrapper->catalogReq.pTableMeta);
taosArrayDestroy(pWrapper->catalogReq.pTableHash);
taosArrayDestroy(pWrapper->catalogReq.pUdf);
taosArrayDestroy(pWrapper->catalogReq.pIndex);
taosArrayDestroy(pWrapper->catalogReq.pUser);
taosArrayDestroy(pWrapper->catalogReq.pTableIndex);
taosMemoryFree(pWrapper->pCtx);
taosMemoryFree(pWrapper);
}
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest;
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
......@@ -630,22 +644,22 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) {
TSWAP(pRequest->dbList, (pQuery)->pDbList);
TSWAP(pRequest->tableList, (pQuery)->pTableList);
taosMemoryFree(pWrapper);
destorySqlParseWrapper(pWrapper);
launchAsyncQuery(pRequest, pQuery);
} else {
destorySqlParseWrapper(pWrapper);
tscDebug("error happens, code:%d", code);
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%"PRIx64" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code),
pRequest->retry, pRequest->requestId);
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code;
doAsyncQuery(pRequest, true);
return;
}
// return to app directly
taosMemoryFree(pWrapper);
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self,
tstrerror(code), pRequest->requestId);
pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
......@@ -670,7 +684,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
}
SRequestObj *pRequest = NULL;
int32_t code = buildRequest(taos, sql, sqlLen, &pRequest);
int32_t code = buildRequest(taos, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
fp(param, NULL, terrno);
......@@ -678,11 +692,11 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
}
pRequest->body.queryFp = fp;
pRequest->body.param = param;
pRequest->body.param = param;
doAsyncQuery(pRequest, false);
}
int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) {
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
const STscObj *pTscObj = pRequest->pTscObj;
*pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
......@@ -690,27 +704,29 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
**pCxt = (SParseContext){.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = false,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.async = true,};
**pCxt = (SParseContext){
.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = false,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.async = true,
};
return TSDB_CODE_SUCCESS;
}
void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
SParseContext* pCxt = NULL;
STscObj *pTscObj = pRequest->pTscObj;
int32_t code = 0;
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SParseContext *pCxt = NULL;
STscObj *pTscObj = pRequest->pTscObj;
int32_t code = 0;
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
code = pRequest->prevCode;
......@@ -747,23 +763,24 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
pWrapper->pRequest = pRequest;
pWrapper->catalogReq = catalogReq;
code = catalogAsyncGetAllMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pRequest->requestId,
&catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob);
code = catalogAsyncGetAllMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pRequest->requestId, &catalogReq,
retrieveMetaCallback, pWrapper, &pRequest->body.queryJob);
if (code == TSDB_CODE_SUCCESS) {
return;
}
_error:
tscError("0x%"PRIx64" error happens, code:%d - %s, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId);
_error:
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId);
terrno = code;
pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
static void fetchCallback(void* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*) param;
static void fetchCallback(void *pResult, void *param, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)param;
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->pData = pResult;
pResultInfo->numOfRows = 0;
......@@ -779,7 +796,8 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
return;
}
pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, false);
pRequest->code =
setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, false);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->code = code;
......@@ -795,7 +813,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
}
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT (res != NULL && fp != NULL);
ASSERT(res != NULL && fp != NULL);
SRequestObj *pRequest = res;
pRequest->body.fetchFp = fp;
......@@ -819,7 +837,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
}
void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) {
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
SRequestObj *pRequest = res;
......@@ -827,9 +845,9 @@ void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) {
taos_fetch_rows_a(res, fp, param);
}
const void* taos_get_raw_block(TAOS_RES* res) {
const void *taos_get_raw_block(TAOS_RES *res) {
ASSERT(res != NULL);
SRequestObj* pRequest = res;
SRequestObj *pRequest = res;
return pRequest->body.resInfo.pData;
}
......@@ -913,26 +931,25 @@ int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
return stmtSetTbTags(stmt, tags);
}
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); }
int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) {
int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields) {
if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtGetTagFields(stmt, fieldNum, fields);
}
int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) {
int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields) {
if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtGetColFields(stmt, fieldNum, fields);
}
......@@ -1091,4 +1108,3 @@ int taos_stmt_close(TAOS_STMT *stmt) {
return stmtClose(stmt);
}
......@@ -215,6 +215,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SKillStmt));
case QUERY_NODE_DELETE_STMT:
return makeNode(type, sizeof(SDeleteStmt));
case QUERY_NODE_QUERY:
return makeNode(type, sizeof(SQuery));
case QUERY_NODE_LOGIC_PLAN_SCAN:
return makeNode(type, sizeof(SScanLogicNode));
case QUERY_NODE_LOGIC_PLAN_JOIN:
......
......@@ -27,12 +27,12 @@ extern "C" {
#define QUERY_SMA_OPTIMIZE_DISABLE 0
#define QUERY_SMA_OPTIMIZE_ENABLE 1
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery);
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache);
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery);
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
......
......@@ -87,6 +87,7 @@ int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, con
bool* pPass);
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
void destoryParseMetaCache(SParseMetaCache* pMetaCache);
#ifdef __cplusplus
}
......
......@@ -77,7 +77,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
abort_parse:
ParseFree(pParser, (FFree)taosMemoryFree);
if (TSDB_CODE_SUCCESS == cxt.errCode) {
*pQuery = taosMemoryCalloc(1, sizeof(SQuery));
*pQuery = nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -95,12 +95,6 @@ typedef struct SCollectMetaKeyCxt {
SNode* pStmt;
} SCollectMetaKeyCxt;
static void destroyCollectMetaKeyCxt(SCollectMetaKeyCxt* pCxt) {
if (NULL != pCxt->pMetaCache) {
// TODO
}
}
typedef struct SCollectMetaKeyFromExprCxt {
SCollectMetaKeyCxt* pComCxt;
int32_t errCode;
......@@ -463,16 +457,7 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery) {
SCollectMetaKeyCxt cxt = {
.pParseCxt = pParseCxt, .pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache)), .pStmt = pQuery->pRoot};
if (NULL == cxt.pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = collectMetaKeyFromQuery(&cxt, pQuery->pRoot);
if (TSDB_CODE_SUCCESS == code) {
TSWAP(pQuery->pMetaCache, cxt.pMetaCache);
}
destroyCollectMetaKeyCxt(&cxt);
return code;
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
SCollectMetaKeyCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = pMetaCache, .pStmt = pQuery->pRoot};
return collectMetaKeyFromQuery(&cxt, pQuery->pRoot);
}
......@@ -101,7 +101,7 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery) {
SAuthCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = pQuery->pMetaCache, .errCode = TSDB_CODE_SUCCESS};
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
SAuthCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = pMetaCache, .errCode = TSDB_CODE_SUCCESS};
return authQuery(&cxt, pQuery->pRoot);
}
......@@ -1441,7 +1441,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache) {
SInsertParseContext context = {
.pComCxt = pContext,
.pSql = (char*)pContext->pSql,
......@@ -1452,7 +1452,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
.pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
.totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
.pStmtCb = pContext->pStmtCb};
.pStmtCb = pContext->pStmtCb,
.pMetaCache = pMetaCache};
if (pContext->pStmtCb && *pQuery) {
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
......@@ -1473,12 +1474,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
}
if (NULL == *pQuery) {
*pQuery = taosMemoryCalloc(1, sizeof(SQuery));
*pQuery = nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
context.pMetaCache = (*pQuery)->pMetaCache;
}
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
......@@ -1677,24 +1676,20 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
return TSDB_CODE_SUCCESS;
}
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery) {
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache) {
SInsertParseSyntaxCxt context = {.pComCxt = pContext,
.pSql = (char*)pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache))};
if (NULL == context.pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = skipInsertInto(&context.pSql, &context.msg);
.pMetaCache = pMetaCache};
int32_t code = skipInsertInto(&context.pSql, &context.msg);
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBodySyntax(&context);
}
if (TSDB_CODE_SUCCESS == code) {
*pQuery = taosMemoryCalloc(1, sizeof(SQuery));
*pQuery = nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
TSWAP((*pQuery)->pMetaCache, context.pMetaCache);
}
return code;
}
......
......@@ -5045,11 +5045,18 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p
return code;
}
static void destoryAlterTbReq(SVAlterTbReq* pReq) {
taosMemoryFree(pReq->tbName);
taosMemoryFree(pReq->colName);
taosMemoryFree(pReq->colNewName);
taosMemoryFree(pReq->tagName);
taosMemoryFree(pReq->newComment);
}
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot;
int32_t code = checkSchemalessDb(pCxt, pStmt->dbName);
STableMeta* pTableMeta = NULL;
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......@@ -5089,7 +5096,7 @@ static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) {
code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
}
destoryAlterTbReq(&req);
return code;
}
......@@ -5225,10 +5232,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_SUCCESS;
}
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) {
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
STranslateContext cxt = {0};
int32_t code = initTranslateContext(pParseCxt, pQuery->pMetaCache, &cxt);
int32_t code = initTranslateContext(pParseCxt, pMetaCache, &cxt);
if (TSDB_CODE_SUCCESS == code) {
code = rewriteQuery(&cxt, pQuery);
}
......
......@@ -19,6 +19,8 @@
#define USER_AUTH_KEY_MAX_LEN TSDB_USER_LEN + TSDB_DB_FNAME_LEN + 2
const void* nullPointer = NULL;
static char* getSyntaxErrFormat(int32_t errCode) {
switch (errCode) {
case TSDB_CODE_PAR_SYNTAX_ERROR:
......@@ -646,7 +648,7 @@ static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHa
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return taosHashPut(*pTables, pTbFName, len, &pTables, POINTER_BYTES);
return taosHashPut(*pTables, pTbFName, len, &nullPointer, POINTER_BYTES);
}
static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) {
......@@ -688,7 +690,7 @@ static int32_t reserveDbReqInCache(int32_t acctId, const char* pDb, SHashObj** p
}
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s", acctId, pDb);
return taosHashPut(*pDbs, fullName, len, &pDbs, POINTER_BYTES);
return taosHashPut(*pDbs, fullName, len, &nullPointer, POINTER_BYTES);
}
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache) {
......@@ -803,7 +805,7 @@ int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return taosHashPut(pMetaCache->pUdf, pFunc, strlen(pFunc), &pMetaCache, POINTER_BYTES);
return taosHashPut(pMetaCache->pUdf, pFunc, strlen(pFunc), &nullPointer, POINTER_BYTES);
}
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo) {
......@@ -854,3 +856,14 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName,
}
return code;
}
void destoryParseMetaCache(SParseMetaCache* pMetaCache) {
taosHashCleanup(pMetaCache->pTableMeta);
taosHashCleanup(pMetaCache->pDbVgroup);
taosHashCleanup(pMetaCache->pTableVgroup);
taosHashCleanup(pMetaCache->pDbCfg);
taosHashCleanup(pMetaCache->pDbInfo);
taosHashCleanup(pMetaCache->pUserAuth);
taosHashCleanup(pMetaCache->pUdf);
taosHashCleanup(pMetaCache->pTableIndex);
}
......@@ -34,8 +34,8 @@ bool qIsInsertSql(const char* pStr, size_t length) {
} while (1);
}
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) {
int32_t code = authenticate(pCxt, pQuery);
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
int32_t code = authenticate(pCxt, pQuery, pMetaCache);
if (TSDB_CODE_SUCCESS == code && pQuery->placeholderNum > 0) {
TSWAP(pQuery->pPrepareRoot, pQuery->pRoot);
......@@ -43,7 +43,7 @@ static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) {
}
if (TSDB_CODE_SUCCESS == code) {
code = translate(pCxt, pQuery);
code = translate(pCxt, pQuery, pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = calculateConstant(pCxt, pQuery);
......@@ -54,15 +54,15 @@ static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) {
static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = analyseSemantic(pCxt, *pQuery);
code = analyseSemantic(pCxt, *pQuery, NULL);
}
return code;
}
static int32_t parseSqlSyntax(SParseContext* pCxt, SQuery** pQuery) {
static int32_t parseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) {
int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKey(pCxt, *pQuery);
code = collectMetaKey(pCxt, *pQuery, pMetaCache);
}
return code;
}
......@@ -76,7 +76,7 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes);
pVal->node.resType.type = pParam->buffer_type;
pVal->node.resType.bytes = inputSize;
switch (pParam->buffer_type) {
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
......@@ -149,7 +149,7 @@ static void rewriteExprAlias(SNode* pRoot) {
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSql(pCxt, pQuery);
code = parseInsertSql(pCxt, pQuery, NULL);
} else {
code = parseSqlIntoAst(pCxt, pQuery);
}
......@@ -158,26 +158,34 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
}
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
int32_t code = TSDB_CODE_SUCCESS;
SParseMetaCache metaCache = {0};
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSyntax(pCxt, pQuery);
code = parseInsertSyntax(pCxt, pQuery, &metaCache);
} else {
code = parseSqlSyntax(pCxt, pQuery);
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq((*pQuery)->pMetaCache, pCatalogReq);
code = buildCatalogReq(&metaCache, pCatalogReq);
}
destoryParseMetaCache(&metaCache);
terrno = code;
return code;
}
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) {
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, pQuery->pMetaCache);
if (NULL == pQuery->pRoot) {
return parseInsertSql(pCxt, &pQuery);
SParseMetaCache metaCache = {0};
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pQuery->pRoot) {
code = parseInsertSql(pCxt, &pQuery, &metaCache);
}
code = analyseSemantic(pCxt, pQuery, &metaCache);
}
return analyseSemantic(pCxt, pQuery);
destoryParseMetaCache(&metaCache);
terrno = code;
return code;
}
void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode(pQueryNode); }
......@@ -226,10 +234,9 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx
}
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery) {
int32_t code = translate(pCxt, pQuery);
int32_t code = translate(pCxt, pQuery, NULL);
if (TSDB_CODE_SUCCESS == code) {
code = calculateConstant(pCxt, pQuery);
}
return code;
}
......@@ -584,3 +584,35 @@ int32_t MockCatalogService::catalogGetTableIndex(const SName* pTableName, SArray
int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
}
void MockCatalogService::destoryCatalogReq(SCatalogReq* pReq) {
taosArrayDestroy(pReq->pDbVgroup);
taosArrayDestroy(pReq->pDbCfg);
taosArrayDestroy(pReq->pDbInfo);
taosArrayDestroy(pReq->pTableMeta);
taosArrayDestroy(pReq->pTableHash);
taosArrayDestroy(pReq->pUdf);
taosArrayDestroy(pReq->pIndex);
taosArrayDestroy(pReq->pUser);
taosArrayDestroy(pReq->pTableIndex);
delete pReq;
}
void MockCatalogService::destoryMetaRes(void* p) {
SMetaRes* pRes = (SMetaRes*)p;
taosMemoryFree(pRes->pRes);
}
void MockCatalogService::destoryMetaData(SMetaData* pData) {
taosArrayDestroyEx(pData->pDbVgroup, destoryMetaRes);
taosArrayDestroyEx(pData->pDbCfg, destoryMetaRes);
taosArrayDestroyEx(pData->pDbInfo, destoryMetaRes);
taosArrayDestroyEx(pData->pTableMeta, destoryMetaRes);
taosArrayDestroyEx(pData->pTableHash, destoryMetaRes);
taosArrayDestroyEx(pData->pTableIndex, destoryMetaRes);
taosArrayDestroyEx(pData->pUdfList, destoryMetaRes);
taosArrayDestroyEx(pData->pIndex, destoryMetaRes);
taosArrayDestroyEx(pData->pUser, destoryMetaRes);
taosArrayDestroyEx(pData->pQnodeList, destoryMetaRes);
delete pData;
}
......@@ -50,6 +50,10 @@ struct MockTableMeta {
class MockCatalogServiceImpl;
class MockCatalogService {
public:
static void destoryCatalogReq(SCatalogReq* pReq);
static void destoryMetaRes(void* p);
static void destoryMetaData(SMetaData* pData);
MockCatalogService();
~MockCatalogService();
ITableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType,
......
......@@ -188,9 +188,16 @@ TEST_F(ParserInitialATest, alterTable) {
SVAlterTbReq expect = {0};
auto initAlterTbReq = [](SVAlterTbReq* pReq) {
free(pReq->tbName);
free(pReq->colName);
free(pReq->colNewName);
memset(pReq, 0, sizeof(SVAlterTbReq));
};
auto setAlterColFunc = [&](const char* pTbname, int8_t alterType, const char* pColName, int8_t dataType = 0,
int32_t dataBytes = 0, const char* pNewColName = nullptr) {
memset(&expect, 0, sizeof(SVAlterTbReq));
initAlterTbReq(&expect);
expect.tbName = strdup(pTbname);
expect.action = alterType;
expect.colName = strdup(pColName);
......@@ -213,7 +220,7 @@ TEST_F(ParserInitialATest, alterTable) {
};
auto setAlterTagFunc = [&](const char* pTbname, const char* pTagName, uint8_t* pNewVal, uint32_t bytes) {
memset(&expect, 0, sizeof(SVAlterTbReq));
initAlterTbReq(&expect);
expect.tbName = strdup(pTbname);
expect.action = TSDB_ALTER_TABLE_UPDATE_TAG_VAL;
expect.tagName = strdup(pTagName);
......@@ -224,7 +231,7 @@ TEST_F(ParserInitialATest, alterTable) {
};
auto setAlterOptionsFunc = [&](const char* pTbname, int32_t ttl, char* pComment = nullptr) {
memset(&expect, 0, sizeof(SVAlterTbReq));
initAlterTbReq(&expect);
expect.tbName = strdup(pTbname);
expect.action = TSDB_ALTER_TABLE_UPDATE_OPTIONS;
if (-1 != ttl) {
......
......@@ -51,7 +51,7 @@ class InsertTest : public Test {
}
int32_t run() {
code_ = parseInsertSql(&cxt_, &res_);
code_ = parseInsertSql(&cxt_, &res_, nullptr);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
}
......@@ -60,29 +60,31 @@ class InsertTest : public Test {
int32_t runAsync() {
cxt_.async = true;
code_ = parseInsertSyntax(&cxt_, &res_);
SParseMetaCache metaCache = {0};
code_ = parseInsertSyntax(&cxt_, &res_, &metaCache);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
SCatalogReq catalogReq = {0};
code_ = buildCatalogReq(res_->pMetaCache, &catalogReq);
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq);
code_ = buildCatalogReq(&metaCache, catalogReq.get());
if (code_ != TSDB_CODE_SUCCESS) {
cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
SMetaData metaData = {0};
g_mockCatalogService->catalogGetAllMeta(&catalogReq, &metaData);
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get());
code_ = putMetaDataToCache(&catalogReq, &metaData, res_->pMetaCache);
code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), &metaCache);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
code_ = parseInsertSql(&cxt_, &res_);
code_ = parseInsertSql(&cxt_, &res_, &metaCache);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSql code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
......
......@@ -45,6 +45,7 @@ class ParserEnv : public testing::Environment {
destroyMetaDataEnv();
taosCleanupKeywordsTable();
fmFuncMgtDestroy();
taosCloseLog();
}
ParserEnv() {}
......
......@@ -85,10 +85,11 @@ class ParserTestBaseImpl {
SQuery* pQuery = nullptr;
doParse(&cxt, &pQuery);
unique_ptr<SQuery, void (*)(SQuery*)> query(pQuery, qDestroyQuery);
doAuthenticate(&cxt, pQuery);
doAuthenticate(&cxt, pQuery, nullptr);
doTranslate(&cxt, pQuery);
doTranslate(&cxt, pQuery, nullptr);
doCalculateConstant(&cxt, pQuery);
......@@ -196,9 +197,8 @@ class ParserTestBaseImpl {
res_.parsedAst_ = toString((*pQuery)->pRoot);
}
void doCollectMetaKey(SParseContext* pCxt, SQuery* pQuery) {
DO_WITH_THROW(collectMetaKey, pCxt, pQuery);
ASSERT_NE(pQuery->pMetaCache, nullptr);
void doCollectMetaKey(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(collectMetaKey, pCxt, pQuery, pMetaCache);
}
void doBuildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
......@@ -213,10 +213,12 @@ class ParserTestBaseImpl {
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache);
}
void doAuthenticate(SParseContext* pCxt, SQuery* pQuery) { DO_WITH_THROW(authenticate, pCxt, pQuery); }
void doAuthenticate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(authenticate, pCxt, pQuery, pMetaCache);
}
void doTranslate(SParseContext* pCxt, SQuery* pQuery) {
DO_WITH_THROW(translate, pCxt, pQuery);
void doTranslate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(translate, pCxt, pQuery, pMetaCache);
checkQuery(pQuery, PARSER_STAGE_TRANSLATE);
res_.translatedAst_ = toString(pQuery->pRoot);
}
......@@ -245,23 +247,26 @@ class ParserTestBaseImpl {
SQuery* pQuery = nullptr;
doParse(&cxt, &pQuery);
unique_ptr<SQuery, void (*)(SQuery*)> query(pQuery, qDestroyQuery);
doCollectMetaKey(&cxt, pQuery);
SParseMetaCache metaCache = {0};
doCollectMetaKey(&cxt, pQuery, &metaCache);
SCatalogReq catalogReq = {0};
doBuildCatalogReq(pQuery->pMetaCache, &catalogReq);
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq);
doBuildCatalogReq(&metaCache, catalogReq.get());
string err;
thread t1([&]() {
try {
SMetaData metaData = {0};
doGetAllMeta(&catalogReq, &metaData);
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
doGetAllMeta(catalogReq.get(), metaData.get());
doPutMetaDataToCache(&catalogReq, &metaData, pQuery->pMetaCache);
doPutMetaDataToCache(catalogReq.get(), metaData.get(), &metaCache);
doAuthenticate(&cxt, pQuery);
doAuthenticate(&cxt, pQuery, &metaCache);
doTranslate(&cxt, pQuery);
doTranslate(&cxt, pQuery, &metaCache);
doCalculateConstant(&cxt, pQuery);
} catch (const TerminateFlag& e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册