diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 367fde355cd95bd38c14040704d8094cd60b9fa0..a32ebc86941ae5c641ff277b36d558695d21d4de 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -931,7 +931,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { pRequest->code = code1; } - if (pRequest->code == TSDB_CODE_SUCCESS && incompletaFileParsing(pRequest->pQuery->pRoot)) { + if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery && + incompletaFileParsing(pRequest->pQuery->pRoot)) { continueInsertFromCsv(pWrapper, pRequest); return; } @@ -1057,7 +1058,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat } if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { SArray* pNodeList = NULL; - if (QUERY_NODE_VNODE_MODIF_STMT != nodeType(pRequest->pQuery->pRoot)) { + if (QUERY_NODE_VNODE_MODIF_STMT != nodeType(pQuery->pRoot)) { buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta); } diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index f747fdc2c965f18132a7dafc837ef51192f25dc0..09d55d369fc0d711b4a4301c74a0cb0d7002c347 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -138,7 +138,7 @@ void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag SArray *tagName, uint8_t tagNum); int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param); int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start); -int32_t insBuildOutput(SVnodeModifOpStmt *pStmt); +int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks); void insDestroyDataBlock(STableDataBlocks *pDataBlock); #endif // TDENGINE_PAR_INSERT_UTIL_H diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index ce75e4ff8a71ed494e4ea0e9ec55b52d0b80fb98..fe12332840baab723bdeaeeb8edc5c45fbe12db8 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1448,8 +1448,19 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod return code; } +static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { + destroyBoundColumnInfo(&pCxt->tags); + taosMemoryFreeClear(pStmt->pTableMeta); + tdDestroySVCreateTbReq(&pStmt->createTblReq); + pCxt->missCache = false; + pCxt->usingDuplicateTable = false; + pStmt->usingTableProcessing = false; + pStmt->fileProcessing = false; +} + // input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ... static int32_t parseInsertTableClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pTbName) { + resetEnvPreTable(pCxt, pStmt); int32_t code = parseSchemaClauseTop(pCxt, pStmt, pTbName); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = parseInsertTableClauseBottom(pCxt, pStmt); @@ -1506,11 +1517,8 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pStmt->targetTableName, tbFName); - char stbFName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(&pStmt->usingTableName, stbFName); - int32_t code = - (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, tbFName, '\0' != pStmt->usingTableName.tname[0], - pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, stbFName); + int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, tbFName, pStmt->usingTableProcessing, + pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pStmt->pVgroupsHashObj = NULL; @@ -1529,21 +1537,11 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifOpStm code = insMergeTableDataBlocks(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks); } if (TSDB_CODE_SUCCESS == code) { - code = insBuildOutput(pStmt); + code = insBuildOutput(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); } return code; } -static void destroyEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { - destroyBoundColumnInfo(&pCxt->tags); - taosMemoryFreeClear(pStmt->pTableMeta); - tdDestroySVCreateTbReq(&pStmt->createTblReq); - pCxt->missCache = false; - pCxt->usingDuplicateTable = false; - pStmt->usingTableProcessing = false; - pStmt->fileProcessing = false; -} - // tb_name // [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] // [(field1_name, ...)] @@ -1555,7 +1553,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt bool hasData = true; // for each table while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache && !pStmt->fileProcessing) { - destroyEnvPreTable(pCxt, pStmt); // pStmt->pSql -> tb_name ... NEXT_TOKEN(pStmt->pSql, token); code = checkTableClauseFirstToken(pCxt, pStmt, &token, &hasData); diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 73557c067e08a3f50df2b0ce05ebf223d7ccb1cf..9a5f349d8fe1f9da5bd2290832b3d5ab61cb7688 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -30,18 +30,17 @@ typedef struct SKvParam { } SKvParam; int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { - SVnodeModifOpStmt* modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot; - int32_t code = 0; - + int32_t code = TSDB_CODE_SUCCESS; + SArray* pVgDataBlocks = NULL; // merge according to vgId if (taosHashGetSize(pBlockHash) > 0) { - CHECK_CODE(insMergeTableDataBlocks(pBlockHash, &modifyNode->pVgDataBlocks)); + code = insMergeTableDataBlocks(pBlockHash, &pVgDataBlocks); } - - CHECK_CODE(insBuildOutput(modifyNode)); - - insDestroyBlockArrayList(modifyNode->pVgDataBlocks); - return TSDB_CODE_SUCCESS; + if (TSDB_CODE_SUCCESS == code) { + code = insBuildOutput(pVgHash, pVgDataBlocks, &((SVnodeModifOpStmt*)pQuery->pRoot)->pDataBlocks); + } + insDestroyBlockArrayList(pVgDataBlocks); + return code; } int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index a0a24d3d31bb57e48e358a5a73caa926adf23c40..bc09163753d91e883d4e0f176bb97733cfca2c33 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -832,6 +832,10 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c } } + if (NULL != strchr(pName->tname, '.')) { + code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'"); + } + return code; } @@ -930,24 +934,24 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) { } } -int32_t insBuildOutput(SVnodeModifOpStmt* pStmt) { - size_t numOfVg = taosArrayGetSize(pStmt->pVgDataBlocks); - pStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); - if (NULL == pStmt->pDataBlocks) { +int32_t insBuildOutput(SHashObj* pVgroupsHashObj, SArray* pVgDataBlocks, SArray** pDataBlocks) { + size_t numOfVg = taosArrayGetSize(pVgDataBlocks); + *pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + if (NULL == *pDataBlocks) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } for (size_t i = 0; i < numOfVg; ++i) { - STableDataBlocks* src = taosArrayGetP(pStmt->pVgDataBlocks, i); + STableDataBlocks* src = taosArrayGetP(pVgDataBlocks, i); SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); if (NULL == dst) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - taosHashGetDup(pStmt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); + taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); dst->numOfTables = src->numOfTables; dst->size = src->size; TSWAP(dst->pData, src->pData); buildMsgHeader(src, dst); - taosArrayPush(pStmt->pDataBlocks, &dst); + taosArrayPush(*pDataBlocks, &dst); } return TSDB_CODE_SUCCESS; } diff --git a/tests/system-test/output.txt b/tests/system-test/output.txt new file mode 100644 index 0000000000000000000000000000000000000000..df83ae9acbeefc9d3103eb0978533aa58fee8231 --- /dev/null +++ b/tests/system-test/output.txt @@ -0,0 +1,48 @@ +[11/06 18:32:29.554116] INFO: start creating 100 table(s) with 8 thread(s) +[11/06 18:32:29.649584] SUCC: Spent 0.0950 seconds to create 100 table(s) with 8 thread(s), already exist 0 table(s), actual 100 table(s) pre created, 0 table(s) will be auto created +[11/06 18:32:29.687312] SUCC: thread[3] completed total inserted rows: 1300, 48030.74 records/second +[11/06 18:32:29.690040] SUCC: thread[0] completed total inserted rows: 1300, 43537.96 records/second +[11/06 18:32:29.691061] SUCC: thread[5] completed total inserted rows: 1200, 39417.93 records/second +[11/06 18:32:29.708308] SUCC: thread[4] completed total inserted rows: 1200, 26671.41 records/second +[11/06 18:32:29.709035] SUCC: thread[6] completed total inserted rows: 1200, 26515.82 records/second +[11/06 18:32:29.709355] SUCC: thread[2] completed total inserted rows: 1300, 27370.73 records/second +[11/06 18:32:29.711754] SUCC: thread[7] completed total inserted rows: 1200, 25266.35 records/second +[11/06 18:32:29.713149] SUCC: thread[1] completed total inserted rows: 1300, 24847.57 records/second +[11/06 18:32:29.714356] SUCC: Spent 0.052822 seconds to insert rows: 10000 with 8 thread(s) into test 189315.06 records/second +[11/06 18:32:29.714381] SUCC: insert delay, min: 0.78ms, avg: 3.25ms, p90: 7.37ms, p95: 10.48ms, p99: 11.98ms, max: 11.98ms +[11/06 18:32:58.075364] INFO: start creating 100 table(s) with 8 thread(s) +[11/06 18:32:58.140297] SUCC: Spent 0.0650 seconds to create 100 table(s) with 8 thread(s), already exist 0 table(s), actual 100 table(s) pre created, 0 table(s) will be auto created +[11/06 18:32:58.212640] SUCC: thread[2] completed total inserted rows: 13000, 214904.45 records/second +[11/06 18:32:58.235314] SUCC: thread[5] completed total inserted rows: 12000, 144468.66 records/second +[11/06 18:32:58.238604] SUCC: thread[4] completed total inserted rows: 12000, 142203.68 records/second +[11/06 18:32:58.239432] SUCC: thread[3] completed total inserted rows: 13000, 148244.44 records/second +[11/06 18:32:58.240457] SUCC: thread[1] completed total inserted rows: 13000, 146629.22 records/second +[11/06 18:32:58.242997] SUCC: thread[6] completed total inserted rows: 12000, 135484.53 records/second +[11/06 18:32:58.244407] SUCC: thread[0] completed total inserted rows: 13000, 139774.43 records/second +[11/06 18:32:58.244679] SUCC: thread[7] completed total inserted rows: 12000, 133324.45 records/second +[11/06 18:32:58.245536] SUCC: Spent 0.093401 seconds to insert rows: 100000 with 8 thread(s) into test 1070652.35 records/second +[11/06 18:32:58.245574] SUCC: insert delay, min: 1.87ms, avg: 6.76ms, p90: 11.67ms, p95: 16.21ms, p99: 20.30ms, max: 20.30ms +[11/06 18:41:03.922840] INFO: start creating 100 table(s) with 8 thread(s) +[11/06 18:41:03.987301] SUCC: Spent 0.0650 seconds to create 100 table(s) with 8 thread(s), already exist 0 table(s), actual 100 table(s) pre created, 0 table(s) will be auto created +[11/06 18:41:04.035594] SUCC: thread[0] completed total inserted rows: 1300, 34073.34 records/second +[11/06 18:41:04.036945] SUCC: thread[2] completed total inserted rows: 1300, 33197.99 records/second +[11/06 18:41:04.043375] SUCC: thread[5] completed total inserted rows: 1200, 27713.63 records/second +[11/06 18:41:04.054865] SUCC: thread[4] completed total inserted rows: 1200, 21176.72 records/second +[11/06 18:41:04.056026] SUCC: thread[7] completed total inserted rows: 1200, 23637.87 records/second +[11/06 18:41:04.056954] SUCC: thread[6] completed total inserted rows: 1200, 20847.45 records/second +[11/06 18:41:04.058160] SUCC: thread[3] completed total inserted rows: 1300, 21564.24 records/second +[11/06 18:41:04.060392] SUCC: thread[1] completed total inserted rows: 1300, 20688.12 records/second +[11/06 18:41:04.061369] SUCC: Spent 0.063170 seconds to insert rows: 10000 with 8 thread(s) into test 158302.99 records/second +[11/06 18:41:04.061385] SUCC: insert delay, min: 0.62ms, avg: 4.09ms, p90: 13.44ms, p95: 19.25ms, p99: 29.64ms, max: 29.64ms +[11/06 18:41:34.386476] INFO: start creating 100 table(s) with 8 thread(s) +[11/06 18:41:34.465414] SUCC: Spent 0.0790 seconds to create 100 table(s) with 8 thread(s), already exist 0 table(s), actual 100 table(s) pre created, 0 table(s) will be auto created +[11/06 18:41:34.546439] SUCC: thread[2] completed total inserted rows: 13000, 186091.79 records/second +[11/06 18:41:34.549390] SUCC: thread[4] completed total inserted rows: 12000, 164774.05 records/second +[11/06 18:41:34.556956] SUCC: thread[5] completed total inserted rows: 12000, 153440.91 records/second +[11/06 18:41:34.563860] SUCC: thread[6] completed total inserted rows: 12000, 138154.94 records/second +[11/06 18:41:34.564020] SUCC: thread[0] completed total inserted rows: 13000, 148469.62 records/second +[11/06 18:41:34.564693] SUCC: thread[3] completed total inserted rows: 13000, 149214.33 records/second +[11/06 18:41:34.571055] SUCC: thread[1] completed total inserted rows: 13000, 140452.47 records/second +[11/06 18:41:34.571921] SUCC: thread[7] completed total inserted rows: 12000, 129424.71 records/second +[11/06 18:41:34.572778] SUCC: Spent 0.095759 seconds to insert rows: 100000 with 8 thread(s) into test 1044288.27 records/second +[11/06 18:41:34.572797] SUCC: insert delay, min: 1.77ms, avg: 6.68ms, p90: 10.72ms, p95: 14.14ms, p99: 17.69ms, max: 17.69ms