未验证 提交 d25a9028 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #13093 from taosdata/feature/3.0_async

feat: parser adapts asynchronous interface
...@@ -322,21 +322,22 @@ typedef enum EQueryExecMode { ...@@ -322,21 +322,22 @@ typedef enum EQueryExecMode {
} EQueryExecMode; } EQueryExecMode;
typedef struct SQuery { typedef struct SQuery {
ENodeType type; ENodeType type;
EQueryExecMode execMode; EQueryExecMode execMode;
bool haveResultSet; bool haveResultSet;
SNode* pRoot; SNode* pRoot;
int32_t numOfResCols; int32_t numOfResCols;
SSchema* pResSchema; SSchema* pResSchema;
int8_t precision; int8_t precision;
SCmdMsgInfo* pCmdMsg; SCmdMsgInfo* pCmdMsg;
int32_t msgType; int32_t msgType;
SArray* pDbList; SArray* pDbList;
SArray* pTableList; SArray* pTableList;
bool showRewrite; bool showRewrite;
int32_t placeholderNum; int32_t placeholderNum;
SArray* pPlaceholderValues; SArray* pPlaceholderValues;
SNode* pPrepareRoot; SNode* pPrepareRoot;
struct SParseMetaCache* pMetaCache;
} SQuery; } SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
......
...@@ -23,6 +23,9 @@ extern "C" { ...@@ -23,6 +23,9 @@ extern "C" {
#include "query.h" #include "query.h"
#include "querynodes.h" #include "querynodes.h"
struct SCatalogReq;
struct SMetaData;
typedef struct SStmtCallback { typedef struct SStmtCallback {
TAOS_STMT* pStmt; TAOS_STMT* pStmt;
int32_t (*getTbNameFn)(TAOS_STMT*, char**); int32_t (*getTbNameFn)(TAOS_STMT*, char**);
...@@ -45,11 +48,17 @@ typedef struct SParseContext { ...@@ -45,11 +48,17 @@ typedef struct SParseContext {
SStmtCallback* pStmtCb; SStmtCallback* pStmtCb;
const char* pUser; const char* pUser;
bool isSuperUser; bool isSuperUser;
bool async;
} SParseContext; } SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
bool qIsInsertSql(const char* pStr, size_t length); bool qIsInsertSql(const char* pStr, size_t length);
// for async mode
int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);
int32_t qSemanticAnalysisSql(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery);
void qDestroyQuery(SQuery* pQueryNode); void qDestroyQuery(SQuery* pQueryNode);
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
......
...@@ -643,6 +643,7 @@ int32_t* taosGetErrno(); ...@@ -643,6 +643,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_DROP_COL TAOS_DEF_ERROR_CODE(0, 0x2651) #define TSDB_CODE_PAR_INVALID_DROP_COL TAOS_DEF_ERROR_CODE(0, 0x2651)
#define TSDB_CODE_PAR_INVALID_COL_JSON TAOS_DEF_ERROR_CODE(0, 0x2652) #define TSDB_CODE_PAR_INVALID_COL_JSON TAOS_DEF_ERROR_CODE(0, 0x2652)
#define TSDB_CODE_PAR_VALUE_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x2653) #define TSDB_CODE_PAR_VALUE_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x2653)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2654)
//planner //planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
...@@ -394,8 +394,8 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) { ...@@ -394,8 +394,8 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
if (NULL == blk->tblFName || 0 == blk->tblFName[0]) { if (NULL == blk->tblFName || 0 == blk->tblFName[0]) {
continue; continue;
} }
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver}; STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
taosArrayPush(pArray, &tbSver); taosArrayPush(pArray, &tbSver);
} }
} else if (TDMT_VND_QUERY == pRequest->type) { } else if (TDMT_VND_QUERY == pRequest->type) {
...@@ -552,12 +552,12 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { ...@@ -552,12 +552,12 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
int32_t removeMeta(STscObj* pTscObj, SArray* tbList) { int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
int32_t tbNum = taosArrayGetSize(tbList); int32_t tbNum = taosArrayGetSize(tbList);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
SName* pTbName = taosArrayGet(tbList, i); SName* pTbName = taosArrayGet(tbList, i);
catalogRemoveTableMeta(pCatalog, pTbName); catalogRemoveTableMeta(pCatalog, pTbName);
...@@ -566,7 +566,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) { ...@@ -566,7 +566,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
int32_t retryNum = 0; int32_t retryNum = 0;
...@@ -589,7 +588,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { ...@@ -589,7 +588,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList); removeMeta(pTscObj, pRequest->tableList);
} }
return pRequest; return pRequest;
} }
......
...@@ -27,13 +27,14 @@ extern "C" { ...@@ -27,13 +27,14 @@ extern "C" {
#include "querynodes.h" #include "querynodes.h"
typedef struct SAstCreateContext { typedef struct SAstCreateContext {
SParseContext* pQueryCxt; SParseContext* pQueryCxt;
SMsgBuf msgBuf; SMsgBuf msgBuf;
bool notSupport; bool notSupport;
SNode* pRootNode; SNode* pRootNode;
int16_t placeholderNo; int16_t placeholderNo;
SArray* pPlaceholderValues; SArray* pPlaceholderValues;
int32_t errCode; int32_t errCode;
SParseMetaCache* pMetaCache;
} SAstCreateContext; } SAstCreateContext;
typedef enum EDatabaseOptionType { typedef enum EDatabaseOptionType {
...@@ -74,7 +75,7 @@ typedef struct SAlterOption { ...@@ -74,7 +75,7 @@ typedef struct SAlterOption {
extern SToken nil_token; extern SToken nil_token;
void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt); int32_t initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt);
SNode* createRawExprNode(SAstCreateContext* pCxt, const SToken* pToken, SNode* pNode); SNode* createRawExprNode(SAstCreateContext* pCxt, const SToken* pToken, SNode* pNode);
SNode* createRawExprNodeExt(SAstCreateContext* pCxt, const SToken* pStart, const SToken* pEnd, SNode* pNode); SNode* createRawExprNodeExt(SAstCreateContext* pCxt, const SToken* pStart, const SToken* pEnd, SNode* pNode);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "catalog.h"
#include "os.h" #include "os.h"
#include "query.h" #include "query.h"
...@@ -37,6 +38,13 @@ typedef struct SMsgBuf { ...@@ -37,6 +38,13 @@ typedef struct SMsgBuf {
char* buf; char* buf;
} SMsgBuf; } SMsgBuf;
typedef struct SParseMetaCache {
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...); int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
int32_t buildInvalidOperationMsg(SMsgBuf* pMsgBuf, const char* msg); int32_t buildInvalidOperationMsg(SMsgBuf* pMsgBuf, const char* msg);
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr); int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr);
...@@ -47,10 +55,20 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta); ...@@ -47,10 +55,20 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta);
int32_t getNumOfTags(const STableMeta* pTableMeta); int32_t getNumOfTags(const STableMeta* pTableMeta);
STableComInfo getTableInfo(const STableMeta* pTableMeta); STableComInfo getTableInfo(const STableMeta* pTableMeta);
STableMeta* tableMetaDup(const STableMeta* pTableMeta); STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* errMsg, int16_t startColId); int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* errMsg, int16_t startColId);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
int32_t getDBVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
int32_t getTableHashVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup);
int32_t getDBVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum);
int32_t getDBCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -38,7 +38,8 @@ ...@@ -38,7 +38,8 @@
SToken nil_token = {.type = TK_NK_NIL, .n = 0, .z = NULL}; SToken nil_token = {.type = TK_NK_NIL, .n = 0, .z = NULL};
void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) { int32_t initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) {
memset(pCxt, 0, sizeof(SAstCreateContext));
pCxt->pQueryCxt = pParseCxt; pCxt->pQueryCxt = pParseCxt;
pCxt->msgBuf.buf = pParseCxt->pMsg; pCxt->msgBuf.buf = pParseCxt->pMsg;
pCxt->msgBuf.len = pParseCxt->msgLen; pCxt->msgBuf.len = pParseCxt->msgLen;
...@@ -47,6 +48,13 @@ void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) { ...@@ -47,6 +48,13 @@ void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) {
pCxt->placeholderNo = 0; pCxt->placeholderNo = 0;
pCxt->pPlaceholderValues = NULL; pCxt->pPlaceholderValues = NULL;
pCxt->errCode = TSDB_CODE_SUCCESS; pCxt->errCode = TSDB_CODE_SUCCESS;
if (pParseCxt->async) {
pCxt->pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache));
if (NULL == pCxt->pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
} }
static void copyStringFormStringToken(SToken* pToken, char* pBuf, int32_t len) { static void copyStringFormStringToken(SToken* pToken, char* pBuf, int32_t len) {
...@@ -464,6 +472,13 @@ SNode* createRealTableNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pTa ...@@ -464,6 +472,13 @@ SNode* createRealTableNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pTa
strncpy(realTable->table.tableAlias, pTableName->z, pTableName->n); strncpy(realTable->table.tableAlias, pTableName->z, pTableName->n);
} }
strncpy(realTable->table.tableName, pTableName->z, pTableName->n); strncpy(realTable->table.tableName, pTableName->z, pTableName->n);
if (NULL != pCxt->pMetaCache) {
if (TSDB_CODE_SUCCESS != reserveTableMetaInCache(pCxt->pQueryCxt->acctId, realTable->table.dbName,
realTable->table.tableName, pCxt->pMetaCache)) {
nodesDestroyNode(realTable);
CHECK_OUT_OF_MEM(NULL);
}
}
return (SNode*)realTable; return (SNode*)realTable;
} }
......
...@@ -82,6 +82,7 @@ abort_parse: ...@@ -82,6 +82,7 @@ abort_parse:
(*pQuery)->pRoot = cxt.pRootNode; (*pQuery)->pRoot = cxt.pRootNode;
(*pQuery)->placeholderNum = cxt.placeholderNo; (*pQuery)->placeholderNum = cxt.placeholderNo;
TSWAP((*pQuery)->pPlaceholderValues, cxt.pPlaceholderValues); TSWAP((*pQuery)->pPlaceholderValues, cxt.pPlaceholderValues);
TSWAP((*pQuery)->pMetaCache, cxt.pMetaCache);
} }
taosArrayDestroy(cxt.pPlaceholderValues); taosArrayDestroy(cxt.pPlaceholderValues);
return cxt.errCode; return cxt.errCode;
......
...@@ -40,6 +40,7 @@ typedef struct STranslateContext { ...@@ -40,6 +40,7 @@ typedef struct STranslateContext {
SHashObj* pDbs; SHashObj* pDbs;
SHashObj* pTables; SHashObj* pTables;
SExplainOptions* pExplainOpt; SExplainOptions* pExplainOpt;
SParseMetaCache* pMetaCache;
} STranslateContext; } STranslateContext;
typedef struct SFullDatabaseName { typedef struct SFullDatabaseName {
...@@ -102,12 +103,17 @@ static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) { ...@@ -102,12 +103,17 @@ static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) {
static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta) { static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta) {
SParseContext* pParCxt = pCxt->pParseCxt; SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pName, pCxt->pDbs); int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) { if (pParCxt->async) {
code = collectUseTable(pName, pCxt->pTables); code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta);
} } else {
if (TSDB_CODE_SUCCESS == code) { code = collectUseDatabase(pName, pCxt->pDbs);
code = catalogGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pMeta); if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(pName, pCxt->pTables);
}
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pMeta);
}
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, parserError("catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname,
...@@ -126,8 +132,13 @@ static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName, ...@@ -126,8 +132,13 @@ static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName,
SParseContext* pParCxt = pCxt->pParseCxt; SParseContext* pParCxt = pCxt->pParseCxt;
SName name; SName name;
toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name); toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name);
int32_t code = int32_t code = TSDB_CODE_SUCCESS;
catalogRefreshGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pMeta, false); if (pParCxt->async) {
code = getTableMetaFromCache(pCxt->pMetaCache, &name, pMeta);
} else {
code =
catalogRefreshGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pMeta, false);
}
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName, parserError("catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName,
pTableName); pTableName);
...@@ -135,29 +146,18 @@ static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName, ...@@ -135,29 +146,18 @@ static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName,
return code; return code;
} }
static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(pName, pCxt->pTables);
}
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableDistVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pVgInfo);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetTableDistVgInfo error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname,
pName->tname);
}
return code;
}
static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) { static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
SParseContext* pParCxt = pCxt->pParseCxt; SParseContext* pParCxt = pCxt->pParseCxt;
char fullDbName[TSDB_DB_FNAME_LEN]; char fullDbName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pName, fullDbName); tNameGetFullDbName(pName, fullDbName);
int32_t code = collectUseDatabaseImpl(fullDbName, pCxt->pDbs); int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) { if (pParCxt->async) {
code = catalogGetDBVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, fullDbName, pVgInfo); code = getDBVgInfoFromCache(pCxt->pMetaCache, fullDbName, pVgInfo);
} else {
code = collectUseDatabaseImpl(fullDbName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, fullDbName, pVgInfo);
}
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetDBVgInfo error, code:%s, dbFName:%s", tstrerror(code), fullDbName); parserError("catalogGetDBVgInfo error, code:%s, dbFName:%s", tstrerror(code), fullDbName);
...@@ -175,12 +175,17 @@ static int32_t getDBVgInfo(STranslateContext* pCxt, const char* pDbName, SArray* ...@@ -175,12 +175,17 @@ static int32_t getDBVgInfo(STranslateContext* pCxt, const char* pDbName, SArray*
static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pName, SVgroupInfo* pInfo) { static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pName, SVgroupInfo* pInfo) {
SParseContext* pParCxt = pCxt->pParseCxt; SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pName, pCxt->pDbs); int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) { if (pParCxt->async) {
code = collectUseTable(pName, pCxt->pTables); code = getTableHashVgroupFromCache(pCxt->pMetaCache, pName, pInfo);
} } else {
if (TSDB_CODE_SUCCESS == code) { code = collectUseDatabase(pName, pCxt->pDbs);
code = catalogGetTableHashVgroup(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pInfo); if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(pName, pCxt->pTables);
}
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableHashVgroup(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pInfo);
}
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetTableHashVgroup error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, parserError("catalogGetTableHashVgroup error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname,
...@@ -198,9 +203,14 @@ static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName, ...@@ -198,9 +203,14 @@ static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName,
static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum) { int32_t* pTableNum) {
SParseContext* pParCxt = pCxt->pParseCxt; SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabaseImpl(pDbFName, pCxt->pDbs); int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) { if (pParCxt->async) {
code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum); code = getDBVgVersionFromCache(pCxt->pMetaCache, pDbFName, pVersion, pDbId, pTableNum);
} else {
code = collectUseDatabaseImpl(pDbFName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum);
}
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetDBVgVersion error, code:%s, dbFName:%s", tstrerror(code), pDbFName); parserError("catalogGetDBVgVersion error, code:%s, dbFName:%s", tstrerror(code), pDbFName);
...@@ -214,9 +224,14 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo ...@@ -214,9 +224,14 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName)); tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName));
char dbFname[TSDB_DB_FNAME_LEN] = {0}; char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname); tNameGetFullDbName(&name, dbFname);
int32_t code = collectUseDatabaseImpl(dbFname, pCxt->pDbs); int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) { if (pParCxt->async) {
code = catalogGetDBCfg(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, dbFname, pInfo); code = getDBCfgFromCache(pCxt->pMetaCache, dbFname, pInfo);
} else {
code = collectUseDatabaseImpl(dbFname, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBCfg(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, dbFname, pInfo);
}
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetDBCfg error, code:%s, dbFName:%s", tstrerror(code), dbFname); parserError("catalogGetDBCfg error, code:%s, dbFName:%s", tstrerror(code), dbFname);
...@@ -224,7 +239,7 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo ...@@ -224,7 +239,7 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo
return code; return code;
} }
static int32_t initTranslateContext(SParseContext* pParseCxt, STranslateContext* pCxt) { static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) {
pCxt->pParseCxt = pParseCxt; pCxt->pParseCxt = pParseCxt;
pCxt->errCode = TSDB_CODE_SUCCESS; pCxt->errCode = TSDB_CODE_SUCCESS;
pCxt->msgBuf.buf = pParseCxt->pMsg; pCxt->msgBuf.buf = pParseCxt->pMsg;
...@@ -232,6 +247,7 @@ static int32_t initTranslateContext(SParseContext* pParseCxt, STranslateContext* ...@@ -232,6 +247,7 @@ static int32_t initTranslateContext(SParseContext* pParseCxt, STranslateContext*
pCxt->pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); pCxt->pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
pCxt->currLevel = 0; pCxt->currLevel = 0;
pCxt->currClause = 0; pCxt->currClause = 0;
pCxt->pMetaCache = pMetaCache;
pCxt->pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pCxt->pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pCxt->pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pCxt->pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pCxt->pNsLevel || NULL == pCxt->pDbs || NULL == pCxt->pTables) { if (NULL == pCxt->pNsLevel || NULL == pCxt->pDbs || NULL == pCxt->pTables) {
...@@ -1225,7 +1241,7 @@ static int32_t setTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTa ...@@ -1225,7 +1241,7 @@ static int32_t setTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTa
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) { if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
SArray* vgroupList = NULL; SArray* vgroupList = NULL;
code = getTableDistVgInfo(pCxt, pName, &vgroupList); code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList); code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
} }
...@@ -2797,7 +2813,7 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, ...@@ -2797,7 +2813,7 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
for (int32_t i = 1; i < num; ++i) { for (int32_t i = 1; i < num; ++i) {
SRetention* pRetension = taosArrayGet(dbCfg.pRetensions, i); SRetention* pRetension = taosArrayGet(dbCfg.pRetensions, i);
STranslateContext cxt = {0}; STranslateContext cxt = {0};
initTranslateContext(pCxt->pParseCxt, &cxt); initTranslateContext(pCxt->pParseCxt, pCxt->pMetaCache, &cxt);
code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision, 1 == i ? &pReq->pAst1 : &pReq->pAst2, code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision, 1 == i ? &pReq->pAst1 : &pReq->pAst2,
1 == i ? &pReq->ast1Len : &pReq->ast2Len); 1 == i ? &pReq->ast1Len : &pReq->ast2Len);
destroyTranslateContext(&cxt); destroyTranslateContext(&cxt);
...@@ -4893,7 +4909,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -4893,7 +4909,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) { int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) {
STranslateContext cxt = {0}; STranslateContext cxt = {0};
int32_t code = initTranslateContext(pParseCxt, &cxt); int32_t code = initTranslateContext(pParseCxt, pQuery->pMetaCache, &cxt);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = fmFuncMgtInit(); code = fmFuncMgtInit();
} }
......
...@@ -328,11 +328,11 @@ static bool isValidateTag(char* input) { ...@@ -328,11 +328,11 @@ static bool isValidateTag(char* input) {
return true; return true;
} }
int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* pMsgBuf, int16_t startColId) { int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* pMsgBuf, int16_t startColId) {
// set json NULL data // set json NULL data
uint8_t jsonNULL = TSDB_DATA_TYPE_NULL; uint8_t jsonNULL = TSDB_DATA_TYPE_NULL;
int jsonIndex = startColId + 1; int32_t jsonIndex = startColId + 1;
if (!json || strtrim((char*)json) == 0 ||strcasecmp(json, TSDB_DATA_NULL_STR_L) == 0) { if (!json || strtrim((char*)json) == 0 || strcasecmp(json, TSDB_DATA_NULL_STR_L) == 0) {
tdAddColToKVRow(kvRowBuilder, jsonIndex, &jsonNULL, CHAR_BYTES); tdAddColToKVRow(kvRowBuilder, jsonIndex, &jsonNULL, CHAR_BYTES);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -343,15 +343,15 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p ...@@ -343,15 +343,15 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
return buildSyntaxErrMsg(pMsgBuf, "json parse error", json); return buildSyntaxErrMsg(pMsgBuf, "json parse error", json);
} }
int size = cJSON_GetArraySize(root); int32_t size = cJSON_GetArraySize(root);
if (!cJSON_IsObject(root)) { if (!cJSON_IsObject(root)) {
return buildSyntaxErrMsg(pMsgBuf, "json error invalide value", json); return buildSyntaxErrMsg(pMsgBuf, "json error invalide value", json);
} }
int retCode = 0; int32_t retCode = 0;
char* tagKV = NULL; char* tagKV = NULL;
SHashObj* keyHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); SHashObj* keyHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
for (int i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
cJSON* item = cJSON_GetArrayItem(root, i); cJSON* item = cJSON_GetArrayItem(root, i);
if (!item) { if (!item) {
qError("json inner error:%d", i); qError("json inner error:%d", i);
...@@ -365,9 +365,9 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p ...@@ -365,9 +365,9 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
goto end; goto end;
} }
size_t keyLen = strlen(jsonKey); size_t keyLen = strlen(jsonKey);
if(keyLen > TSDB_MAX_JSON_KEY_LEN){ if (keyLen > TSDB_MAX_JSON_KEY_LEN) {
qError("json key too long error"); qError("json key too long error");
retCode = buildSyntaxErrMsg(pMsgBuf, "json key too long, more than 256", jsonKey); retCode = buildSyntaxErrMsg(pMsgBuf, "json key too long, more than 256", jsonKey);
goto end; goto end;
} }
if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) { if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) {
...@@ -447,4 +447,186 @@ end: ...@@ -447,4 +447,186 @@ end:
taosHashCleanup(keyHash); taosHashCleanup(keyHash);
cJSON_Delete(root); cJSON_Delete(root);
return retCode; return retCode;
} }
\ No newline at end of file
static int32_t buildTableReq(SHashObj* pTablesHash, SArray** pTables) {
if (NULL != pTablesHash) {
*pTables = taosArrayInit(taosHashGetSize(pTablesHash), sizeof(SName));
if (NULL == *pTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
void* p = taosHashIterate(pTablesHash, NULL);
while (NULL != p) {
size_t len = 0;
char* pKey = taosHashGetKey(p, &len);
char fullName[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(fullName, pKey, len);
SName name = {0};
tNameFromString(&name, fullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
taosArrayPush(*pTables, &name);
p = taosHashIterate(pTablesHash, p);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) {
if (NULL != pDbsHash) {
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), TSDB_DB_FNAME_LEN);
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
void* p = taosHashIterate(pDbsHash, NULL);
while (NULL != p) {
size_t len = 0;
char* pKey = taosHashGetKey(p, &len);
char fullName[TSDB_DB_FNAME_LEN] = {0};
strncpy(fullName, pKey, len);
taosArrayPush(*pDbs, fullName);
p = taosHashIterate(pDbsHash, p);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildTableMetaReq(SHashObj* pTableMetaHash, SArray** pTableMeta) {
return buildTableReq(pTableMetaHash, pTableMeta);
}
static int32_t buildDbVgroupReq(SHashObj* pDbVgroupHash, SArray** pDbVgroup) {
return buildDbReq(pDbVgroupHash, pDbVgroup);
}
static int32_t buildTableVgroupReq(SHashObj* pTableVgroupHash, SArray** pTableVgroup) {
return buildTableReq(pTableVgroupHash, pTableVgroup);
}
static int32_t buildDbCfgReq(SHashObj* pDbCfgHash, SArray** pDbCfg) { return buildDbReq(pDbCfgHash, pDbCfg); }
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
int32_t code = buildTableMetaReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = buildDbVgroupReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildTableVgroupReq(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildDbCfgReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbCfg);
}
return code;
}
static int32_t putTableMetaToCache(const SArray* pTableMetaReq, const SArray* pTableMetaData, SHashObj* pTableMeta) {
int32_t ntables = taosArrayGetSize(pTableMetaReq);
for (int32_t i = 0; i < ntables; ++i) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(taosArrayGet(pTableMetaReq, i), fullName);
if (TSDB_CODE_SUCCESS !=
taosHashPut(pTableMeta, fullName, strlen(fullName), taosArrayGet(pTableMetaData, i), POINTER_BYTES)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t putDbVgroupToCache(const SArray* pDbVgroupReq, const SArray* pDbVgroupData, SHashObj* pDbVgroup) {
int32_t nvgs = taosArrayGetSize(pDbVgroupReq);
for (int32_t i = 0; i < nvgs; ++i) {
char* pDbFName = taosArrayGet(pDbVgroupReq, i);
if (TSDB_CODE_SUCCESS !=
taosHashPut(pDbVgroup, pDbFName, strlen(pDbFName), taosArrayGet(pDbVgroupData, i), POINTER_BYTES)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t putTableVgroupToCache(const SArray* pTableVgroupReq, const SArray* pTableVgroupData,
SHashObj* pTableVgroup) {
int32_t ntables = taosArrayGetSize(pTableVgroupReq);
for (int32_t i = 0; i < ntables; ++i) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(taosArrayGet(pTableVgroupReq, i), fullName);
SVgroupInfo* pInfo = taosArrayGet(pTableVgroupData, i);
if (TSDB_CODE_SUCCESS != taosHashPut(pTableVgroup, fullName, strlen(fullName), &pInfo, POINTER_BYTES)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t putDbCfgToCache(const SArray* pDbCfgReq, const SArray* pDbCfgData, SHashObj* pDbCfg) {
int32_t nvgs = taosArrayGetSize(pDbCfgReq);
for (int32_t i = 0; i < nvgs; ++i) {
char* pDbFName = taosArrayGet(pDbCfgReq, i);
SDbCfgInfo* pInfo = taosArrayGet(pDbCfgData, i);
if (TSDB_CODE_SUCCESS != taosHashPut(pDbCfg, pDbFName, strlen(pDbFName), &pInfo, POINTER_BYTES)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t code = putTableMetaToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, pMetaCache->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = putDbVgroupToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, pMetaCache->pDbVgroup);
}
if (TSDB_CODE_SUCCESS == code) {
code = putTableVgroupToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, pMetaCache->pTableVgroup);
}
if (TSDB_CODE_SUCCESS == code) {
code = putDbCfgToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, pMetaCache->pDbCfg);
}
return code;
}
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
if (NULL == pMetaCache->pTableMeta) {
pMetaCache->pTableMeta = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pMetaCache->pTableMeta) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable);
return taosHashPut(pMetaCache->pTableMeta, fullName, len, &len, POINTER_BYTES);
}
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
*pMeta = taosHashGet(pMetaCache->pTableMeta, fullName, strlen(fullName));
return NULL == *pMeta ? TSDB_CODE_PAR_INTERNAL_ERROR : TSDB_CODE_SUCCESS;
}
int32_t getDBVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo) {
*pVgInfo = taosHashGet(pMetaCache->pDbVgroup, pDbFName, strlen(pDbFName));
return NULL == *pVgInfo ? TSDB_CODE_PAR_INTERNAL_ERROR : TSDB_CODE_SUCCESS;
}
int32_t getTableHashVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
SVgroupInfo* pInfo = taosHashGet(pMetaCache->pTableVgroup, fullName, strlen(fullName));
if (NULL == pInfo) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
memcpy(pVgroup, pInfo, sizeof(SVgroupInfo));
return TSDB_CODE_SUCCESS;
}
int32_t getDBVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
int32_t getDBCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo) {
SDbCfgInfo* pDbCfg = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName));
if (NULL == pDbCfg) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
memcpy(pInfo, pDbCfg, sizeof(SDbCfgInfo));
return TSDB_CODE_SUCCESS;
}
...@@ -34,22 +34,27 @@ bool qIsInsertSql(const char* pStr, size_t length) { ...@@ -34,22 +34,27 @@ bool qIsInsertSql(const char* pStr, size_t length) {
} while (1); } while (1);
} }
static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) { static int32_t semanticAnalysis(SParseContext* pCxt, SQuery* pQuery) {
int32_t code = parse(pCxt, pQuery); int32_t code = authenticate(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = authenticate(pCxt, *pQuery);
}
if (TSDB_CODE_SUCCESS == code && (*pQuery)->placeholderNum > 0) { if (TSDB_CODE_SUCCESS == code && pQuery->placeholderNum > 0) {
TSWAP((*pQuery)->pPrepareRoot, (*pQuery)->pRoot); TSWAP(pQuery->pPrepareRoot, pQuery->pRoot);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translate(pCxt, *pQuery); code = translate(pCxt, pQuery);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = calculateConstant(pCxt, *pQuery); code = calculateConstant(pCxt, pQuery);
}
return code;
}
static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = semanticAnalysis(pCxt, *pQuery);
} }
return code; return code;
} }
...@@ -178,6 +183,29 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { ...@@ -178,6 +183,29 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
return code; return code;
} }
int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
// todo insert sql
} else {
code = parse(pCxt, pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq((*pQuery)->pMetaCache, pCatalogReq);
}
terrno = code;
return code;
}
int32_t qSemanticAnalysisSql(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) {
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, pQuery->pMetaCache);
if (NULL == pQuery->pRoot) {
// todo insert sql
}
return semanticAnalysis(pCxt, pQuery);
}
void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode(pQueryNode); } void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode(pQueryNode); }
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) { int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
......
...@@ -163,17 +163,17 @@ int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandl ...@@ -163,17 +163,17 @@ int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandl
int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, void* pRpc, const SEpSet* pMgmtEps, const SName* pTableName, int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, void* pRpc, const SEpSet* pMgmtEps, const SName* pTableName,
STableMeta** pTableMeta) { STableMeta** pTableMeta) {
return mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta); return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta);
} }
int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, void* pRpc, const SEpSet* pMgmtEps, int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, void* pRpc, const SEpSet* pMgmtEps,
const SName* pTableName, SVgroupInfo* vgInfo) { const SName* pTableName, SVgroupInfo* vgInfo) {
return mockCatalogService->catalogGetTableHashVgroup(pTableName, vgInfo); return g_mockCatalogService->catalogGetTableHashVgroup(pTableName, vgInfo);
} }
int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const SName* pTableName, int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const SName* pTableName,
SArray** pVgList) { SArray** pVgList) {
return mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList); return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList);
} }
int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId,
...@@ -197,7 +197,7 @@ int32_t __catalogChkAuth(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, con ...@@ -197,7 +197,7 @@ int32_t __catalogChkAuth(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, con
} }
void initMetaDataEnv() { void initMetaDataEnv() {
mockCatalogService.reset(new MockCatalogService()); g_mockCatalogService.reset(new MockCatalogService());
static Stub stub; static Stub stub;
stub.set(catalogGetHandle, __catalogGetHandle); stub.set(catalogGetHandle, __catalogGetHandle);
...@@ -252,11 +252,11 @@ void initMetaDataEnv() { ...@@ -252,11 +252,11 @@ void initMetaDataEnv() {
} }
void generateMetaData() { void generateMetaData() {
generateInformationSchema(mockCatalogService.get()); generateInformationSchema(g_mockCatalogService.get());
generatePerformanceSchema(mockCatalogService.get()); generatePerformanceSchema(g_mockCatalogService.get());
generateTestT1(mockCatalogService.get()); generateTestT1(g_mockCatalogService.get());
generateTestST1(mockCatalogService.get()); generateTestST1(g_mockCatalogService.get());
mockCatalogService->showTables(); g_mockCatalogService->showTables();
} }
void destroyMetaDataEnv() { mockCatalogService.reset(); } void destroyMetaDataEnv() { g_mockCatalogService.reset(); }
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include "tname.h" #include "tname.h"
#include "ttypes.h" #include "ttypes.h"
std::unique_ptr<MockCatalogService> mockCatalogService; std::unique_ptr<MockCatalogService> g_mockCatalogService;
class TableBuilder : public ITableBuilder { class TableBuilder : public ITableBuilder {
public: public:
...@@ -120,6 +120,14 @@ class MockCatalogServiceImpl { ...@@ -120,6 +120,14 @@ class MockCatalogServiceImpl {
return copyTableVgroup(db, tNameGetTableName(pTableName), vgList); return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
} }
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
int32_t code = getAllTableMeta(pCatalogReq->pTableMeta, &pMetaData->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = getAllTableVgroup(pCatalogReq->pTableHash, &pMetaData->pTableHash);
}
return code;
}
TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType,
int32_t numOfColumns, int32_t numOfTags) { int32_t numOfColumns, int32_t numOfTags) {
builder_ = TableBuilder::createTableBuilder(tableType, numOfColumns, numOfTags); builder_ = TableBuilder::createTableBuilder(tableType, numOfColumns, numOfTags);
...@@ -300,6 +308,42 @@ class MockCatalogServiceImpl { ...@@ -300,6 +308,42 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pTableMetaReq) {
int32_t ntables = taosArrayGetSize(pTableMetaReq);
*pTableMetaData = taosArrayInit(ntables, POINTER_BYTES);
for (int32_t i = 0; i < ntables; ++i) {
STableMeta* pMeta = NULL;
code = catalogGetTableMeta((const SName*)taosArrayGet(pTableMetaReq, i), &pMeta);
if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(*pTableMetaData, &pMeta);
} else {
break;
}
}
}
return code;
}
int32_t getAllTableVgroup(SArray* pTableVgroupReq, SArray** pTableVgroupData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pTableVgroupReq) {
int32_t ntables = taosArrayGetSize(pTableVgroupReq);
*pTableVgroupData = taosArrayInit(ntables, POINTER_BYTES);
for (int32_t i = 0; i < ntables; ++i) {
SVgroupInfo* pVgInfo = (SVgroupInfo*)taosMemoryCalloc(1, sizeof(SVgroupInfo));
code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pTableVgroupReq, i), pVgInfo);
if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(*pTableVgroupData, &pVgInfo);
} else {
break;
}
}
}
return code;
}
uint64_t id_; uint64_t id_;
std::unique_ptr<TableBuilder> builder_; std::unique_ptr<TableBuilder> builder_;
DbMetaCache meta_; DbMetaCache meta_;
...@@ -337,3 +381,7 @@ int32_t MockCatalogService::catalogGetTableHashVgroup(const SName* pTableName, S ...@@ -337,3 +381,7 @@ int32_t MockCatalogService::catalogGetTableHashVgroup(const SName* pTableName, S
int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const { int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const {
return impl_->catalogGetTableDistVgInfo(pTableName, pVgList); return impl_->catalogGetTableDistVgInfo(pTableName, pVgList);
} }
int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
}
...@@ -61,11 +61,12 @@ class MockCatalogService { ...@@ -61,11 +61,12 @@ class MockCatalogService {
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const; int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const; int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const; int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const;
private: private:
std::unique_ptr<MockCatalogServiceImpl> impl_; std::unique_ptr<MockCatalogServiceImpl> impl_;
}; };
extern std::unique_ptr<MockCatalogService> mockCatalogService; extern std::unique_ptr<MockCatalogService> g_mockCatalogService;
#endif // MOCK_CATALOG_SERVICE_H #endif // MOCK_CATALOG_SERVICE_H
...@@ -52,11 +52,15 @@ class ParserEnv : public testing::Environment { ...@@ -52,11 +52,15 @@ class ParserEnv : public testing::Environment {
static void parseArg(int argc, char* argv[]) { static void parseArg(int argc, char* argv[]) {
int opt = 0; int opt = 0;
const char* optstring = ""; const char* optstring = "";
static struct option long_options[] = {{"dump", no_argument, NULL, 'd'}, {0, 0, 0, 0}}; static struct option long_options[] = {
{"dump", no_argument, NULL, 'd'}, {"async", no_argument, NULL, 'a'}, {0, 0, 0, 0}};
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) { while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
switch (opt) { switch (opt) {
case 'd': case 'd':
g_isDump = true; g_dump = true;
break;
case 'a':
g_testAsyncApis = true;
break; break;
default: default:
break; break;
......
...@@ -17,7 +17,10 @@ ...@@ -17,7 +17,10 @@
#include <algorithm> #include <algorithm>
#include <array> #include <array>
#include <thread>
#include "catalog.h"
#include "mockCatalogService.h"
#include "parInt.h" #include "parInt.h"
using namespace std; using namespace std;
...@@ -41,7 +44,8 @@ namespace ParserTest { ...@@ -41,7 +44,8 @@ namespace ParserTest {
} \ } \
} while (0); } while (0);
bool g_isDump = false; bool g_dump = false;
bool g_testAsyncApis = false;
struct TerminateFlag : public exception { struct TerminateFlag : public exception {
const char* what() const throw() { return "success and terminate"; } const char* what() const throw() { return "success and terminate"; }
...@@ -69,7 +73,60 @@ class ParserTestBaseImpl { ...@@ -69,7 +73,60 @@ class ParserTestBaseImpl {
doCalculateConstant(&cxt, pQuery); doCalculateConstant(&cxt, pQuery);
if (g_isDump) { if (g_dump) {
dump();
}
} catch (const TerminateFlag& e) {
// success and terminate
return;
} catch (...) {
dump();
throw;
}
if (g_testAsyncApis) {
runAsync(sql, expect, checkStage);
}
}
void runAsync(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage);
try {
SParseContext cxt = {0};
setParseContext(sql, &cxt, true);
SQuery* pQuery = nullptr;
doParse(&cxt, &pQuery);
SCatalogReq catalogReq = {0};
doBuildCatalogReq(pQuery->pMetaCache, &catalogReq);
string err;
thread t1([&]() {
try {
SMetaData metaData = {0};
doGetAllMeta(&catalogReq, &metaData);
doPutMetaDataToCache(&catalogReq, &metaData, pQuery->pMetaCache);
doTranslate(&cxt, pQuery);
doCalculateConstant(&cxt, pQuery);
} catch (const TerminateFlag& e) {
// success and terminate
} catch (const runtime_error& e) {
err = e.what();
} catch (...) {
err = "unknown error";
}
});
t1.join();
if (!err.empty()) {
throw runtime_error(err);
}
if (g_dump) {
dump(); dump();
} }
} catch (const TerminateFlag& e) { } catch (const TerminateFlag& e) {
...@@ -144,7 +201,7 @@ class ParserTestBaseImpl { ...@@ -144,7 +201,7 @@ class ParserTestBaseImpl {
cout << res_.calcConstAst_ << endl; cout << res_.calcConstAst_ << endl;
} }
void setParseContext(const string& sql, SParseContext* pCxt) { void setParseContext(const string& sql, SParseContext* pCxt, bool async = false) {
stmtEnv_.sql_ = sql; stmtEnv_.sql_ = sql;
transform(stmtEnv_.sql_.begin(), stmtEnv_.sql_.end(), stmtEnv_.sql_.begin(), ::tolower); transform(stmtEnv_.sql_.begin(), stmtEnv_.sql_.end(), stmtEnv_.sql_.begin(), ::tolower);
...@@ -154,6 +211,7 @@ class ParserTestBaseImpl { ...@@ -154,6 +211,7 @@ class ParserTestBaseImpl {
pCxt->sqlLen = stmtEnv_.sql_.length(); pCxt->sqlLen = stmtEnv_.sql_.length();
pCxt->pMsg = stmtEnv_.msgBuf_.data(); pCxt->pMsg = stmtEnv_.msgBuf_.data();
pCxt->msgLen = stmtEnv_.msgBuf_.max_size(); pCxt->msgLen = stmtEnv_.msgBuf_.max_size();
pCxt->async = async;
} }
void doParse(SParseContext* pCxt, SQuery** pQuery) { void doParse(SParseContext* pCxt, SQuery** pQuery) {
...@@ -162,6 +220,18 @@ class ParserTestBaseImpl { ...@@ -162,6 +220,18 @@ class ParserTestBaseImpl {
res_.parsedAst_ = toString((*pQuery)->pRoot); res_.parsedAst_ = toString((*pQuery)->pRoot);
} }
void doBuildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
DO_WITH_THROW(buildCatalogReq, pMetaCache, pCatalogReq);
}
void doGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) {
DO_WITH_THROW(g_mockCatalogService->catalogGetAllMeta, pCatalogReq, pMetaData);
}
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache);
}
void doTranslate(SParseContext* pCxt, SQuery* pQuery) { void doTranslate(SParseContext* pCxt, SQuery* pQuery) {
DO_WITH_THROW(translate, pCxt, pQuery); DO_WITH_THROW(translate, pCxt, pQuery);
checkQuery(pQuery, PARSER_STAGE_TRANSLATE); checkQuery(pQuery, PARSER_STAGE_TRANSLATE);
...@@ -200,6 +270,10 @@ void ParserTestBase::run(const std::string& sql, int32_t expect, ParserStage che ...@@ -200,6 +270,10 @@ void ParserTestBase::run(const std::string& sql, int32_t expect, ParserStage che
return impl_->run(sql, expect, checkStage); return impl_->run(sql, expect, checkStage);
} }
void ParserTestBase::runAsync(const std::string& sql, int32_t expect, ParserStage checkStage) {
return impl_->runAsync(sql, expect, checkStage);
}
void ParserTestBase::checkDdl(const SQuery* pQuery, ParserStage stage) { return; } void ParserTestBase::checkDdl(const SQuery* pQuery, ParserStage stage) { return; }
} // namespace ParserTest } // namespace ParserTest
...@@ -36,6 +36,7 @@ class ParserTestBase : public testing::Test { ...@@ -36,6 +36,7 @@ class ParserTestBase : public testing::Test {
void useDb(const std::string& acctId, const std::string& db); void useDb(const std::string& acctId, const std::string& db);
void run(const std::string& sql, int32_t expect = TSDB_CODE_SUCCESS, ParserStage checkStage = PARSER_STAGE_ALL); void run(const std::string& sql, int32_t expect = TSDB_CODE_SUCCESS, ParserStage checkStage = PARSER_STAGE_ALL);
void runAsync(const std::string& sql, int32_t expect = TSDB_CODE_SUCCESS, ParserStage checkStage = PARSER_STAGE_ALL);
virtual void checkDdl(const SQuery* pQuery, ParserStage stage); virtual void checkDdl(const SQuery* pQuery, ParserStage stage);
...@@ -63,7 +64,8 @@ class ParserDdlTest : public ParserTestBase { ...@@ -63,7 +64,8 @@ class ParserDdlTest : public ParserTestBase {
std::function<void(const SQuery*, ParserStage)> checkDdl_; std::function<void(const SQuery*, ParserStage)> checkDdl_;
}; };
extern bool g_isDump; extern bool g_dump;
extern bool g_testAsyncApis;
} // namespace ParserTest } // namespace ParserTest
......
...@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ...@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(scalarTest ${SOURCE_LIST}) ADD_EXECUTABLE(scalarTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
scalarTest scalarTest
PUBLIC os util common gtest qcom function nodes scalar parser PUBLIC os util common gtest qcom function nodes scalar parser catalog transport
) )
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
......
...@@ -447,9 +447,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QW_MSG_ERROR, "Invalid msg order") ...@@ -447,9 +447,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QW_MSG_ERROR, "Invalid msg order")
// parser // parser
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TABLE_NOT_EXIST, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TABLE_NOT_EXIST, "Table does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
//planner //planner
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "planner internal error") TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")
//udf //udf
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册