diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index a3de9164a2d4418dd1edc8493d46c20f2fafdcac..717278d51d1b252dc3f2bada18a61bbb65739b6e 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -53,6 +53,8 @@ typedef struct SParseContext { int8_t schemalessType; const char* svrVer; bool nodeOffline; + SArray* pTableMetaPos; // sql table pos => catalog data pos + SArray* pTableVgroupPos; // sql table pos => catalog data pos } SParseContext; int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); @@ -84,8 +86,8 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu int32_t rowNum); int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields); int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields); -int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, TAOS_MULTI_BIND* bind, - char* msgBuf, int32_t msgBufLen); +int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, + TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen); void destroyBoundColumnInfo(void* pBoundInfo); int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, int32_t msgBufLen); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index acdb3b68b0a88477d1ff5f7856e533e33b49e5a7..9c086fc83e155b40505c42c8096e57b7e03a9bca 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -689,11 +689,11 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList TDMT_VND_CREATE_TABLE == pRequest->type) { pRequest->body.resInfo.numOfRows = res.numOfRows; if (TDMT_VND_SUBMIT == pRequest->type) { - STscObj *pTscObj = pRequest->pTscObj; - SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; - atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, res.numOfRows); + STscObj* pTscObj = pRequest->pTscObj; + SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary; + atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows); } - + schedulerFreeJob(&pRequest->body.queryJob, 0); } @@ -800,8 +800,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { break; } case TDMT_VND_SUBMIT: { - atomic_add_fetch_64((int64_t *)&pAppInfo->summary.insertBytes, pRes->numOfBytes); - + atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes); + code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset); break; } @@ -832,9 +832,9 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { if (pResult) { pRequest->body.resInfo.numOfRows = pResult->numOfRows; if (TDMT_VND_SUBMIT == pRequest->type) { - STscObj *pTscObj = pRequest->pTscObj; - SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; - atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertRows, pResult->numOfRows); + STscObj* pTscObj = pRequest->pTscObj; + SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary; + atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows); } } @@ -877,14 +877,14 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue if (pQuery->pRoot) { pRequest->stmtType = pQuery->pRoot->type; } - + if (pQuery->pRoot && !pRequest->inRetry) { - STscObj *pTscObj = pRequest->pTscObj; - SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; + STscObj* pTscObj = pRequest->pTscObj; + SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary; if (QUERY_NODE_VNODE_MODIF_STMT == pQuery->pRoot->type) { - atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); + atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1); } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) { - atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1); + atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1); } } @@ -1467,9 +1467,9 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); - STscObj *pTscObj = pRequest->pTscObj; - SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; - atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); + STscObj* pTscObj = pRequest->pTscObj; + SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary; + atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); if (pResultInfo->numOfRows == 0) { return NULL; @@ -2006,7 +2006,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, bool inEscape = false; int32_t code = 0; - void *pIter = NULL; + void* pIter = NULL; int32_t vIdx = 0; int32_t vPos[2]; diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 2249bc7823a49589e99f1714d06401b419c3d72d..308afd467f1248c14dac9d8abea638cb42444936 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -22,6 +22,7 @@ extern "C" { #include "catalog.h" #include "os.h" +#include "parser.h" #include "query.h" #define parserFatal(param, ...) qFatal("PARSER: " param, ##__VA_ARGS__) @@ -44,18 +45,37 @@ typedef struct SParseTablesMetaReq { SHashObj* pTables; } SParseTablesMetaReq; +typedef enum ECatalogReqType { + CATALOG_REQ_TYPE_META = 1, + CATALOG_REQ_TYPE_VGROUP, + CATALOG_REQ_TYPE_BOTH +} ECatalogReqType; + +typedef struct SInsertTablesMetaReq { + char dbFName[TSDB_DB_FNAME_LEN]; + SArray* pTableMetaPos; + SArray* pTableMetaReq; // element is SName + SArray* pTableVgroupPos; + SArray* pTableVgroupReq; // element is SName +} SInsertTablesMetaReq; + typedef struct SParseMetaCache { - SHashObj* pTableMeta; // key is tbFName, element is STableMeta* - SHashObj* pDbVgroup; // key is dbFName, element is SArray* - SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo* - SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo* - SHashObj* pDbInfo; // key is tbFName, element is SDbInfo* - SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass - SHashObj* pUdf; // key is funcName, element is SFuncInfo* - SHashObj* pTableIndex; // key is tbFName, element is SArray* - SHashObj* pTableCfg; // key is tbFName, element is STableCfg* - SArray* pDnodes; // element is SEpSet - bool dnodeRequired; + SHashObj* pTableMeta; // key is tbFName, element is STableMeta* + SHashObj* pDbVgroup; // key is dbFName, element is SArray* + SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo* + SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo* + SHashObj* pDbInfo; // key is tbFName, element is SDbInfo* + SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass + SHashObj* pUdf; // key is funcName, element is SFuncInfo* + SHashObj* pTableIndex; // key is tbFName, element is SArray* + SHashObj* pTableCfg; // key is tbFName, element is STableCfg* + SArray* pDnodes; // element is SEpSet + bool dnodeRequired; + SHashObj* pInsertTables; // key is dbName, element is SInsertTablesMetaReq*, for insert + const char* pUser; + const SArray* pTableMetaData; // pRes = STableMeta* + const SArray* pTableVgroupData; // pRes = SVgroupInfo* + int32_t sqlTableNum; } SParseMetaCache; int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...); @@ -72,8 +92,9 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta); int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); -int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq); -int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache); +int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq); +int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache, + bool insertValuesStmt); int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache); int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); @@ -100,6 +121,12 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes); int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput); int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes); +int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo, + SParseMetaCache* pMetaCache); +int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo, + STableMeta** pMeta); +int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo, + SVgroupInfo* pVgroup); void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request); #ifdef __cplusplus diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index b7532173c8380e9628fa504c9ce476f4884967fc..31ae35e7177d55397a6000a86712b977962942dd 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -73,6 +73,9 @@ typedef struct SInsertParseContext { SStmtCallback* pStmtCb; SParseMetaCache* pMetaCache; char sTableName[TSDB_TABLE_NAME_LEN]; + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW]; + int64_t memElapsed; + int64_t parRowElapsed; } SInsertParseContext; typedef struct SInsertParseSyntaxCxt { @@ -203,10 +206,11 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) return catalogChkAuth(pBasicCtx->pCatalog, &conn, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass); } -static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) { +static int32_t getTableSchema(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, bool isStb, + STableMeta** pTableMeta) { SParseContext* pBasicCtx = pCxt->pComCxt; if (pBasicCtx->async) { - return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta); + return getTableMetaFromCacheForInsert(pBasicCtx->pTableMetaPos, pCxt->pMetaCache, tbNo, pTableMeta); } SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, .requestId = pBasicCtx->requestId, @@ -219,10 +223,10 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is return catalogGetTableMeta(pBasicCtx->pCatalog, &conn, pTbName, pTableMeta); } -static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) { +static int32_t getTableVgroup(SInsertParseContext* pCxt, int32_t tbNo, SName* pTbName, SVgroupInfo* pVg) { SParseContext* pBasicCtx = pCxt->pComCxt; if (pBasicCtx->async) { - return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg); + return getTableVgroupFromCacheForInsert(pBasicCtx->pTableVgroupPos, pCxt->pMetaCache, tbNo, pVg); } SRequestConnInfo conn = {.pTrans = pBasicCtx->pTransporter, .requestId = pBasicCtx->requestId, @@ -231,28 +235,22 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroup return catalogGetTableHashVgroup(pBasicCtx->pCatalog, &conn, pTbName, pVg); } -static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char* dbFname, bool isStb) { - bool pass = false; - CHECK_CODE(checkAuth(pCxt, dbFname, &pass)); - if (!pass) { - return TSDB_CODE_PAR_PERMISSION_DENIED; - } - - CHECK_CODE(getTableSchema(pCxt, name, isStb, &pCxt->pTableMeta)); +static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname, bool isStb) { + CHECK_CODE(getTableSchema(pCxt, tbNo, name, isStb, &pCxt->pTableMeta)); if (!isStb) { SVgroupInfo vg; - CHECK_CODE(getTableVgroup(pCxt, name, &vg)); + CHECK_CODE(getTableVgroup(pCxt, tbNo, name, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); } return TSDB_CODE_SUCCESS; } -static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) { - return getTableMetaImpl(pCxt, name, dbFname, false); +static int32_t getTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) { + return getTableMetaImpl(pCxt, tbNo, name, dbFname, false); } -static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFname) { - return getTableMetaImpl(pCxt, name, dbFname, true); +static int32_t getSTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) { + return getTableMetaImpl(pCxt, tbNo, name, dbFname, true); } static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) { @@ -1028,13 +1026,13 @@ end: return code; } -static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName, - int32_t len, STableMeta* pMeta) { +static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, int32_t tbNo, SName* pTableName, + const char* pName, int32_t len, STableMeta* pMeta) { SVgroupInfo vg; - CHECK_CODE(getTableVgroup(pCxt, pTableName, &vg)); + CHECK_CODE(getTableVgroup(pCxt, tbNo, pTableName, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); - pMeta->uid = 0; + pMeta->uid = tbNo; pMeta->vgId = vg.vgId; pMeta->tableType = TSDB_CHILD_TABLE; @@ -1084,7 +1082,7 @@ static int32_t ignoreAutoCreateTableClause(SInsertParseContext* pCxt) { } // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) -static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) { +static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* tbFName) { int32_t len = strlen(tbFName); STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len); if (NULL != pMeta) { @@ -1102,11 +1100,11 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb tNameGetFullDbName(&sname, dbFName); strcpy(pCxt->sTableName, sname.tname); - CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName)); + CHECK_CODE(getSTableMeta(pCxt, tbNo, &sname, dbFName)); if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); } - CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta)); + CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, tbNo, name, tbFName, len, pCxt->pTableMeta)); SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); @@ -1195,7 +1193,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, tdSRowEnd(pBuilder); *gotRow = true; - + #ifdef TD_DEBUG_PRINT_ROW STSchema* pSTSchema = tdGetSTSChemaFromSSChema(schema, spd->numOfCols, 1); tdSRowPrint(row, pSTSchema, __func__); @@ -1214,7 +1212,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); (*numOfRows) = 0; - char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" + // char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" SToken sToken; while (1) { int32_t index = 0; @@ -1232,7 +1230,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo } bool gotRow = false; - CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf)); + CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, pCxt->tmpTokenBuf)); if (gotRow) { pDataBlock->size += extendedRowSize; // len; } @@ -1347,7 +1345,9 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa } static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) { - taosMemoryFreeClear(pCxt->pTableMeta); + if (!pCxt->pComCxt->async) { + taosMemoryFreeClear(pCxt->pTableMeta); + } destroyBoundColumnInfo(&pCxt->tags); tdDestroySVCreateTbReq(&pCxt->createTblReq); } @@ -1365,6 +1365,20 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { destroyBlockArrayList(pCxt->pVgDataBlocks); } +static int32_t parseTableName(SInsertParseContext* pCxt, SToken* pTbnameToken, SName* pName, char* pDbFName, + char* pTbFName) { + int32_t code = createSName(pName, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); + if (TSDB_CODE_SUCCESS == code) { + tNameExtractFullName(pName, pTbFName); + code = taosHashPut(pCxt->pTableNameHashObj, pTbFName, strlen(pTbFName), pName, sizeof(SName)); + } + if (TSDB_CODE_SUCCESS == code) { + tNameGetFullDbName(pName, pDbFName); + code = taosHashPut(pCxt->pDbFNameHashObj, pDbFName, strlen(pDbFName), pDbFName, TSDB_DB_FNAME_LEN); + } + return code; +} + // tb_name // [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] // [(field1_name, ...)] @@ -1372,7 +1386,9 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { // [...]; static int32_t parseInsertBody(SInsertParseContext* pCxt) { int32_t tbNum = 0; + SName name; char tbFName[TSDB_TABLE_FNAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; bool autoCreateTbl = false; // for each table @@ -1415,20 +1431,15 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { SToken tbnameToken = sToken; NEXT_TOKEN(pCxt->pSql, sToken); - SName name; - CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); - - tNameExtractFullName(&name, tbFName); - CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName))); - char dbFName[TSDB_DB_FNAME_LEN]; - tNameGetFullDbName(&name, dbFName); - CHECK_CODE(taosHashPut(pCxt->pDbFNameHashObj, dbFName, strlen(dbFName), dbFName, sizeof(dbFName))); + if (!pCxt->pComCxt->async || TK_USING == sToken.type) { + CHECK_CODE(parseTableName(pCxt, &tbnameToken, &name, dbFName, tbFName)); + } bool existedUsing = false; // USING clause if (TK_USING == sToken.type) { existedUsing = true; - CHECK_CODE(parseUsingClause(pCxt, &name, tbFName)); + CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName)); NEXT_TOKEN(pCxt->pSql, sToken); autoCreateTbl = true; } @@ -1438,22 +1449,31 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { // pSql -> field1_name, ...) pBoundColsStart = pCxt->pSql; CHECK_CODE(ignoreBoundColumns(pCxt)); - // CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta))); NEXT_TOKEN(pCxt->pSql, sToken); } if (TK_USING == sToken.type) { - CHECK_CODE(parseUsingClause(pCxt, &name, tbFName)); + if (pCxt->pComCxt->async) { + CHECK_CODE(parseTableName(pCxt, &tbnameToken, &name, dbFName, tbFName)); + } + CHECK_CODE(parseUsingClause(pCxt, tbNum, &name, tbFName)); NEXT_TOKEN(pCxt->pSql, sToken); autoCreateTbl = true; } else if (!existedUsing) { - CHECK_CODE(getTableMeta(pCxt, &name, dbFName)); + CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName)); } STableDataBlocks* dataBuf = NULL; - CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, - &dataBuf, NULL, &pCxt->createTblReq)); + if (pCxt->pComCxt->async) { + CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid), + TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), + getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, + &pCxt->createTblReq)); + } else { + CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE, + sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, + &dataBuf, NULL, &pCxt->createTblReq)); + } if (NULL != pBoundColsStart) { char* pCurrPos = pCxt->pSql; @@ -1532,7 +1552,9 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache .totalNum = 0, .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT), .pStmtCb = pContext->pStmtCb, - .pMetaCache = pMetaCache}; + .pMetaCache = pMetaCache, + .memElapsed = 0, + .parRowElapsed = 0}; if (pContext->pStmtCb && *pQuery) { (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, @@ -1547,7 +1569,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache } else { context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); context.pTableBlockHashObj = - taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); } if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj || @@ -1656,24 +1678,24 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { return TSDB_CODE_SUCCESS; } -static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { +static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) { SName name; CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); - CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache)); - CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache)); - CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); + CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo, + pCxt->pMetaCache)); return TSDB_CODE_SUCCESS; } -static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { +static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, int32_t tableNo, SToken* pTbToken) { SName name; CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); - CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); + CHECK_CODE(reserveTableMetaInCacheForInsert(&name, CATALOG_REQ_TYPE_VGROUP, tableNo, pCxt->pMetaCache)); return TSDB_CODE_SUCCESS; } static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) { - bool hasData = false; + bool hasData = false; + int32_t tableNo = 0; // for each table while (1) { SToken sToken; @@ -1702,9 +1724,9 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) { // USING clause if (TK_USING == sToken.type) { existedUsing = true; - CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken)); + CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken)); NEXT_TOKEN(pCxt->pSql, sToken); - CHECK_CODE(collectTableMetaKey(pCxt, &sToken)); + CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken)); CHECK_CODE(skipUsingClause(pCxt)); NEXT_TOKEN(pCxt->pSql, sToken); } @@ -1717,15 +1739,17 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) { if (TK_USING == sToken.type && !existedUsing) { existedUsing = true; - CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken)); + CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, tableNo, &tbnameToken)); NEXT_TOKEN(pCxt->pSql, sToken); - CHECK_CODE(collectTableMetaKey(pCxt, &sToken)); + CHECK_CODE(collectTableMetaKey(pCxt, true, tableNo, &sToken)); CHECK_CODE(skipUsingClause(pCxt)); NEXT_TOKEN(pCxt->pSql, sToken); - } else { - CHECK_CODE(collectTableMetaKey(pCxt, &tbnameToken)); + } else if (!existedUsing) { + CHECK_CODE(collectTableMetaKey(pCxt, false, tableNo, &tbnameToken)); } + ++tableNo; + if (TK_VALUES == sToken.type) { // pSql -> (field1_value, ...) [(field1_value2, ...) ...] CHECK_CODE(skipValuesClause(pCxt)); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index ae5a281aab92ab5e365fe19e1769d95b2b43ea47..17e78e78061b69c9eff64ad6a5802369fefaf62d 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -476,9 +476,11 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) { static int32_t buildTableReqFromDb(SHashObj* pDbsHash, SArray** pDbs) { if (NULL != pDbsHash) { - *pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq)); if (NULL == *pDbs) { - return TSDB_CODE_OUT_OF_MEMORY; + *pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq)); + if (NULL == *pDbs) { + return TSDB_CODE_OUT_OF_MEMORY; + } } SParseTablesMetaReq* p = taosHashIterate(pDbsHash, NULL); while (NULL != p) { @@ -530,7 +532,62 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) { return TSDB_CODE_SUCCESS; } -int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) { +static int32_t buildCatalogReqForInsert(SParseContext* pCxt, const SParseMetaCache* pMetaCache, + SCatalogReq* pCatalogReq) { + int32_t ndbs = taosHashGetSize(pMetaCache->pInsertTables); + pCatalogReq->pTableMeta = taosArrayInit(ndbs, sizeof(STablesReq)); + if (NULL == pCatalogReq->pTableMeta) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCatalogReq->pTableHash = taosArrayInit(ndbs, sizeof(STablesReq)); + if (NULL == pCatalogReq->pTableHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCatalogReq->pUser = taosArrayInit(ndbs, sizeof(SUserAuthInfo)); + if (NULL == pCatalogReq->pUser) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pCxt->pTableMetaPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t)); + pCxt->pTableVgroupPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t)); + + int32_t metaReqNo = 0; + int32_t vgroupReqNo = 0; + SInsertTablesMetaReq* p = taosHashIterate(pMetaCache->pInsertTables, NULL); + while (NULL != p) { + STablesReq req = {0}; + strcpy(req.dbFName, p->dbFName); + TSWAP(req.pTables, p->pTableMetaReq); + taosArrayPush(pCatalogReq->pTableMeta, &req); + + req.pTables = NULL; + TSWAP(req.pTables, p->pTableVgroupReq); + taosArrayPush(pCatalogReq->pTableHash, &req); + + int32_t ntables = taosArrayGetSize(p->pTableMetaPos); + for (int32_t i = 0; i < ntables; ++i) { + taosArrayInsert(pCxt->pTableMetaPos, *(int32_t*)taosArrayGet(p->pTableMetaPos, i), &metaReqNo); + ++metaReqNo; + } + + ntables = taosArrayGetSize(p->pTableVgroupPos); + for (int32_t i = 0; i < ntables; ++i) { + taosArrayInsert(pCxt->pTableVgroupPos, *(int32_t*)taosArrayGet(p->pTableVgroupPos, i), &vgroupReqNo); + ++vgroupReqNo; + } + + SUserAuthInfo auth = {0}; + strcpy(auth.user, pCxt->pUser); + strcpy(auth.dbFName, p->dbFName); + auth.type = AUTH_TYPE_WRITE; + taosArrayPush(pCatalogReq->pUser, &auth); + + p = taosHashIterate(pMetaCache->pInsertTables, p); + } + return TSDB_CODE_SUCCESS; +} + +int32_t buildCatalogReqForQuery(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) { int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta); if (TSDB_CODE_SUCCESS == code) { code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup); @@ -560,6 +617,13 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog return code; } +int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) { + if (NULL != pMetaCache->pInsertTables) { + return buildCatalogReqForInsert(pCxt, pMetaCache, pCatalogReq); + } + return buildCatalogReqForQuery(pMetaCache, pCatalogReq); +} + static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) { if (NULL == *pHash) { *pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -647,7 +711,8 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas return TSDB_CODE_SUCCESS; } -int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) { +int32_t putMetaDataToCacheForQuery(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, + SParseMetaCache* pMetaCache) { int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta); if (TSDB_CODE_SUCCESS == code) { code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup); @@ -677,6 +742,30 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet return code; } +int32_t putMetaDataToCacheForInsert(const SMetaData* pMetaData, SParseMetaCache* pMetaCache) { + int32_t ndbs = taosArrayGetSize(pMetaData->pUser); + for (int32_t i = 0; i < ndbs; ++i) { + SMetaRes* pRes = taosArrayGet(pMetaData->pUser, i); + if (TSDB_CODE_SUCCESS != pRes->code) { + return pRes->code; + } + if (!(*(bool*)pRes->pRes)) { + return TSDB_CODE_PAR_PERMISSION_DENIED; + } + } + pMetaCache->pTableMetaData = pMetaData->pTableMeta; + pMetaCache->pTableVgroupData = pMetaData->pTableHash; + return TSDB_CODE_SUCCESS; +} + +int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache, + bool insertValuesStmt) { + if (insertValuesStmt) { + return putMetaDataToCacheForInsert(pMetaData, pMetaCache); + } + return putMetaDataToCacheForQuery(pCatalogReq, pMetaData, pMetaCache); +} + static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) { if (NULL == *pTables) { *pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -977,6 +1066,82 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) { return TSDB_CODE_SUCCESS; } +static int32_t reserveTableReqInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo, + SInsertTablesMetaReq* pReq) { + switch (reqType) { + case CATALOG_REQ_TYPE_META: + taosArrayPush(pReq->pTableMetaReq, pName); + taosArrayPush(pReq->pTableMetaPos, &tableNo); + break; + case CATALOG_REQ_TYPE_VGROUP: + taosArrayPush(pReq->pTableVgroupReq, pName); + taosArrayPush(pReq->pTableVgroupPos, &tableNo); + break; + case CATALOG_REQ_TYPE_BOTH: + taosArrayPush(pReq->pTableMetaReq, pName); + taosArrayPush(pReq->pTableMetaPos, &tableNo); + taosArrayPush(pReq->pTableVgroupReq, pName); + taosArrayPush(pReq->pTableVgroupPos, &tableNo); + break; + default: + break; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t reserveTableReqInDbCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo, + SHashObj* pDbs) { + SInsertTablesMetaReq req = {.pTableMetaReq = taosArrayInit(4, sizeof(SName)), + .pTableMetaPos = taosArrayInit(4, sizeof(int32_t)), + .pTableVgroupReq = taosArrayInit(4, sizeof(SName)), + .pTableVgroupPos = taosArrayInit(4, sizeof(int32_t))}; + tNameGetFullDbName(pName, req.dbFName); + int32_t code = reserveTableReqInCacheForInsert(pName, reqType, tableNo, &req); + if (TSDB_CODE_SUCCESS == code) { + code = taosHashPut(pDbs, pName->dbname, strlen(pName->dbname), &req, sizeof(SInsertTablesMetaReq)); + } + return code; +} + +int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo, + SParseMetaCache* pMetaCache) { + if (NULL == pMetaCache->pInsertTables) { + pMetaCache->pInsertTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (NULL == pMetaCache->pInsertTables) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + pMetaCache->sqlTableNum = tableNo; + SInsertTablesMetaReq* pReq = taosHashGet(pMetaCache->pInsertTables, pName->dbname, strlen(pName->dbname)); + if (NULL == pReq) { + return reserveTableReqInDbCacheForInsert(pName, reqType, tableNo, pMetaCache->pInsertTables); + } + return reserveTableReqInCacheForInsert(pName, reqType, tableNo, pReq); +} + +int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo, + STableMeta** pMeta) { + int32_t reqIndex = *(int32_t*)taosArrayGet(pTableMetaPos, tableNo); + SMetaRes* pRes = taosArrayGet(pMetaCache->pTableMetaData, reqIndex); + if (TSDB_CODE_SUCCESS == pRes->code) { + *pMeta = pRes->pRes; + if (NULL == *pMeta) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + return pRes->code; +} + +int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo, + SVgroupInfo* pVgroup) { + int32_t reqIndex = *(int32_t*)taosArrayGet(pTableVgroupPos, tableNo); + SMetaRes* pRes = taosArrayGet(pMetaCache->pTableVgroupData, reqIndex); + if (TSDB_CODE_SUCCESS == pRes->code) { + memcpy(pVgroup, pRes->pRes, sizeof(SVgroupInfo)); + } + return pRes->code; +} + void destoryParseTablesMetaReqHash(SHashObj* pHash) { SParseTablesMetaReq* p = taosHashIterate(pHash, NULL); while (NULL != p) { diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 34cd783ace5c84608de6d62ae6b994c2fbb9e3c3..7e27132f3cbc453a5cf09bd487acc75fa546ff7e 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -185,7 +185,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq code = parseSqlSyntax(pCxt, pQuery, &metaCache); } if (TSDB_CODE_SUCCESS == code) { - code = buildCatalogReq(&metaCache, pCatalogReq); + code = buildCatalogReq(pCxt, &metaCache, pCatalogReq); } destoryParseMetaCache(&metaCache, true); terrno = code; @@ -195,7 +195,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData, SQuery* pQuery) { SParseMetaCache metaCache = {0}; - int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache); + int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot); if (TSDB_CODE_SUCCESS == code) { if (NULL == pQuery->pRoot) { code = parseInsertSql(pCxt, &pQuery, &metaCache); diff --git a/source/libs/parser/test/parInsertTest.cpp b/source/libs/parser/test/parInsertTest.cpp index 7302491ba7b15daca8333c4b9870eb3615e0c015..ddf15ec67bf2b77edd2e1e622aad409a9ecc0e69 100644 --- a/source/libs/parser/test/parInsertTest.cpp +++ b/source/libs/parser/test/parInsertTest.cpp @@ -13,21 +13,13 @@ * along with this program. If not, see . */ -#include - #include -#include "mockCatalogService.h" -#include "os.h" -#include "parInt.h" +#include "parTestUtil.h" using namespace std; -using namespace std::placeholders; -using namespace testing; -namespace { -string toString(int32_t code) { return tstrerror(code); } -} // namespace +namespace ParserTest { // syntax: // INSERT INTO @@ -36,259 +28,60 @@ string toString(int32_t code) { return tstrerror(code); } // [(field1_name, ...)] // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // [...]; -class InsertTest : public Test { - protected: - InsertTest() : res_(nullptr) {} - ~InsertTest() { reset(); } - - void setDatabase(const string& acctId, const string& db) { - acctId_ = acctId; - db_ = db; - } - - void bind(const char* sql) { - reset(); - cxt_.acctId = atoi(acctId_.c_str()); - cxt_.db = (char*)db_.c_str(); - strcpy(sqlBuf_, sql); - cxt_.sqlLen = strlen(sql); - sqlBuf_[cxt_.sqlLen] = '\0'; - cxt_.pSql = sqlBuf_; - } - - int32_t run() { - code_ = parseInsertSql(&cxt_, &res_, nullptr); - if (code_ != TSDB_CODE_SUCCESS) { - cout << "code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; - } - return code_; - } - - int32_t runAsync() { - cxt_.async = true; - bool request = true; - unique_ptr > metaCache( - new SParseMetaCache(), std::bind(_destoryParseMetaCache, _1, cref(request))); - code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get()); - if (code_ != TSDB_CODE_SUCCESS) { - cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; - return code_; - } - - unique_ptr catalogReq(new SCatalogReq(), - MockCatalogService::destoryCatalogReq); - code_ = buildCatalogReq(metaCache.get(), catalogReq.get()); - if (code_ != TSDB_CODE_SUCCESS) { - cout << "buildCatalogReq code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; - return code_; - } - - unique_ptr metaData(new SMetaData(), MockCatalogService::destoryMetaData); - g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get()); - - metaCache.reset(new SParseMetaCache()); - request = false; - code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get()); - if (code_ != TSDB_CODE_SUCCESS) { - cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; - return code_; - } - - code_ = parseInsertSql(&cxt_, &res_, metaCache.get()); - if (code_ != TSDB_CODE_SUCCESS) { - cout << "parseInsertSql code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; - return code_; - } - - return code_; - } - - void dumpReslut() { - SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_); - size_t num = taosArrayGetSize(pStmt->pDataBlocks); - cout << "payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType - << ", numOfVgs:" << num << endl; - for (size_t i = 0; i < num; ++i) { - SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i); - cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl; - SSubmitReq* submit = (SSubmitReq*)vg->pData; - cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl; - int32_t numOfBlocks = ntohl(submit->numOfBlocks); - SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); - for (int32_t i = 0; i < numOfBlocks; ++i) { - cout << "Block:" << i << endl; - cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion) - << ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen) - << ", numOfRows:" << ntohl(blk->numOfRows) << endl; - blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen)); - } - } - } - - void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) { - SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_); - ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV); - ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT); - size_t num = taosArrayGetSize(pStmt->pDataBlocks); - ASSERT_GE(num, 0); - for (size_t i = 0; i < num; ++i) { - SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i); - ASSERT_EQ(vg->numOfTables, numOfTables); - ASSERT_GE(vg->size, 0); - SSubmitReq* submit = (SSubmitReq*)vg->pData; - ASSERT_GE(ntohl(submit->length), 0); - ASSERT_GE(ntohl(submit->numOfBlocks), 0); - int32_t numOfBlocks = ntohl(submit->numOfBlocks); - SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); - for (int32_t i = 0; i < numOfBlocks; ++i) { - ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1))); - blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen)); - } - } - } - - private: - static const int max_err_len = 1024; - static const int max_sql_len = 1024 * 1024; - - static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { - destoryParseMetaCache(pMetaCache, request); - delete pMetaCache; - } - - void reset() { - memset(&cxt_, 0, sizeof(cxt_)); - memset(errMagBuf_, 0, max_err_len); - cxt_.pMsg = errMagBuf_; - cxt_.msgLen = max_err_len; - code_ = TSDB_CODE_SUCCESS; - qDestroyQuery(res_); - res_ = nullptr; - } - - SVnodeModifOpStmt* getVnodeModifStmt(SQuery* pQuery) { return (SVnodeModifOpStmt*)pQuery->pRoot; } - - string acctId_; - string db_; - char errMagBuf_[max_err_len]; - char sqlBuf_[max_sql_len]; - SParseContext cxt_; - int32_t code_; - SQuery* res_; -}; +class ParserInsertTest : public ParserTestBase {}; // INSERT INTO tb_name [(field1_name, ...)] VALUES (field1_value, ...) -TEST_F(InsertTest, singleTableSingleRowTest) { - setDatabase("root", "test"); - - bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)"); - ASSERT_EQ(run(), TSDB_CODE_SUCCESS); - dumpReslut(); - checkReslut(1, 1); +TEST_F(ParserInsertTest, singleTableSingleRowTest) { + useDb("root", "test"); - bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)"); - ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + run("INSERT INTO t1 VALUES (now, 1, 'beijing', 3, 4, 5)"); - bind("insert into t1 values (now, 1, 'beijing', 3, 4, 5)"); - ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); - dumpReslut(); - checkReslut(1, 1); - - bind("insert into t1 (ts, c1, c2, c3, c4, c5) values (now, 1, 'beijing', 3, 4, 5)"); - ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); + run("INSERT INTO t1 (ts, c1, c2, c3, c4, c5) VALUES (now, 1, 'beijing', 3, 4, 5)"); } // INSERT INTO tb_name VALUES (field1_value, ...)(field1_value, ...) -TEST_F(InsertTest, singleTableMultiRowTest) { - setDatabase("root", "test"); - - bind( - "insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)" - "(now+2s, 3, 'guangzhou', 9, 10, 11)"); - ASSERT_EQ(run(), TSDB_CODE_SUCCESS); - dumpReslut(); - checkReslut(1, 3); +TEST_F(ParserInsertTest, singleTableMultiRowTest) { + useDb("root", "test"); - bind( - "insert into t1 values (now, 1, 'beijing', 3, 4, 5)(now+1s, 2, 'shanghai', 6, 7, 8)" + run("INSERT INTO t1 VALUES (now, 1, 'beijing', 3, 4, 5)" + "(now+1s, 2, 'shanghai', 6, 7, 8)" "(now+2s, 3, 'guangzhou', 9, 10, 11)"); - ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); } // INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...) -TEST_F(InsertTest, multiTableSingleRowTest) { - setDatabase("root", "test"); +TEST_F(ParserInsertTest, multiTableSingleRowTest) { + useDb("root", "test"); - bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")"); - ASSERT_EQ(run(), TSDB_CODE_SUCCESS); - dumpReslut(); - checkReslut(2, 1); - - bind("insert into st1s1 values (now, 1, \"beijing\") st1s2 values (now, 10, \"131028\")"); - ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); + run("INSERT INTO st1s1 VALUES (now, 1, 'beijing') st1s2 VALUES (now, 10, '131028')"); } // INSERT INTO tb1_name VALUES (field1_value, ...) tb2_name VALUES (field1_value, ...) -TEST_F(InsertTest, multiTableMultiRowTest) { - setDatabase("root", "test"); - - bind( - "insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")" - " st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")"); - ASSERT_EQ(run(), TSDB_CODE_SUCCESS); - dumpReslut(); - checkReslut(2, 3, 2); +TEST_F(ParserInsertTest, multiTableMultiRowTest) { + useDb("root", "test"); - bind( - "insert into st1s1 values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")" - " st1s2 values (now, 10, \"131028\")(now+1s, 20, \"132028\")"); - ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); + run("INSERT INTO " + "st1s1 VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou') " + "st1s2 VALUES (now, 10, '131028')(now+1s, 20, '132028')"); } // INSERT INTO // tb1_name USING st1_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...) // tb2_name USING st2_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...) -TEST_F(InsertTest, autoCreateTableTest) { - setDatabase("root", "test"); - - bind( - "insert into st1s1 using st1 tags(1, 'wxy', now) " - "values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); - ASSERT_EQ(run(), TSDB_CODE_SUCCESS); - dumpReslut(); - checkReslut(1, 3); +TEST_F(ParserInsertTest, autoCreateTableTest) { + useDb("root", "test"); - bind( - "insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")" - "(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); - ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + run("INSERT INTO st1s1 USING st1 TAGS(1, 'wxy', now) " + "VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')"); - bind( - "insert into st1s1 using st1 tags(1, 'wxy', now) " - "values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); - ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); + run("INSERT INTO st1s1 USING st1 (tag1, tag2) TAGS(1, 'wxy') (ts, c1, c2) " + "VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')"); - bind( - "insert into st1s1 using st1 (tag1, tag2) tags(1, 'wxy') values (now, 1, \"beijing\")" - "(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); - ASSERT_EQ(runAsync(), TSDB_CODE_SUCCESS); + run("INSERT INTO st1s1 (ts, c1, c2) USING st1 (tag1, tag2) TAGS(1, 'wxy') " + "VALUES (now, 1, 'beijing')(now+1s, 2, 'shanghai')(now+2s, 3, 'guangzhou')"); - bind( - "insert into st1s1 using st1 tags(1, 'wxy', now) values (now, 1, \"beijing\")" - "st1s1 using st1 tags(1, 'wxy', now) values (now+1s, 2, \"shanghai\")"); - ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + run("INSERT INTO " + "st1s1 USING st1 (tag1, tag2) TAGS(1, 'wxy') (ts, c1, c2) VALUES (now, 1, 'beijing') " + "st1s2 (ts, c1, c2) USING st1 TAGS(2, 'abc', now) VALUES (now+1s, 2, 'shanghai')"); } -TEST_F(InsertTest, toleranceTest) { - setDatabase("root", "test"); - - bind("insert into"); - ASSERT_NE(run(), TSDB_CODE_SUCCESS); - bind("insert into t"); - ASSERT_NE(run(), TSDB_CODE_SUCCESS); - - bind("insert into"); - ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS); - bind("insert into t"); - ASSERT_NE(runAsync(), TSDB_CODE_SUCCESS); -} +} // namespace ParserTest diff --git a/source/libs/parser/test/parTestUtil.cpp b/source/libs/parser/test/parTestUtil.cpp index 3fe4b533e44fe70e8e999ef3cacd15715cd632dd..98281b7bf070095b4bb23326b156d5e8764690de 100644 --- a/source/libs/parser/test/parTestUtil.cpp +++ b/source/libs/parser/test/parTestUtil.cpp @@ -225,16 +225,17 @@ class ParserTestBaseImpl { DO_WITH_THROW(collectMetaKey, pCxt, pQuery, pMetaCache); } - void doBuildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) { - DO_WITH_THROW(buildCatalogReq, pMetaCache, pCatalogReq); + void doBuildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) { + DO_WITH_THROW(buildCatalogReq, pCxt, pMetaCache, pCatalogReq); } void doGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) { DO_WITH_THROW(g_mockCatalogService->catalogGetAllMeta, pCatalogReq, pMetaData); } - void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) { - DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache); + void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache, + bool isInsertValues) { + DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache, isInsertValues); } void doAuthenticate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) { @@ -261,7 +262,9 @@ class ParserTestBaseImpl { void doParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq) { DO_WITH_THROW(qParseSqlSyntax, pCxt, pQuery, pCatalogReq); ASSERT_NE(*pQuery, nullptr); - res_.parsedAst_ = toString((*pQuery)->pRoot); + if (nullptr != (*pQuery)->pRoot) { + res_.parsedAst_ = toString((*pQuery)->pRoot); + } } void doAnalyseSqlSemantic(SParseContext* pCxt, const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, @@ -270,6 +273,17 @@ class ParserTestBaseImpl { res_.calcConstAst_ = toString(pQuery->pRoot); } + void doParseInsertSql(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) { + DO_WITH_THROW(parseInsertSql, pCxt, pQuery, pMetaCache); + ASSERT_NE(*pQuery, nullptr); + res_.parsedAst_ = toString((*pQuery)->pRoot); + } + + void doParseInsertSyntax(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) { + DO_WITH_THROW(parseInsertSyntax, pCxt, pQuery, pMetaCache); + ASSERT_NE(*pQuery, nullptr); + } + string toString(const SNode* pRoot) { char* pStr = NULL; int32_t len = 0; @@ -287,15 +301,20 @@ class ParserTestBaseImpl { SParseContext cxt = {0}; setParseContext(sql, &cxt); - unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); - doParse(&cxt, query.get()); - SQuery* pQuery = *(query.get()); + if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) { + unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); + doParseInsertSql(&cxt, query.get(), nullptr); + } else { + unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); + doParse(&cxt, query.get()); + SQuery* pQuery = *(query.get()); - doAuthenticate(&cxt, pQuery, nullptr); + doAuthenticate(&cxt, pQuery, nullptr); - doTranslate(&cxt, pQuery, nullptr); + doTranslate(&cxt, pQuery, nullptr); - doCalculateConstant(&cxt, pQuery); + doCalculateConstant(&cxt, pQuery); + } if (g_dump) { dump(); @@ -338,17 +357,22 @@ class ParserTestBaseImpl { setParseContext(sql, &cxt, true); unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); - doParse(&cxt, query.get()); - SQuery* pQuery = *(query.get()); - - bool request = true; + bool request = true; unique_ptr > metaCache( new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request))); - doCollectMetaKey(&cxt, pQuery, metaCache.get()); + bool isInsertValues = qIsInsertValuesSql(cxt.pSql, cxt.sqlLen); + if (isInsertValues) { + doParseInsertSyntax(&cxt, query.get(), metaCache.get()); + } else { + doParse(&cxt, query.get()); + doCollectMetaKey(&cxt, *(query.get()), metaCache.get()); + } + + SQuery* pQuery = *(query.get()); unique_ptr catalogReq(new SCatalogReq(), MockCatalogService::destoryCatalogReq); - doBuildCatalogReq(metaCache.get(), catalogReq.get()); + doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get()); string err; thread t1([&]() { @@ -358,13 +382,17 @@ class ParserTestBaseImpl { metaCache.reset(new SParseMetaCache()); request = false; - doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get()); + doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues); - doAuthenticate(&cxt, pQuery, metaCache.get()); + if (isInsertValues) { + doParseInsertSql(&cxt, query.get(), metaCache.get()); + } else { + doAuthenticate(&cxt, pQuery, metaCache.get()); - doTranslate(&cxt, pQuery, metaCache.get()); + doTranslate(&cxt, pQuery, metaCache.get()); - doCalculateConstant(&cxt, pQuery); + doCalculateConstant(&cxt, pQuery); + } } catch (const TerminateFlag& e) { // success and terminate } catch (const runtime_error& e) {