提交 b7bc8281 编写于 作者: X Xiaoyu Wang

enh: insert optimize

上级 ac2ce214
......@@ -931,7 +931,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
pRequest->code = code1;
}
if (pRequest->code == TSDB_CODE_SUCCESS && incompletaFileParsing(pRequest->pQuery->pRoot)) {
if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
incompletaFileParsing(pRequest->pQuery->pRoot)) {
continueInsertFromCsv(pWrapper, pRequest);
return;
}
......@@ -1057,7 +1058,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
}
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL;
if (QUERY_NODE_VNODE_MODIF_STMT != nodeType(pRequest->pQuery->pRoot)) {
if (QUERY_NODE_VNODE_MODIF_STMT != nodeType(pQuery->pRoot)) {
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
}
......
......@@ -138,7 +138,7 @@ void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag
SArray *tagName, uint8_t tagNum);
int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param);
int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
int32_t insBuildOutput(SVnodeModifOpStmt *pStmt);
int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
void insDestroyDataBlock(STableDataBlocks *pDataBlock);
#endif // TDENGINE_PAR_INSERT_UTIL_H
......@@ -1448,8 +1448,19 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod
return code;
}
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
destroyBoundColumnInfo(&pCxt->tags);
taosMemoryFreeClear(pStmt->pTableMeta);
tdDestroySVCreateTbReq(&pStmt->createTblReq);
pCxt->missCache = false;
pCxt->usingDuplicateTable = false;
pStmt->usingTableProcessing = false;
pStmt->fileProcessing = false;
}
// input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ...
static int32_t parseInsertTableClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pTbName) {
resetEnvPreTable(pCxt, pStmt);
int32_t code = parseSchemaClauseTop(pCxt, pStmt, pTbName);
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = parseInsertTableClauseBottom(pCxt, pStmt);
......@@ -1506,11 +1517,8 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStmt->targetTableName, tbFName);
char stbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStmt->usingTableName, stbFName);
int32_t code =
(*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, tbFName, '\0' != pStmt->usingTableName.tname[0],
pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, stbFName);
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, tbFName, pStmt->usingTableProcessing,
pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname);
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pStmt->pVgroupsHashObj = NULL;
......@@ -1529,21 +1537,11 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifOpStm
code = insMergeTableDataBlocks(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
}
if (TSDB_CODE_SUCCESS == code) {
code = insBuildOutput(pStmt);
code = insBuildOutput(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
}
return code;
}
static void destroyEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
destroyBoundColumnInfo(&pCxt->tags);
taosMemoryFreeClear(pStmt->pTableMeta);
tdDestroySVCreateTbReq(&pStmt->createTblReq);
pCxt->missCache = false;
pCxt->usingDuplicateTable = false;
pStmt->usingTableProcessing = false;
pStmt->fileProcessing = false;
}
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
......@@ -1555,7 +1553,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt
bool hasData = true;
// for each table
while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache && !pStmt->fileProcessing) {
destroyEnvPreTable(pCxt, pStmt);
// pStmt->pSql -> tb_name ...
NEXT_TOKEN(pStmt->pSql, token);
code = checkTableClauseFirstToken(pCxt, pStmt, &token, &hasData);
......
......@@ -30,18 +30,17 @@ typedef struct SKvParam {
} SKvParam;
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
SVnodeModifOpStmt* modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
int32_t code = 0;
int32_t code = TSDB_CODE_SUCCESS;
SArray* pVgDataBlocks = NULL;
// merge according to vgId
if (taosHashGetSize(pBlockHash) > 0) {
CHECK_CODE(insMergeTableDataBlocks(pBlockHash, &modifyNode->pVgDataBlocks));
code = insMergeTableDataBlocks(pBlockHash, &pVgDataBlocks);
}
CHECK_CODE(insBuildOutput(modifyNode));
insDestroyBlockArrayList(modifyNode->pVgDataBlocks);
return TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) {
code = insBuildOutput(pVgHash, pVgDataBlocks, &((SVnodeModifOpStmt*)pQuery->pRoot)->pDataBlocks);
}
insDestroyBlockArrayList(pVgDataBlocks);
return code;
}
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
......
......@@ -832,6 +832,10 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c
}
}
if (NULL != strchr(pName->tname, '.')) {
code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
}
return code;
}
......@@ -930,24 +934,24 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
}
}
int32_t insBuildOutput(SVnodeModifOpStmt* pStmt) {
size_t numOfVg = taosArrayGetSize(pStmt->pVgDataBlocks);
pStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
if (NULL == pStmt->pDataBlocks) {
int32_t insBuildOutput(SHashObj* pVgroupsHashObj, SArray* pVgDataBlocks, SArray** pDataBlocks) {
size_t numOfVg = taosArrayGetSize(pVgDataBlocks);
*pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
if (NULL == *pDataBlocks) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (size_t i = 0; i < numOfVg; ++i) {
STableDataBlocks* src = taosArrayGetP(pStmt->pVgDataBlocks, i);
STableDataBlocks* src = taosArrayGetP(pVgDataBlocks, i);
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
taosHashGetDup(pStmt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
dst->numOfTables = src->numOfTables;
dst->size = src->size;
TSWAP(dst->pData, src->pData);
buildMsgHeader(src, dst);
taosArrayPush(pStmt->pDataBlocks, &dst);
taosArrayPush(*pDataBlocks, &dst);
}
return TSDB_CODE_SUCCESS;
}
[11/06 18:32:29.554116] INFO: start creating 100 table(s) with 8 thread(s)
[11/06 18:32:29.649584] SUCC: Spent 0.0950 seconds to create 100 table(s) with 8 thread(s), already exist 0 table(s), actual 100 table(s) pre created, 0 table(s) will be auto created
[11/06 18:32:29.687312] SUCC: thread[3] completed total inserted rows: 1300, 48030.74 records/second
[11/06 18:32:29.690040] SUCC: thread[0] completed total inserted rows: 1300, 43537.96 records/second
[11/06 18:32:29.691061] SUCC: thread[5] completed total inserted rows: 1200, 39417.93 records/second
[11/06 18:32:29.708308] SUCC: thread[4] completed total inserted rows: 1200, 26671.41 records/second
[11/06 18:32:29.709035] SUCC: thread[6] completed total inserted rows: 1200, 26515.82 records/second
[11/06 18:32:29.709355] SUCC: thread[2] completed total inserted rows: 1300, 27370.73 records/second
[11/06 18:32:29.711754] SUCC: thread[7] completed total inserted rows: 1200, 25266.35 records/second
[11/06 18:32:29.713149] SUCC: thread[1] completed total inserted rows: 1300, 24847.57 records/second
[11/06 18:32:29.714356] SUCC: Spent 0.052822 seconds to insert rows: 10000 with 8 thread(s) into test 189315.06 records/second
[11/06 18:32:29.714381] SUCC: insert delay, min: 0.78ms, avg: 3.25ms, p90: 7.37ms, p95: 10.48ms, p99: 11.98ms, max: 11.98ms
[11/06 18:32:58.075364] INFO: start creating 100 table(s) with 8 thread(s)
[11/06 18:32:58.140297] SUCC: Spent 0.0650 seconds to create 100 table(s) with 8 thread(s), already exist 0 table(s), actual 100 table(s) pre created, 0 table(s) will be auto created
[11/06 18:32:58.212640] SUCC: thread[2] completed total inserted rows: 13000, 214904.45 records/second
[11/06 18:32:58.235314] SUCC: thread[5] completed total inserted rows: 12000, 144468.66 records/second
[11/06 18:32:58.238604] SUCC: thread[4] completed total inserted rows: 12000, 142203.68 records/second
[11/06 18:32:58.239432] SUCC: thread[3] completed total inserted rows: 13000, 148244.44 records/second
[11/06 18:32:58.240457] SUCC: thread[1] completed total inserted rows: 13000, 146629.22 records/second
[11/06 18:32:58.242997] SUCC: thread[6] completed total inserted rows: 12000, 135484.53 records/second
[11/06 18:32:58.244407] SUCC: thread[0] completed total inserted rows: 13000, 139774.43 records/second
[11/06 18:32:58.244679] SUCC: thread[7] completed total inserted rows: 12000, 133324.45 records/second
[11/06 18:32:58.245536] SUCC: Spent 0.093401 seconds to insert rows: 100000 with 8 thread(s) into test 1070652.35 records/second
[11/06 18:32:58.245574] SUCC: insert delay, min: 1.87ms, avg: 6.76ms, p90: 11.67ms, p95: 16.21ms, p99: 20.30ms, max: 20.30ms
[11/06 18:41:03.922840] INFO: start creating 100 table(s) with 8 thread(s)
[11/06 18:41:03.987301] SUCC: Spent 0.0650 seconds to create 100 table(s) with 8 thread(s), already exist 0 table(s), actual 100 table(s) pre created, 0 table(s) will be auto created
[11/06 18:41:04.035594] SUCC: thread[0] completed total inserted rows: 1300, 34073.34 records/second
[11/06 18:41:04.036945] SUCC: thread[2] completed total inserted rows: 1300, 33197.99 records/second
[11/06 18:41:04.043375] SUCC: thread[5] completed total inserted rows: 1200, 27713.63 records/second
[11/06 18:41:04.054865] SUCC: thread[4] completed total inserted rows: 1200, 21176.72 records/second
[11/06 18:41:04.056026] SUCC: thread[7] completed total inserted rows: 1200, 23637.87 records/second
[11/06 18:41:04.056954] SUCC: thread[6] completed total inserted rows: 1200, 20847.45 records/second
[11/06 18:41:04.058160] SUCC: thread[3] completed total inserted rows: 1300, 21564.24 records/second
[11/06 18:41:04.060392] SUCC: thread[1] completed total inserted rows: 1300, 20688.12 records/second
[11/06 18:41:04.061369] SUCC: Spent 0.063170 seconds to insert rows: 10000 with 8 thread(s) into test 158302.99 records/second
[11/06 18:41:04.061385] SUCC: insert delay, min: 0.62ms, avg: 4.09ms, p90: 13.44ms, p95: 19.25ms, p99: 29.64ms, max: 29.64ms
[11/06 18:41:34.386476] INFO: start creating 100 table(s) with 8 thread(s)
[11/06 18:41:34.465414] SUCC: Spent 0.0790 seconds to create 100 table(s) with 8 thread(s), already exist 0 table(s), actual 100 table(s) pre created, 0 table(s) will be auto created
[11/06 18:41:34.546439] SUCC: thread[2] completed total inserted rows: 13000, 186091.79 records/second
[11/06 18:41:34.549390] SUCC: thread[4] completed total inserted rows: 12000, 164774.05 records/second
[11/06 18:41:34.556956] SUCC: thread[5] completed total inserted rows: 12000, 153440.91 records/second
[11/06 18:41:34.563860] SUCC: thread[6] completed total inserted rows: 12000, 138154.94 records/second
[11/06 18:41:34.564020] SUCC: thread[0] completed total inserted rows: 13000, 148469.62 records/second
[11/06 18:41:34.564693] SUCC: thread[3] completed total inserted rows: 13000, 149214.33 records/second
[11/06 18:41:34.571055] SUCC: thread[1] completed total inserted rows: 13000, 140452.47 records/second
[11/06 18:41:34.571921] SUCC: thread[7] completed total inserted rows: 12000, 129424.71 records/second
[11/06 18:41:34.572778] SUCC: Spent 0.095759 seconds to insert rows: 100000 with 8 thread(s) into test 1044288.27 records/second
[11/06 18:41:34.572797] SUCC: insert delay, min: 1.77ms, avg: 6.68ms, p90: 10.72ms, p95: 14.14ms, p99: 17.69ms, max: 17.69ms
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册