提交 06baa861 编写于 作者: X Xiaoyu Wang

feat: parser of insert statement adapts asynchronous interface

上级 882f4f38
...@@ -55,9 +55,9 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); ...@@ -55,9 +55,9 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
bool qIsInsertSql(const char* pStr, size_t length); bool qIsInsertSql(const char* pStr, size_t length);
// for async mode // for async mode
int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq); int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);
int32_t qSemanticAnalysisSql(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery); const struct SMetaData* pMetaData, SQuery* pQuery);
void qDestroyQuery(SQuery* pQueryNode); void qDestroyQuery(SQuery* pQueryNode);
......
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
#include "parUtil.h" #include "parUtil.h"
#include "parser.h" #include "parser.h"
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery);
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery); int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery); int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery);
......
...@@ -65,12 +65,15 @@ int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); ...@@ -65,12 +65,15 @@ int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq); int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache); int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type,
SParseMetaCache* pMetaCache); SParseMetaCache* pMetaCache);
int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache);
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache); int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta); int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo); int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
...@@ -78,7 +81,7 @@ int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, ...@@ -78,7 +81,7 @@ int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName,
int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum); int32_t* pTableNum);
int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo); int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo);
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDb, AUTH_TYPE type, int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type,
bool* pPass); bool* pPass);
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo); int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
......
...@@ -333,68 +333,22 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { ...@@ -333,68 +333,22 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromSetOperator(pCxt, (SSetOperator*)pStmt); return collectMetaKeyFromSetOperator(pCxt, (SSetOperator*)pStmt);
case QUERY_NODE_SELECT_STMT: case QUERY_NODE_SELECT_STMT:
return collectMetaKeyFromSelect(pCxt, (SSelectStmt*)pStmt); return collectMetaKeyFromSelect(pCxt, (SSelectStmt*)pStmt);
case QUERY_NODE_VNODE_MODIF_STMT:
case QUERY_NODE_CREATE_DATABASE_STMT:
case QUERY_NODE_DROP_DATABASE_STMT:
case QUERY_NODE_ALTER_DATABASE_STMT:
break;
case QUERY_NODE_CREATE_TABLE_STMT: case QUERY_NODE_CREATE_TABLE_STMT:
return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt); return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt);
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
break;
case QUERY_NODE_CREATE_MULTI_TABLE_STMT: case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt); return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt);
case QUERY_NODE_DROP_TABLE_CLAUSE:
case QUERY_NODE_DROP_TABLE_STMT:
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
break;
case QUERY_NODE_ALTER_TABLE_STMT: case QUERY_NODE_ALTER_TABLE_STMT:
return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt); return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt);
case QUERY_NODE_CREATE_USER_STMT:
case QUERY_NODE_ALTER_USER_STMT:
case QUERY_NODE_DROP_USER_STMT:
break;
case QUERY_NODE_USE_DATABASE_STMT: case QUERY_NODE_USE_DATABASE_STMT:
return collectMetaKeyFromUseDatabase(pCxt, (SUseDatabaseStmt*)pStmt); return collectMetaKeyFromUseDatabase(pCxt, (SUseDatabaseStmt*)pStmt);
case QUERY_NODE_CREATE_DNODE_STMT:
case QUERY_NODE_DROP_DNODE_STMT:
case QUERY_NODE_ALTER_DNODE_STMT:
break;
case QUERY_NODE_CREATE_INDEX_STMT: case QUERY_NODE_CREATE_INDEX_STMT:
return collectMetaKeyFromCreateIndex(pCxt, (SCreateIndexStmt*)pStmt); return collectMetaKeyFromCreateIndex(pCxt, (SCreateIndexStmt*)pStmt);
case QUERY_NODE_DROP_INDEX_STMT:
case QUERY_NODE_CREATE_QNODE_STMT:
case QUERY_NODE_DROP_QNODE_STMT:
case QUERY_NODE_CREATE_BNODE_STMT:
case QUERY_NODE_DROP_BNODE_STMT:
case QUERY_NODE_CREATE_SNODE_STMT:
case QUERY_NODE_DROP_SNODE_STMT:
case QUERY_NODE_CREATE_MNODE_STMT:
case QUERY_NODE_DROP_MNODE_STMT:
break;
case QUERY_NODE_CREATE_TOPIC_STMT: case QUERY_NODE_CREATE_TOPIC_STMT:
return collectMetaKeyFromCreateTopic(pCxt, (SCreateTopicStmt*)pStmt); return collectMetaKeyFromCreateTopic(pCxt, (SCreateTopicStmt*)pStmt);
case QUERY_NODE_DROP_TOPIC_STMT:
case QUERY_NODE_DROP_CGROUP_STMT:
case QUERY_NODE_ALTER_LOCAL_STMT:
break;
case QUERY_NODE_EXPLAIN_STMT: case QUERY_NODE_EXPLAIN_STMT:
return collectMetaKeyFromExplain(pCxt, (SExplainStmt*)pStmt); return collectMetaKeyFromExplain(pCxt, (SExplainStmt*)pStmt);
case QUERY_NODE_DESCRIBE_STMT:
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
case QUERY_NODE_COMPACT_STMT:
case QUERY_NODE_CREATE_FUNCTION_STMT:
case QUERY_NODE_DROP_FUNCTION_STMT:
break;
case QUERY_NODE_CREATE_STREAM_STMT: case QUERY_NODE_CREATE_STREAM_STMT:
return collectMetaKeyFromCreateStream(pCxt, (SCreateStreamStmt*)pStmt); return collectMetaKeyFromCreateStream(pCxt, (SCreateStreamStmt*)pStmt);
case QUERY_NODE_DROP_STREAM_STMT:
case QUERY_NODE_MERGE_VGROUP_STMT:
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
case QUERY_NODE_SPLIT_VGROUP_STMT:
case QUERY_NODE_SYNCDB_STMT:
case QUERY_NODE_GRANT_STMT:
case QUERY_NODE_REVOKE_STMT:
case QUERY_NODE_SHOW_DNODES_STMT: case QUERY_NODE_SHOW_DNODES_STMT:
return collectMetaKeyFromShowDnodes(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowDnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_MNODES_STMT: case QUERY_NODE_SHOW_MNODES_STMT:
...@@ -407,8 +361,6 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { ...@@ -407,8 +361,6 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowSnodes(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowSnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_BNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT:
return collectMetaKeyFromShowBnodes(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowBnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_CLUSTER_STMT:
break;
case QUERY_NODE_SHOW_DATABASES_STMT: case QUERY_NODE_SHOW_DATABASES_STMT:
return collectMetaKeyFromShowDatabases(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowDatabases(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_FUNCTIONS_STMT: case QUERY_NODE_SHOW_FUNCTIONS_STMT:
...@@ -429,25 +381,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { ...@@ -429,25 +381,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TOPICS_STMT:
return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_VNODES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
case QUERY_NODE_SHOW_SCORES_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
break;
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_KILL_CONNECTION_STMT:
case QUERY_NODE_KILL_QUERY_STMT:
case QUERY_NODE_KILL_TRANSACTION_STMT:
default: default:
break; break;
} }
......
...@@ -64,6 +64,7 @@ typedef struct SInsertParseContext { ...@@ -64,6 +64,7 @@ typedef struct SInsertParseContext {
int32_t totalNum; int32_t totalNum;
SVnodeModifOpStmt* pOutput; SVnodeModifOpStmt* pOutput;
SStmtCallback* pStmtCb; SStmtCallback* pStmtCb;
SParseMetaCache* pMetaCache;
} SInsertParseContext; } SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
...@@ -92,15 +93,15 @@ typedef struct SMemParam { ...@@ -92,15 +93,15 @@ typedef struct SMemParam {
} \ } \
} while (0) } while (0)
static int32_t skipInsertInto(SInsertParseContext* pCxt) { static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) {
SToken sToken; SToken sToken;
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(*pSql, sToken);
if (TK_INSERT != sToken.type) { if (TK_INSERT != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "keyword INSERT is expected", sToken.z); return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", sToken.z);
} }
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(*pSql, sToken);
if (TK_INTO != sToken.type) { if (TK_INTO != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "keyword INTO is expected", sToken.z); return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", sToken.z);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -212,7 +213,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con ...@@ -212,7 +213,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
return buildInvalidOperationMsg(pMsgBuf, msg4); return buildInvalidOperationMsg(pMsgBuf, msg4);
} }
char tbname[TSDB_TABLE_FNAME_LEN] = {0}; char tbname[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(tbname, p + 1, tbLen); strncpy(tbname, p + 1, tbLen);
/*tbLen = */ strdequote(tbname); /*tbLen = */ strdequote(tbname);
...@@ -250,25 +251,46 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con ...@@ -250,25 +251,46 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
return code; return code;
} }
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) { static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (NULL != pCxt->pMetaCache) {
return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
}
return catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, pDbFname,
AUTH_TYPE_WRITE, pPass);
}
static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (NULL != pCxt->pMetaCache) {
return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
}
if (isStb) {
return catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName,
pTableMeta);
}
return catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pTableMeta);
}
static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
SParseContext* pBasicCtx = pCxt->pComCxt; SParseContext* pBasicCtx = pCxt->pComCxt;
if (NULL != pCxt->pMetaCache) {
return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
}
return catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pVg);
}
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) {
bool pass = false; bool pass = false;
CHECK_CODE(catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, CHECK_CODE(checkAuth(pCxt, dbFname, &pass));
dbFname, AUTH_TYPE_WRITE, &pass));
if (!pass) { if (!pass) {
return TSDB_CODE_PAR_PERMISSION_DENIED; return TSDB_CODE_PAR_PERMISSION_DENIED;
} }
if (isStb) {
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta));
&pCxt->pTableMeta)); if (!isStb) {
} else {
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name,
&pCxt->pTableMeta));
ASSERT(pCxt->pTableMeta->tableInfo.rowSize > 0);
SVgroupInfo vg; SVgroupInfo vg;
CHECK_CODE( CHECK_CODE(getTableVgroup(pCxt, name, &vg));
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -777,7 +799,7 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, voi ...@@ -777,7 +799,7 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, voi
if (errno == E2BIG) { if (errno == E2BIG) {
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name); return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
} }
char buf[512] = {0}; char buf[512] = {0};
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno)); snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
return buildSyntaxErrMsg(pMsgBuf, buf, value); return buildSyntaxErrMsg(pMsgBuf, buf, value);
...@@ -857,10 +879,8 @@ static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { ...@@ -857,10 +879,8 @@ static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName, static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
int32_t len, STableMeta* pMeta) { int32_t len, STableMeta* pMeta) {
SVgroupInfo vg; SVgroupInfo vg;
SParseContext* pBasicCtx = pCxt->pComCxt; CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg));
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pMeta->uid = 0; pMeta->uid = 0;
...@@ -1082,9 +1102,9 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { ...@@ -1082,9 +1102,9 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...]; // [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) { static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t tbNum = 0; int32_t tbNum = 0;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
bool autoCreateTbl = false; bool autoCreateTbl = false;
// for each table // for each table
while (1) { while (1) {
...@@ -1186,8 +1206,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { ...@@ -1186,8 +1206,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl,
pCxt->pTableBlockHashObj); pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
memset(&pCxt->tags, 0, sizeof(pCxt->tags)); memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pCxt->pVgroupsHashObj = NULL; pCxt->pVgroupsHashObj = NULL;
...@@ -1245,12 +1265,11 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { ...@@ -1245,12 +1265,11 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (NULL == *pQuery) { if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
(*pQuery)->pRoot = (SNode*)context.pOutput;
} }
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
(*pQuery)->pRoot = (SNode*)context.pOutput;
if (NULL == (*pQuery)->pTableList) { if (NULL == (*pQuery)->pTableList) {
(*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName)); (*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
...@@ -1261,7 +1280,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { ...@@ -1261,7 +1280,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
context.pOutput->payloadType = PAYLOAD_TYPE_KV; context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = skipInsertInto(&context); int32_t code = skipInsertInto(&context.pSql, &context.msg);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBody(&context); code = parseInsertBody(&context);
} }
...@@ -1276,6 +1295,171 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { ...@@ -1276,6 +1295,171 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
return code; return code;
} }
typedef struct SInsertParseSyntaxCxt {
SParseContext* pComCxt;
char* pSql;
SMsgBuf msg;
SParseMetaCache* pMetaCache;
} SInsertParseSyntaxCxt;
static int32_t skipParentheses(SInsertParseSyntaxCxt* pCxt) {
SToken sToken;
while (1) {
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP == sToken.type) {
break;
}
if (0 == sToken.n) {
return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t skipBoundColumns(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); }
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
static int32_t skipValuesClause(SInsertParseSyntaxCxt* pCxt) {
int32_t numOfRows = 0;
SToken sToken;
while (1) {
int32_t index = 0;
NEXT_TOKEN_KEEP_SQL(pCxt->pSql, sToken, index);
if (TK_NK_LP != sToken.type) {
break;
}
pCxt->pSql += index;
CHECK_CODE(skipParentheses(pCxt));
++numOfRows;
}
if (0 == numOfRows) {
return buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
}
return TSDB_CODE_SUCCESS;
}
static int32_t skipTagsClause(SInsertParseSyntaxCxt* pCxt) { return skipParentheses(pCxt); }
// pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
SToken sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_LP == sToken.type) {
CHECK_CODE(skipBoundColumns(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
}
if (TK_TAGS != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", sToken.z);
}
// pSql -> (tag1_value, ...)
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_LP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
}
CHECK_CODE(skipTagsClause(pCxt));
return TSDB_CODE_SUCCESS;
}
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
SName name;
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
return TSDB_CODE_SUCCESS;
}
static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
bool hasData = false;
// for each table
while (1) {
SToken sToken;
// pSql -> tb_name ...
NEXT_TOKEN(pCxt->pSql, sToken);
// no data in the sql string anymore.
if (sToken.n == 0) {
if (sToken.type && pCxt->pSql[0]) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
}
if (!hasData) {
return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
}
break;
}
hasData = false;
SToken tbnameToken = sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
// USING clause
if (TK_USING == sToken.type) {
NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
CHECK_CODE(skipUsingClause(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
} else {
CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken));
}
if (TK_NK_LP == sToken.type) {
// pSql -> field1_name, ...)
CHECK_CODE(skipBoundColumns(pCxt));
NEXT_TOKEN(pCxt->pSql, sToken);
}
if (TK_VALUES == sToken.type) {
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
CHECK_CODE(skipValuesClause(pCxt));
hasData = true;
continue;
}
// FILE csv_file_path
if (TK_FILE == sToken.type) {
// pSql -> csv_file_path
NEXT_TOKEN(pCxt->pSql, sToken);
if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
}
hasData = true;
continue;
}
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
}
return TSDB_CODE_SUCCESS;
}
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery) {
SInsertParseSyntaxCxt context = {.pComCxt = pContext,
.pSql = (char*)pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache))};
if (NULL == context.pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = skipInsertInto(&context.pSql, &context.msg);
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBodySyntax(&context);
}
if (TSDB_CODE_SUCCESS == code) {
*pQuery = taosMemoryCalloc(1, sizeof(SQuery));
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
TSWAP((*pQuery)->pMetaCache, context.pMetaCache);
}
return code;
}
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen) { int32_t msgBufLen) {
SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
......
...@@ -671,22 +671,32 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet ...@@ -671,22 +671,32 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
return code; return code;
} }
static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) { static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) {
if (NULL == *pTables) { if (NULL == *pTables) {
*pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); *pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == *pTables) { if (NULL == *pTables) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
return taosHashPut(*pTables, pTbFName, len, &pTables, POINTER_BYTES);
}
static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) {
char fullName[TSDB_TABLE_FNAME_LEN]; char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable); int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable);
return taosHashPut(*pTables, fullName, len, &pTables, POINTER_BYTES); return reserveTableReqInCacheImpl(fullName, len, pTables);
} }
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableMeta); return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableMeta);
} }
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableMeta);
}
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) { int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
char fullName[TSDB_TABLE_FNAME_LEN]; char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName); tNameExtractFullName(pName, fullName);
...@@ -736,6 +746,12 @@ int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* p ...@@ -736,6 +746,12 @@ int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* p
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup); return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup);
} }
int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableVgroup);
}
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) { int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
char fullName[TSDB_TABLE_FNAME_LEN]; char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName); tNameExtractFullName(pName, fullName);
...@@ -776,18 +792,30 @@ int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDb ...@@ -776,18 +792,30 @@ int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDb
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, static int32_t reserveUserAuthInCacheImpl(const char* pKey, int32_t len, SParseMetaCache* pMetaCache) {
SParseMetaCache* pMetaCache) {
if (NULL == pMetaCache->pUserAuth) { if (NULL == pMetaCache->pUserAuth) {
pMetaCache->pUserAuth = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pMetaCache->pUserAuth = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pMetaCache->pUserAuth) { if (NULL == pMetaCache->pUserAuth) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
bool pass = false;
return taosHashPut(pMetaCache->pUserAuth, pKey, len, &pass, sizeof(pass));
}
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type,
SParseMetaCache* pMetaCache) {
char key[USER_AUTH_KEY_MAX_LEN] = {0}; char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToString(acctId, pUser, pDb, type, key); int32_t len = userAuthToString(acctId, pUser, pDb, type, key);
bool pass = false; return reserveUserAuthInCacheImpl(key, len, pMetaCache);
return taosHashPut(pMetaCache->pUserAuth, key, len, &pass, sizeof(pass)); }
int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName);
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToStringExt(pUser, dbFName, type, key);
return reserveUserAuthInCacheImpl(key, len, pMetaCache);
} }
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type, int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type,
......
...@@ -34,7 +34,7 @@ bool qIsInsertSql(const char* pStr, size_t length) { ...@@ -34,7 +34,7 @@ bool qIsInsertSql(const char* pStr, size_t length) {
} while (1); } while (1);
} }
static int32_t semanticAnalysis(SParseContext* pCxt, SQuery* pQuery) { static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery) {
int32_t code = authenticate(pCxt, pQuery); int32_t code = authenticate(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code && pQuery->placeholderNum > 0) { if (TSDB_CODE_SUCCESS == code && pQuery->placeholderNum > 0) {
...@@ -54,12 +54,12 @@ static int32_t semanticAnalysis(SParseContext* pCxt, SQuery* pQuery) { ...@@ -54,12 +54,12 @@ static int32_t semanticAnalysis(SParseContext* pCxt, SQuery* pQuery) {
static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) { static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = parse(pCxt, pQuery); int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = semanticAnalysis(pCxt, *pQuery); code = analyseSemantic(pCxt, *pQuery);
} }
return code; return code;
} }
static int32_t syntaxParseSql(SParseContext* pCxt, SQuery** pQuery) { static int32_t parseSqlSyntax(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = parse(pCxt, pQuery); int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKey(pCxt, *pQuery); code = collectMetaKey(pCxt, *pQuery);
...@@ -192,12 +192,12 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { ...@@ -192,12 +192,12 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
return code; return code;
} }
int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) { int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) { if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
// todo insert sql code = parseInsertSyntax(pCxt, pQuery);
} else { } else {
code = syntaxParseSql(pCxt, pQuery); code = parseSqlSyntax(pCxt, pQuery);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq((*pQuery)->pMetaCache, pCatalogReq); code = buildCatalogReq((*pQuery)->pMetaCache, pCatalogReq);
...@@ -206,13 +206,13 @@ int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq ...@@ -206,13 +206,13 @@ int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
return code; return code;
} }
int32_t qSemanticAnalysisSql(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) { const struct SMetaData* pMetaData, SQuery* pQuery) {
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, pQuery->pMetaCache); int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, pQuery->pMetaCache);
if (NULL == pQuery->pRoot) { if (NULL == pQuery->pRoot) {
// todo insert sql return parseInsertSql(pCxt, &pQuery);
} }
return semanticAnalysis(pCxt, pQuery); return analyseSemantic(pCxt, pQuery);
} }
void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode(pQueryNode); } void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode(pQueryNode); }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "mockCatalogService.h"
#include "os.h" #include "os.h"
#include "parInt.h" #include "parInt.h"
...@@ -57,6 +58,38 @@ class InsertTest : public Test { ...@@ -57,6 +58,38 @@ class InsertTest : public Test {
return code_; return code_;
} }
int32_t runAsync() {
code_ = parseInsertSyntax(&cxt_, &res_);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
SCatalogReq catalogReq = {0};
code_ = buildCatalogReq(res_->pMetaCache, &catalogReq);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
SMetaData metaData = {0};
g_mockCatalogService->catalogGetAllMeta(&catalogReq, &metaData);
code_ = putMetaDataToCache(&catalogReq, &metaData, res_->pMetaCache);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
code_ = parseInsertSql(&cxt_, &res_);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSql code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
return code_;
}
return code_;
}
void dumpReslut() { void dumpReslut() {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_); SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
size_t num = taosArrayGetSize(pStmt->pDataBlocks); size_t num = taosArrayGetSize(pStmt->pDataBlocks);
...@@ -125,7 +158,7 @@ class InsertTest : public Test { ...@@ -125,7 +158,7 @@ class InsertTest : public Test {
SQuery* res_; SQuery* res_;
}; };
// INSERT INTO tb_name VALUES (field1_value, ...) // INSERT INTO tb_name [(field1_name, ...)] VALUES (field1_value, ...)
TEST_F(InsertTest, singleTableSingleRowTest) { TEST_F(InsertTest, singleTableSingleRowTest) {
setDatabase("root", "test"); setDatabase("root", "test");
...@@ -133,6 +166,17 @@ TEST_F(InsertTest, singleTableSingleRowTest) { ...@@ -133,6 +166,17 @@ TEST_F(InsertTest, singleTableSingleRowTest) {
ASSERT_EQ(run(), TSDB_CODE_SUCCESS); ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut(); dumpReslut();
checkReslut(1, 1); checkReslut(1, 1);
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
dumpReslut();
checkReslut(1, 1);
bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
} }
// INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...) // INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...)
...@@ -140,11 +184,16 @@ TEST_F(InsertTest, singleTableMultiRowTest) { ...@@ -140,11 +184,16 @@ TEST_F(InsertTest, singleTableMultiRowTest) {
setDatabase("root", "test"); setDatabase("root", "test");
bind( bind(
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)(now+2s, 3, 'guangzhou', 9, " "insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
"10, 11)"); "(now+2s, 3, 'guangzhou', 9, 10, 11)");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS); ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut(); dumpReslut();
checkReslut(1, 3); checkReslut(1, 3);
bind(
"insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)"
"(now+2s, 3, 'guangzhou', 9, 10, 11)");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
} }
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...) // INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
...@@ -155,6 +204,9 @@ TEST_F(InsertTest, multiTableSingleRowTest) { ...@@ -155,6 +204,9 @@ TEST_F(InsertTest, multiTableSingleRowTest) {
ASSERT_EQ(run(), TSDB_CODE_SUCCESS); ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut(); dumpReslut();
checkReslut(2, 1); checkReslut(2, 1);
bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
} }
// INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...) // INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...)
...@@ -167,6 +219,11 @@ TEST_F(InsertTest, multiTableMultiRowTest) { ...@@ -167,6 +219,11 @@ TEST_F(InsertTest, multiTableMultiRowTest) {
ASSERT_EQ(run(), TSDB_CODE_SUCCESS); ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut(); dumpReslut();
checkReslut(2, 3, 2); checkReslut(2, 3, 2);
bind(
"insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"
" st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
} }
// INSERT INTO // INSERT INTO
...@@ -181,6 +238,21 @@ TEST_F(InsertTest, autoCreateTableTest) { ...@@ -181,6 +238,21 @@ TEST_F(InsertTest, autoCreateTableTest) {
ASSERT_EQ(run(), TSDB_CODE_SUCCESS); ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
dumpReslut(); dumpReslut();
checkReslut(1, 3); checkReslut(1, 3);
bind(
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
bind(
"insert into st1s1 using st1 tags(1, 'wxy') values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, "
"\"guangzhou\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
bind(
"insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")"
"(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")");
ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS);
} }
TEST_F(InsertTest, toleranceTest) { TEST_F(InsertTest, toleranceTest) {
...@@ -190,4 +262,9 @@ TEST_F(InsertTest, toleranceTest) { ...@@ -190,4 +262,9 @@ TEST_F(InsertTest, toleranceTest) {
ASSERT_NE(run(), TSDB_CODE_SUCCESS); ASSERT_NE(run(), TSDB_CODE_SUCCESS);
bind("insert into t"); bind("insert into t");
ASSERT_NE(run(), TSDB_CODE_SUCCESS); ASSERT_NE(run(), TSDB_CODE_SUCCESS);
bind("insert into");
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
bind("insert into t");
ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册