From 06baa8614579288f3ab58fc8e0ee0361d3b3f689 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 30 May 2022 17:01:44 +0800 Subject: [PATCH] feat: parser of insert statement adapts asynchronous interface --- include/libs/parser/parser.h | 6 +- source/libs/parser/inc/parInt.h | 1 + source/libs/parser/inc/parUtil.h | 5 +- source/libs/parser/src/parAstParser.c | 65 ------ source/libs/parser/src/parInsert.c | 252 +++++++++++++++++++--- source/libs/parser/src/parUtil.c | 40 +++- source/libs/parser/src/parser.c | 20 +- source/libs/parser/test/parInsertTest.cpp | 83 ++++++- 8 files changed, 350 insertions(+), 122 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 1466bb7400..06272b8151 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -55,9 +55,9 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); bool qIsInsertSql(const char* pStr, size_t length); // for async mode -int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq); -int32_t qSemanticAnalysisSql(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, - const struct SMetaData* pMetaData, SQuery* pQuery); +int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq); +int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, + const struct SMetaData* pMetaData, SQuery* pQuery); void qDestroyQuery(SQuery* pQueryNode); diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index 184ed7d8b2..3efe6700d2 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -24,6 +24,7 @@ extern "C" { #include "parUtil.h" #include "parser.h" +int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery); int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery); int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery); diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 80288dbc44..2468f7d75b 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -65,12 +65,15 @@ int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq); int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache); int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); +int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache); int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); +int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache); int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, SParseMetaCache* pMetaCache); +int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache); int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache); int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta); int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo); @@ -78,7 +81,7 @@ int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum); int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo); -int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDb, AUTH_TYPE type, +int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type, bool* pPass); int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo); diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 5d65a0b80b..68c9684c97 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -333,68 +333,22 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromSetOperator(pCxt, (SSetOperator*)pStmt); case QUERY_NODE_SELECT_STMT: return collectMetaKeyFromSelect(pCxt, (SSelectStmt*)pStmt); - case QUERY_NODE_VNODE_MODIF_STMT: - case QUERY_NODE_CREATE_DATABASE_STMT: - case QUERY_NODE_DROP_DATABASE_STMT: - case QUERY_NODE_ALTER_DATABASE_STMT: - break; case QUERY_NODE_CREATE_TABLE_STMT: return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt); - case QUERY_NODE_CREATE_SUBTABLE_CLAUSE: - break; case QUERY_NODE_CREATE_MULTI_TABLE_STMT: return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt); - case QUERY_NODE_DROP_TABLE_CLAUSE: - case QUERY_NODE_DROP_TABLE_STMT: - case QUERY_NODE_DROP_SUPER_TABLE_STMT: - break; case QUERY_NODE_ALTER_TABLE_STMT: return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt); - case QUERY_NODE_CREATE_USER_STMT: - case QUERY_NODE_ALTER_USER_STMT: - case QUERY_NODE_DROP_USER_STMT: - break; case QUERY_NODE_USE_DATABASE_STMT: return collectMetaKeyFromUseDatabase(pCxt, (SUseDatabaseStmt*)pStmt); - case QUERY_NODE_CREATE_DNODE_STMT: - case QUERY_NODE_DROP_DNODE_STMT: - case QUERY_NODE_ALTER_DNODE_STMT: - break; case QUERY_NODE_CREATE_INDEX_STMT: return collectMetaKeyFromCreateIndex(pCxt, (SCreateIndexStmt*)pStmt); - case QUERY_NODE_DROP_INDEX_STMT: - case QUERY_NODE_CREATE_QNODE_STMT: - case QUERY_NODE_DROP_QNODE_STMT: - case QUERY_NODE_CREATE_BNODE_STMT: - case QUERY_NODE_DROP_BNODE_STMT: - case QUERY_NODE_CREATE_SNODE_STMT: - case QUERY_NODE_DROP_SNODE_STMT: - case QUERY_NODE_CREATE_MNODE_STMT: - case QUERY_NODE_DROP_MNODE_STMT: - break; case QUERY_NODE_CREATE_TOPIC_STMT: return collectMetaKeyFromCreateTopic(pCxt, (SCreateTopicStmt*)pStmt); - case QUERY_NODE_DROP_TOPIC_STMT: - case QUERY_NODE_DROP_CGROUP_STMT: - case QUERY_NODE_ALTER_LOCAL_STMT: - break; case QUERY_NODE_EXPLAIN_STMT: return collectMetaKeyFromExplain(pCxt, (SExplainStmt*)pStmt); - case QUERY_NODE_DESCRIBE_STMT: - case QUERY_NODE_RESET_QUERY_CACHE_STMT: - case QUERY_NODE_COMPACT_STMT: - case QUERY_NODE_CREATE_FUNCTION_STMT: - case QUERY_NODE_DROP_FUNCTION_STMT: - break; case QUERY_NODE_CREATE_STREAM_STMT: return collectMetaKeyFromCreateStream(pCxt, (SCreateStreamStmt*)pStmt); - case QUERY_NODE_DROP_STREAM_STMT: - case QUERY_NODE_MERGE_VGROUP_STMT: - case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: - case QUERY_NODE_SPLIT_VGROUP_STMT: - case QUERY_NODE_SYNCDB_STMT: - case QUERY_NODE_GRANT_STMT: - case QUERY_NODE_REVOKE_STMT: case QUERY_NODE_SHOW_DNODES_STMT: return collectMetaKeyFromShowDnodes(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_MNODES_STMT: @@ -407,8 +361,6 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowSnodes(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_BNODES_STMT: return collectMetaKeyFromShowBnodes(pCxt, (SShowStmt*)pStmt); - case QUERY_NODE_SHOW_CLUSTER_STMT: - break; case QUERY_NODE_SHOW_DATABASES_STMT: return collectMetaKeyFromShowDatabases(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_FUNCTIONS_STMT: @@ -429,25 +381,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_TOPICS_STMT: return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt); - case QUERY_NODE_SHOW_CONSUMERS_STMT: - case QUERY_NODE_SHOW_SUBSCRIBES_STMT: - case QUERY_NODE_SHOW_SMAS_STMT: - case QUERY_NODE_SHOW_CONFIGS_STMT: - case QUERY_NODE_SHOW_CONNECTIONS_STMT: - case QUERY_NODE_SHOW_QUERIES_STMT: - case QUERY_NODE_SHOW_VNODES_STMT: - case QUERY_NODE_SHOW_APPS_STMT: - case QUERY_NODE_SHOW_SCORES_STMT: - case QUERY_NODE_SHOW_VARIABLE_STMT: - case QUERY_NODE_SHOW_CREATE_DATABASE_STMT: - case QUERY_NODE_SHOW_CREATE_TABLE_STMT: - case QUERY_NODE_SHOW_CREATE_STABLE_STMT: - break; case QUERY_NODE_SHOW_TRANSACTIONS_STMT: return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); - case QUERY_NODE_KILL_CONNECTION_STMT: - case QUERY_NODE_KILL_QUERY_STMT: - case QUERY_NODE_KILL_TRANSACTION_STMT: default: break; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 58a6d1483f..047c2d1504 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -64,6 +64,7 @@ typedef struct SInsertParseContext { int32_t totalNum; SVnodeModifOpStmt* pOutput; SStmtCallback* pStmtCb; + SParseMetaCache* pMetaCache; } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); @@ -92,15 +93,15 @@ typedef struct SMemParam { } \ } while (0) -static int32_t skipInsertInto(SInsertParseContext* pCxt) { +static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) { SToken sToken; - NEXT_TOKEN(pCxt->pSql, sToken); + NEXT_TOKEN(*pSql, sToken); if (TK_INSERT != sToken.type) { - return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", sToken.z); + return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", sToken.z); } - NEXT_TOKEN(pCxt->pSql, sToken); + NEXT_TOKEN(*pSql, sToken); if (TK_INTO != sToken.type) { - return buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", sToken.z); + return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", sToken.z); } return TSDB_CODE_SUCCESS; } @@ -212,7 +213,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con return buildInvalidOperationMsg(pMsgBuf, msg4); } - char tbname[TSDB_TABLE_FNAME_LEN] = {0}; + char tbname[TSDB_TABLE_FNAME_LEN] = {0}; strncpy(tbname, p + 1, tbLen); /*tbLen = */ strdequote(tbname); @@ -250,25 +251,46 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con return code; } -static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) { +static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) { + SParseContext* pBasicCtx = pCxt->pComCxt; + if (NULL != pCxt->pMetaCache) { + return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass); + } + return catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, pDbFname, + AUTH_TYPE_WRITE, pPass); +} + +static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) { + SParseContext* pBasicCtx = pCxt->pComCxt; + if (NULL != pCxt->pMetaCache) { + return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta); + } + if (isStb) { + return catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, + pTableMeta); + } + return catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pTableMeta); +} + +static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) { SParseContext* pBasicCtx = pCxt->pComCxt; + if (NULL != pCxt->pMetaCache) { + return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg); + } + return catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pVg); +} +static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) { bool pass = false; - CHECK_CODE(catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, - dbFname, AUTH_TYPE_WRITE, &pass)); + CHECK_CODE(checkAuth(pCxt, dbFname, &pass)); if (!pass) { return TSDB_CODE_PAR_PERMISSION_DENIED; } - if (isStb) { - CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, - &pCxt->pTableMeta)); - } else { - CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, - &pCxt->pTableMeta)); - ASSERT(pCxt->pTableMeta->tableInfo.rowSize > 0); + + CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta)); + if (!isStb) { SVgroupInfo vg; - CHECK_CODE( - catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &vg)); + CHECK_CODE(getTableVgroup(pCxt, name, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); } return TSDB_CODE_SUCCESS; @@ -777,7 +799,7 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, voi if (errno == E2BIG) { return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name); } - + char buf[512] = {0}; snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno)); return buildSyntaxErrMsg(pMsgBuf, buf, value); @@ -857,10 +879,8 @@ static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName, int32_t len, STableMeta* pMeta) { - SVgroupInfo vg; - SParseContext* pBasicCtx = pCxt->pComCxt; - CHECK_CODE( - catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg)); + SVgroupInfo vg; + CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); pMeta->uid = 0; @@ -1082,9 +1102,9 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // [...]; static int32_t parseInsertBody(SInsertParseContext* pCxt) { - int32_t tbNum = 0; - char tbFName[TSDB_TABLE_FNAME_LEN]; - bool autoCreateTbl = false; + int32_t tbNum = 0; + char tbFName[TSDB_TABLE_FNAME_LEN]; + bool autoCreateTbl = false; // for each table while (1) { @@ -1186,8 +1206,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); - (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, - pCxt->pTableBlockHashObj); + (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, + pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pCxt->pVgroupsHashObj = NULL; @@ -1245,12 +1265,11 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { if (NULL == *pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } - - (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; - (*pQuery)->haveResultSet = false; - (*pQuery)->msgType = TDMT_VND_SUBMIT; - (*pQuery)->pRoot = (SNode*)context.pOutput; } + (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; + (*pQuery)->haveResultSet = false; + (*pQuery)->msgType = TDMT_VND_SUBMIT; + (*pQuery)->pRoot = (SNode*)context.pOutput; if (NULL == (*pQuery)->pTableList) { (*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName)); @@ -1261,7 +1280,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { context.pOutput->payloadType = PAYLOAD_TYPE_KV; - int32_t code = skipInsertInto(&context); + int32_t code = skipInsertInto(&context.pSql, &context.msg); if (TSDB_CODE_SUCCESS == code) { code = parseInsertBody(&context); } @@ -1276,6 +1295,171 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { return code; } +typedef struct SInsertParseSyntaxCxt { + SParseContext* pComCxt; + char* pSql; + SMsgBuf msg; + SParseMetaCache* pMetaCache; +} SInsertParseSyntaxCxt; + +static int32_t skipParentheses(SInsertParseSyntaxCxt* pCxt) { + SToken sToken; + while (1) { + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_NK_RP == sToken.type) { + break; + } + if (0 == sToken.n) { + return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL); + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t skipBoundColumns(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); } + +// pSql -> (field1_value, ...) [(field1_value2, ...) ...] +static int32_t skipValuesClause(SInsertParseSyntaxCxt* pCxt) { + int32_t numOfRows = 0; + SToken sToken; + while (1) { + int32_t index = 0; + NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index); + if (TK_NK_LP != sToken.type) { + break; + } + pCxt->pSql += index; + + CHECK_CODE(skipParentheses(pCxt)); + ++numOfRows; + } + if (0 == numOfRows) { + return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t skipTagsClause(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); } + +// pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...) +static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { + SToken sToken; + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_NK_LP == sToken.type) { + CHECK_CODE(skipBoundColumns(pCxt)); + NEXT_TOKEN(pCxt->pSql, sToken); + } + + if (TK_TAGS != sToken.type) { + return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z); + } + // pSql -> (tag1_value, ...) + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_NK_LP != sToken.type) { + return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); + } + CHECK_CODE(skipTagsClause(pCxt)); + + return TSDB_CODE_SUCCESS; +} + +static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { + SName name; + CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); + CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache)); + CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache)); + CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); + return TSDB_CODE_SUCCESS; +} + +static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) { + bool hasData = false; + // for each table + while (1) { + SToken sToken; + + // pSql -> tb_name ... + NEXT_TOKEN(pCxt->pSql, sToken); + + // no data in the sql string anymore. + if (sToken.n == 0) { + if (sToken.type && pCxt->pSql[0]) { + return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z); + } + + if (!hasData) { + return buildInvalidOperationMsg(&pCxt->msg, "no data in sql"); + } + break; + } + + hasData = false; + + SToken tbnameToken = sToken; + NEXT_TOKEN(pCxt->pSql, sToken); + + // USING clause + if (TK_USING == sToken.type) { + NEXT_TOKEN(pCxt->pSql, sToken); + CHECK_CODE(collectTableMetaKey(pCxt, &sToken)); + CHECK_CODE(skipUsingClause(pCxt)); + NEXT_TOKEN(pCxt->pSql, sToken); + } else { + CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken)); + } + + if (TK_NK_LP == sToken.type) { + // pSql -> field1_name, ...) + CHECK_CODE(skipBoundColumns(pCxt)); + NEXT_TOKEN(pCxt->pSql, sToken); + } + + if (TK_VALUES == sToken.type) { + // pSql -> (field1_value, ...) [(field1_value2, ...) ...] + CHECK_CODE(skipValuesClause(pCxt)); + hasData = true; + continue; + } + + // FILE csv_file_path + if (TK_FILE == sToken.type) { + // pSql -> csv_file_path + NEXT_TOKEN(pCxt->pSql, sToken); + if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) { + return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z); + } + hasData = true; + continue; + } + + return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery) { + SInsertParseSyntaxCxt context = {.pComCxt = pContext, + .pSql = (char*)pContext->pSql, + .msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, + .pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache))}; + if (NULL == context.pMetaCache) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t code = skipInsertInto(&context.pSql, &context.msg); + if (TSDB_CODE_SUCCESS == code) { + code = parseInsertBodySyntax(&context); + } + if (TSDB_CODE_SUCCESS == code) { + *pQuery = taosMemoryCalloc(1, sizeof(SQuery)); + if (NULL == *pQuery) { + return TSDB_CODE_OUT_OF_MEMORY; + } + TSWAP((*pQuery)->pMetaCache, context.pMetaCache); + } + return code; +} + int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, int32_t msgBufLen) { SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen}; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 34b0199154..9882440bbb 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -671,22 +671,32 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet return code; } -static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) { +static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) { if (NULL == *pTables) { *pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (NULL == *pTables) { return TSDB_CODE_OUT_OF_MEMORY; } } + return taosHashPut(*pTables, pTbFName, len, &pTables, POINTER_BYTES); +} + +static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) { char fullName[TSDB_TABLE_FNAME_LEN]; int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable); - return taosHashPut(*pTables, fullName, len, &pTables, POINTER_BYTES); + return reserveTableReqInCacheImpl(fullName, len, pTables); } int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableMeta); } +int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) { + char fullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pName, fullName); + return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableMeta); +} + int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) { char fullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pName, fullName); @@ -736,6 +746,12 @@ int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* p return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup); } +int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) { + char fullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pName, fullName); + return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableVgroup); +} + int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) { char fullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pName, fullName); @@ -776,18 +792,30 @@ int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDb return TSDB_CODE_SUCCESS; } -int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, - SParseMetaCache* pMetaCache) { +static int32_t reserveUserAuthInCacheImpl(const char* pKey, int32_t len, SParseMetaCache* pMetaCache) { if (NULL == pMetaCache->pUserAuth) { pMetaCache->pUserAuth = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (NULL == pMetaCache->pUserAuth) { return TSDB_CODE_OUT_OF_MEMORY; } } + bool pass = false; + return taosHashPut(pMetaCache->pUserAuth, pKey, len, &pass, sizeof(pass)); +} + +int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, + SParseMetaCache* pMetaCache) { char key[USER_AUTH_KEY_MAX_LEN] = {0}; int32_t len = userAuthToString(acctId, pUser, pDb, type, key); - bool pass = false; - return taosHashPut(pMetaCache->pUserAuth, key, len, &pass, sizeof(pass)); + return reserveUserAuthInCacheImpl(key, len, pMetaCache); +} + +int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache) { + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pName, dbFName); + char key[USER_AUTH_KEY_MAX_LEN] = {0}; + int32_t len = userAuthToStringExt(pUser, dbFName, type, key); + return reserveUserAuthInCacheImpl(key, len, pMetaCache); } int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type, diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 54aa9c642c..bb70458f98 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -34,7 +34,7 @@ bool qIsInsertSql(const char* pStr, size_t length) { } while (1); } -static int32_t semanticAnalysis(SParseContext* pCxt, SQuery* pQuery) { +static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) { int32_t code = authenticate(pCxt, pQuery); if (TSDB_CODE_SUCCESS == code && pQuery->placeholderNum > 0) { @@ -54,12 +54,12 @@ static int32_t semanticAnalysis(SParseContext* pCxt, SQuery* pQuery) { static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) { int32_t code = parse(pCxt, pQuery); if (TSDB_CODE_SUCCESS == code) { - code = semanticAnalysis(pCxt, *pQuery); + code = analyseSemantic(pCxt, *pQuery); } return code; } -static int32_t syntaxParseSql(SParseContext* pCxt, SQuery** pQuery) { +static int32_t parseSqlSyntax(SParseContext* pCxt, SQuery** pQuery) { int32_t code = parse(pCxt, pQuery); if (TSDB_CODE_SUCCESS == code) { code = collectMetaKey(pCxt, *pQuery); @@ -192,12 +192,12 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { return code; } -int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) { +int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) { int32_t code = TSDB_CODE_SUCCESS; if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) { - // todo insert sql + code = parseInsertSyntax(pCxt, pQuery); } else { - code = syntaxParseSql(pCxt, pQuery); + code = parseSqlSyntax(pCxt, pQuery); } if (TSDB_CODE_SUCCESS == code) { code = buildCatalogReq((*pQuery)->pMetaCache, pCatalogReq); @@ -206,13 +206,13 @@ int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq return code; } -int32_t qSemanticAnalysisSql(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, - const struct SMetaData* pMetaData, SQuery* pQuery) { +int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, + const struct SMetaData* pMetaData, SQuery* pQuery) { int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, pQuery->pMetaCache); if (NULL == pQuery->pRoot) { - // todo insert sql + return parseInsertSql(pCxt, &pQuery); } - return semanticAnalysis(pCxt, pQuery); + return analyseSemantic(pCxt, pQuery); } void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode(pQueryNode); } diff --git a/source/libs/parser/test/parInsertTest.cpp b/source/libs/parser/test/parInsertTest.cpp index 7fafec8882..4d313fca76 100644 --- a/source/libs/parser/test/parInsertTest.cpp +++ b/source/libs/parser/test/parInsertTest.cpp @@ -15,6 +15,7 @@ #include +#include "mockCatalogService.h" #include "os.h" #include "parInt.h" @@ -57,6 +58,38 @@ class InsertTest : public Test { return code_; } + int32_t runAsync() { + code_ = parseInsertSyntax(&cxt_, &res_); + if (code_ != TSDB_CODE_SUCCESS) { + cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; + return code_; + } + + SCatalogReq catalogReq = {0}; + code_ = buildCatalogReq(res_->pMetaCache, &catalogReq); + if (code_ != TSDB_CODE_SUCCESS) { + cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; + return code_; + } + + SMetaData metaData = {0}; + g_mockCatalogService->catalogGetAllMeta(&catalogReq, &metaData); + + code_ = putMetaDataToCache(&catalogReq, &metaData, res_->pMetaCache); + if (code_ != TSDB_CODE_SUCCESS) { + cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; + return code_; + } + + code_ = parseInsertSql(&cxt_, &res_); + if (code_ != TSDB_CODE_SUCCESS) { + cout << "parseInsertSql code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; + return code_; + } + + return code_; + } + void dumpReslut() { SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_); size_t num = taosArrayGetSize(pStmt->pDataBlocks); @@ -125,7 +158,7 @@ class InsertTest : public Test { SQuery* res_; }; -// INSERT INTO tb_name VALUES (field1_value, ...) +// INSERT INTO tb_name [(field1_name, ...)] VALUES (field1_value, ...) TEST_F(InsertTest, singleTableSingleRowTest) { setDatabase("root", "test"); @@ -133,6 +166,17 @@ TEST_F(InsertTest, singleTableSingleRowTest) { ASSERT_EQ(run(), TSDB_CODE_SUCCESS); dumpReslut(); checkReslut(1, 1); + + bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)"); + ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + + bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)"); + ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); + dumpReslut(); + checkReslut(1, 1); + + bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)"); + ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); } // INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...) @@ -140,11 +184,16 @@ TEST_F(InsertTest, singleTableMultiRowTest) { setDatabase("root", "test"); bind( - "insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)(now+2s, 3, 'guangzhou', 9, " - "10, 11)"); + "insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)" + "(now+2s, 3, 'guangzhou', 9, 10, 11)"); ASSERT_EQ(run(), TSDB_CODE_SUCCESS); dumpReslut(); checkReslut(1, 3); + + bind( + "insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)" + "(now+2s, 3, 'guangzhou', 9, 10, 11)"); + ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); } // INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...) @@ -155,6 +204,9 @@ TEST_F(InsertTest, multiTableSingleRowTest) { ASSERT_EQ(run(), TSDB_CODE_SUCCESS); dumpReslut(); checkReslut(2, 1); + + bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")"); + ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); } // INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...) @@ -167,6 +219,11 @@ TEST_F(InsertTest, multiTableMultiRowTest) { ASSERT_EQ(run(), TSDB_CODE_SUCCESS); dumpReslut(); checkReslut(2, 3, 2); + + bind( + "insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")" + " st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")"); + ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); } // INSERT INTO @@ -181,6 +238,21 @@ TEST_F(InsertTest, autoCreateTableTest) { ASSERT_EQ(run(), TSDB_CODE_SUCCESS); dumpReslut(); checkReslut(1, 3); + + bind( + "insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")" + "(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); + ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + + bind( + "insert into st1s1 using st1 tags(1, 'wxy') values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, " + "\"guangzhou\")"); + ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); + + bind( + "insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")" + "(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); + ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); } TEST_F(InsertTest, toleranceTest) { @@ -190,4 +262,9 @@ TEST_F(InsertTest, toleranceTest) { ASSERT_NE(run(), TSDB_CODE_SUCCESS); bind("insert into t"); ASSERT_NE(run(), TSDB_CODE_SUCCESS); + + bind("insert into"); + ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS); + bind("insert into t"); + ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS); } -- GitLab