未验证 提交 9826a071 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #13234 from taosdata/feature/3.0_async

feat: parser of insert statement adapts asynchronous interface
......@@ -55,9 +55,9 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
bool qIsInsertSql(const char* pStr, size_t length);
// for async mode
int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);
int32_t qSemanticAnalysisSql(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery);
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery);
void qDestroyQuery(SQuery* pQueryNode);
......
......@@ -24,6 +24,7 @@ extern "C" {
#include "parUtil.h"
#include "parser.h"
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery);
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery);
......
......@@ -65,12 +65,15 @@ int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type,
SParseMetaCache* pMetaCache);
int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache);
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
......@@ -78,7 +81,7 @@ int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName,
int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum);
int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo);
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDb, AUTH_TYPE type,
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type,
bool* pPass);
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
......
......@@ -333,68 +333,22 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromSetOperator(pCxt, (SSetOperator*)pStmt);
case QUERY_NODE_SELECT_STMT:
return collectMetaKeyFromSelect(pCxt, (SSelectStmt*)pStmt);
case QUERY_NODE_VNODE_MODIF_STMT:
case QUERY_NODE_CREATE_DATABASE_STMT:
case QUERY_NODE_DROP_DATABASE_STMT:
case QUERY_NODE_ALTER_DATABASE_STMT:
break;
case QUERY_NODE_CREATE_TABLE_STMT:
return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt);
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
break;
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt);
case QUERY_NODE_DROP_TABLE_CLAUSE:
case QUERY_NODE_DROP_TABLE_STMT:
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
break;
case QUERY_NODE_ALTER_TABLE_STMT:
return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt);
case QUERY_NODE_CREATE_USER_STMT:
case QUERY_NODE_ALTER_USER_STMT:
case QUERY_NODE_DROP_USER_STMT:
break;
case QUERY_NODE_USE_DATABASE_STMT:
return collectMetaKeyFromUseDatabase(pCxt, (SUseDatabaseStmt*)pStmt);
case QUERY_NODE_CREATE_DNODE_STMT:
case QUERY_NODE_DROP_DNODE_STMT:
case QUERY_NODE_ALTER_DNODE_STMT:
break;
case QUERY_NODE_CREATE_INDEX_STMT:
return collectMetaKeyFromCreateIndex(pCxt, (SCreateIndexStmt*)pStmt);
case QUERY_NODE_DROP_INDEX_STMT:
case QUERY_NODE_CREATE_QNODE_STMT:
case QUERY_NODE_DROP_QNODE_STMT:
case QUERY_NODE_CREATE_BNODE_STMT:
case QUERY_NODE_DROP_BNODE_STMT:
case QUERY_NODE_CREATE_SNODE_STMT:
case QUERY_NODE_DROP_SNODE_STMT:
case QUERY_NODE_CREATE_MNODE_STMT:
case QUERY_NODE_DROP_MNODE_STMT:
break;
case QUERY_NODE_CREATE_TOPIC_STMT:
return collectMetaKeyFromCreateTopic(pCxt, (SCreateTopicStmt*)pStmt);
case QUERY_NODE_DROP_TOPIC_STMT:
case QUERY_NODE_DROP_CGROUP_STMT:
case QUERY_NODE_ALTER_LOCAL_STMT:
break;
case QUERY_NODE_EXPLAIN_STMT:
return collectMetaKeyFromExplain(pCxt, (SExplainStmt*)pStmt);
case QUERY_NODE_DESCRIBE_STMT:
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
case QUERY_NODE_COMPACT_STMT:
case QUERY_NODE_CREATE_FUNCTION_STMT:
case QUERY_NODE_DROP_FUNCTION_STMT:
break;
case QUERY_NODE_CREATE_STREAM_STMT:
return collectMetaKeyFromCreateStream(pCxt, (SCreateStreamStmt*)pStmt);
case QUERY_NODE_DROP_STREAM_STMT:
case QUERY_NODE_MERGE_VGROUP_STMT:
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
case QUERY_NODE_SPLIT_VGROUP_STMT:
case QUERY_NODE_SYNCDB_STMT:
case QUERY_NODE_GRANT_STMT:
case QUERY_NODE_REVOKE_STMT:
case QUERY_NODE_SHOW_DNODES_STMT:
return collectMetaKeyFromShowDnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_MNODES_STMT:
......@@ -407,8 +361,6 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowSnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_BNODES_STMT:
return collectMetaKeyFromShowBnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_CLUSTER_STMT:
break;
case QUERY_NODE_SHOW_DATABASES_STMT:
return collectMetaKeyFromShowDatabases(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_FUNCTIONS_STMT:
......@@ -429,25 +381,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TOPICS_STMT:
return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_VNODES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
case QUERY_NODE_SHOW_SCORES_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
break;
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_KILL_CONNECTION_STMT:
case QUERY_NODE_KILL_QUERY_STMT:
case QUERY_NODE_KILL_TRANSACTION_STMT:
default:
break;
}
......
......@@ -64,6 +64,7 @@ typedef struct SInsertParseContext {
int32_t totalNum;
SVnodeModifOpStmt* pOutput;
SStmtCallback* pStmtCb;
SParseMetaCache* pMetaCache;
} SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
......@@ -92,15 +93,15 @@ typedef struct SMemParam {
} \
} while (0)
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) {
SToken sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
NEXT_TOKEN(*pSql, sToken);
if (TK_INSERT != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", sToken.z);
return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", sToken.z);
}
NEXT_TOKEN(pCxt->pSql, sToken);
NEXT_TOKEN(*pSql, sToken);
if (TK_INTO != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", sToken.z);
return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", sToken.z);
}
return TSDB_CODE_SUCCESS;
}
......@@ -212,7 +213,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
char tbname[TSDB_TABLE_FNAME_LEN] = {0};
char tbname[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(tbname, p + 1, tbLen);
/*tbLen = */ strdequote(tbname);
......@@ -250,25 +251,46 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
return code;
}
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (NULL != pCxt->pMetaCache) {
return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
}
return catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, pDbFname,
AUTH_TYPE_WRITE, pPass);
}
static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (NULL != pCxt->pMetaCache) {
return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
}
if (isStb) {
return catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName,
pTableMeta);
}
return catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pTableMeta);
}
static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (NULL != pCxt->pMetaCache) {
return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
}
return catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pVg);
}
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
bool pass = false;
CHECK_CODE(catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser,
dbFname, AUTH_TYPE_WRITE, &pass));
CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
if (!pass) {
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
if (isStb) {
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name,
&pCxt->pTableMeta));
} else {
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name,
&pCxt->pTableMeta));
ASSERT(pCxt->pTableMeta->tableInfo.rowSize > 0);
CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta));
if (!isStb) {
SVgroupInfo vg;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &vg));
CHECK_CODE(getTableVgroup(pCxt, name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
}
return TSDB_CODE_SUCCESS;
......@@ -777,7 +799,7 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, voi
if (errno == E2BIG) {
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
return buildSyntaxErrMsg(pMsgBuf, buf, value);
......@@ -857,10 +879,8 @@ static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
int32_t len, STableMeta* pMeta) {
SVgroupInfo vg;
SParseContext* pBasicCtx = pCxt->pComCxt;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
SVgroupInfo vg;
CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pMeta->uid = 0;
......@@ -1082,9 +1102,9 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t tbNum = 0;
char tbFName[TSDB_TABLE_FNAME_LEN];
bool autoCreateTbl = false;
int32_t tbNum = 0;
char tbFName[TSDB_TABLE_FNAME_LEN];
bool autoCreateTbl = false;
// for each table
while (1) {
......@@ -1186,8 +1206,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj,
pCxt->pTableBlockHashObj);
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl,
pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pCxt->pVgroupsHashObj = NULL;
......@@ -1245,12 +1265,11 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
(*pQuery)->pRoot = (SNode*)context.pOutput;
}
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
(*pQuery)->pRoot = (SNode*)context.pOutput;
if (NULL == (*pQuery)->pTableList) {
(*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
......@@ -1261,7 +1280,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = skipInsertInto(&context);
int32_t code = skipInsertInto(&context.pSql, &context.msg);
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBody(&context);
}
......@@ -1276,6 +1295,171 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
return code;
}
typedef struct SInsertParseSyntaxCxt {
SParseContext* pComCxt;
char* pSql;
SMsgBuf msg;
SParseMetaCache* pMetaCache;
} SInsertParseSyntaxCxt;
static int32_t skipParentheses(SInsertParseSyntaxCxt* pCxt) {
SToken sToken;
while (1) {
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP == sToken.type) {
break;
}
if (0 == sToken.n) {
return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t skipBoundColumns(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); }
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
static int32_t skipValuesClause(SInsertParseSyntaxCxt* pCxt) {
int32_t numOfRows = 0;
SToken sToken;
while (1) {
int32_t index = 0;
NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
if (TK_NK_LP != sToken.type) {
break;
}
pCxt->pSql += index;
CHECK_CODE(skipParentheses(pCxt));
++numOfRows;
}
if (0 == numOfRows) {
return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
}
return TSDB_CODE_SUCCESS;
}
static int32_t skipTagsClause(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); }
// pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
SToken sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_LP == sToken.type) {
CHECK_CODE(skipBoundColumns(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
}
if (TK_TAGS != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
}
// pSql -> (tag1_value, ...)
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_LP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
}
CHECK_CODE(skipTagsClause(pCxt));
return TSDB_CODE_SUCCESS;
}
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
SName name;
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
return TSDB_CODE_SUCCESS;
}
static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
bool hasData = false;
// for each table
while (1) {
SToken sToken;
// pSql -> tb_name ...
NEXT_TOKEN(pCxt->pSql, sToken);
// no data in the sql string anymore.
if (sToken.n == 0) {
if (sToken.type && pCxt->pSql[0]) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
}
if (!hasData) {
return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
}
break;
}
hasData = false;
SToken tbnameToken = sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
// USING clause
if (TK_USING == sToken.type) {
NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
CHECK_CODE(skipUsingClause(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
} else {
CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken));
}
if (TK_NK_LP == sToken.type) {
// pSql -> field1_name, ...)
CHECK_CODE(skipBoundColumns(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
}
if (TK_VALUES == sToken.type) {
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
CHECK_CODE(skipValuesClause(pCxt));
hasData = true;
continue;
}
// FILE csv_file_path
if (TK_FILE == sToken.type) {
// pSql -> csv_file_path
NEXT_TOKEN(pCxt->pSql, sToken);
if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
}
hasData = true;
continue;
}
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
}
return TSDB_CODE_SUCCESS;
}
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery) {
SInsertParseSyntaxCxt context = {.pComCxt = pContext,
.pSql = (char*)pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache))};
if (NULL == context.pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = skipInsertInto(&context.pSql, &context.msg);
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBodySyntax(&context);
}
if (TSDB_CODE_SUCCESS == code) {
*pQuery = taosMemoryCalloc(1, sizeof(SQuery));
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
TSWAP((*pQuery)->pMetaCache, context.pMetaCache);
}
return code;
}
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen) {
SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
......
......@@ -671,22 +671,32 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
return code;
}
static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) {
static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) {
if (NULL == *pTables) {
*pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == *pTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return taosHashPut(*pTables, pTbFName, len, &pTables, POINTER_BYTES);
}
static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) {
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable);
return taosHashPut(*pTables, fullName, len, &pTables, POINTER_BYTES);
return reserveTableReqInCacheImpl(fullName, len, pTables);
}
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableMeta);
}
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableMeta);
}
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
......@@ -736,6 +746,12 @@ int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* p
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup);
}
int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableVgroup);
}
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
......@@ -776,18 +792,30 @@ int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDb
return TSDB_CODE_SUCCESS;
}
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type,
SParseMetaCache* pMetaCache) {
static int32_t reserveUserAuthInCacheImpl(const char* pKey, int32_t len, SParseMetaCache* pMetaCache) {
if (NULL == pMetaCache->pUserAuth) {
pMetaCache->pUserAuth = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pMetaCache->pUserAuth) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
bool pass = false;
return taosHashPut(pMetaCache->pUserAuth, pKey, len, &pass, sizeof(pass));
}
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type,
SParseMetaCache* pMetaCache) {
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToString(acctId, pUser, pDb, type, key);
bool pass = false;
return taosHashPut(pMetaCache->pUserAuth, key, len, &pass, sizeof(pass));
return reserveUserAuthInCacheImpl(key, len, pMetaCache);
}
int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName);
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToStringExt(pUser, dbFName, type, key);
return reserveUserAuthInCacheImpl(key, len, pMetaCache);
}
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type,
......
......@@ -34,7 +34,7 @@ bool qIsInsertSql(const char* pStr, size_t length) {
} while (1);
}
static int32_t semanticAnalysis(SParseContext* pCxt, SQuery* pQuery) {
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) {
int32_t code = authenticate(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code && pQuery->placeholderNum > 0) {
......@@ -54,12 +54,12 @@ static int32_t semanticAnalysis(SParseContext* pCxt, SQuery* pQuery) {
static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = semanticAnalysis(pCxt, *pQuery);
code = analyseSemantic(pCxt, *pQuery);
}
return code;
}
static int32_t syntaxParseSql(SParseContext* pCxt, SQuery** pQuery) {
static int32_t parseSqlSyntax(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKey(pCxt, *pQuery);
......@@ -192,12 +192,12 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
return code;
}
int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
// todo insert sql
code = parseInsertSyntax(pCxt, pQuery);
} else {
code = syntaxParseSql(pCxt, pQuery);
code = parseSqlSyntax(pCxt, pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq((*pQuery)->pMetaCache, pCatalogReq);
......@@ -206,13 +206,13 @@ int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
return code;
}
int32_t qSemanticAnalysisSql(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) {
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) {
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, pQuery->pMetaCache);
if (NULL == pQuery->pRoot) {
// todo insert sql
return parseInsertSql(pCxt, &pQuery);
}
return semanticAnalysis(pCxt, pQuery);
return analyseSemantic(pCxt, pQuery);
}
void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode(pQueryNode); }
......
......@@ -15,6 +15,7 @@
#include <gtest/gtest.h>
#include "mockCatalogService.h"
#include "os.h"
#include "parInt.h"
......@@ -57,6 +58,38 @@ class InsertTest : public Test {
return code_;
}
int32_t runAsync() {
code_ = parseInsertSyntax(&cxt_, &res_);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
SCatalogReq catalogReq = {0};
code_ = buildCatalogReq(res_->pMetaCache, &catalogReq);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
SMetaData metaData = {0};
g_mockCatalogService->catalogGetAllMeta(&catalogReq, &metaData);
code_ = putMetaDataToCache(&catalogReq, &metaData, res_->pMetaCache);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
code_ = parseInsertSql(&cxt_, &res_);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSql code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
return code_;
}
void dumpReslut() {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
size_t num = taosArrayGetSize(pStmt->pDataBlocks);
......@@ -125,7 +158,7 @@ class InsertTest : public Test {
SQuery* res_;
};
// INSERT INTO tb_name VALUES (field1_value, ...)
// INSERT INTO tb_name [(field1_name, ...)] VALUES (field1_value, ...)
TEST_F(InsertTest, singleTableSingleRowTest) {
setDatabase("root", "test");
......@@ -133,6 +166,17 @@ TEST_F(InsertTest, singleTableSingleRowTest) {
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 1);
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 1);
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
}
// INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...)
......@@ -140,11 +184,16 @@ TEST_F(InsertTest, singleTableMultiRowTest) {
setDatabase("root", "test");
bind(
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)(now+2s, 3, 'guangzhou', 9, "
"10, 11)");
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
"(now+2s, 3, 'guangzhou', 9, 10, 11)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 3);
bind(
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
"(now+2s, 3, 'guangzhou', 9, 10, 11)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
}
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
......@@ -155,6 +204,9 @@ TEST_F(InsertTest, multiTableSingleRowTest) {
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(2, 1);
bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
}
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
......@@ -167,6 +219,11 @@ TEST_F(InsertTest, multiTableMultiRowTest) {
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(2, 3, 2);
bind(
"insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
" st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
}
// INSERT INTO
......@@ -181,6 +238,21 @@ TEST_F(InsertTest, autoCreateTableTest) {
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 3);
bind(
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
bind(
"insert into st1s1 using st1 tags(1, 'wxy') values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, "
"\"guangzhou\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
bind(
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
}
TEST_F(InsertTest, toleranceTest) {
......@@ -190,4 +262,9 @@ TEST_F(InsertTest, toleranceTest) {
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
bind("insert into t");
ASSERT_NE(run(), TSDB_CODE_SUCCESS);
bind("insert into");
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
bind("insert into t");
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册