提交 8656633c 编写于 作者: D dapan1121

Merge branch 'enh/insertOptimize' of github.com:taosdata/TDengine into enh/insertOptimize

...@@ -563,7 +563,8 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt ...@@ -563,7 +563,8 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt
static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) { static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) {
insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid, insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid,
pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL); pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags,
TSDB_DEFAULT_TABLE_TTL);
} }
static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
...@@ -829,6 +830,33 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo ...@@ -829,6 +830,33 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
return code; return code;
} }
static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
SVgroupInfo vg;
bool exists = true;
int32_t code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists);
if (TSDB_CODE_SUCCESS == code) {
if (exists) {
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
*pMissCache = !exists;
}
return code;
}
static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
SParseContext* pComCxt = pCxt->pComCxt;
int32_t code = TSDB_CODE_SUCCESS;
if (pComCxt->async) {
code = getTableMetaAndVgroupImpl(pComCxt->pCatalog, pStmt, pMissCache);
} else {
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, pMissCache);
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
}
}
return code;
}
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
if (pCxt->forceUpdate) { if (pCxt->forceUpdate) {
pCxt->missCache = true; pCxt->missCache = true;
...@@ -836,11 +864,14 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt ...@@ -836,11 +864,14 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt
} }
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
// if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
// code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
// }
// if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
// code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
// }
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
}
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
} }
return code; return code;
} }
...@@ -934,10 +965,9 @@ static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt* ...@@ -934,10 +965,9 @@ static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt*
pStmt->pTableMeta->uid = 0; pStmt->pTableMeta->uid = 0;
} }
return insGetDataBlockFromList(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), return insGetDataBlockFromList(
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, &pStmt->createTblReq);
&pStmt->createTblReq);
} }
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStmt->targetTableName, tbFName); tNameExtractFullName(&pStmt->targetTableName, tbFName);
...@@ -1540,8 +1570,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) ...@@ -1540,8 +1570,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb; SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing, int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName,
pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname); pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj,
pStmt->usingTableName.tname);
memset(&pCxt->tags, 0, sizeof(pCxt->tags)); memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pStmt->pVgroupsHashObj = NULL; pStmt->pVgroupsHashObj = NULL;
...@@ -1915,13 +1946,11 @@ static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCata ...@@ -1915,13 +1946,11 @@ static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCata
} }
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) { int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) {
SInsertParseContext context = { SInsertParseContext context = {.pComCxt = pCxt,
.pComCxt = pCxt,
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
.missCache = false, .missCache = false,
.usingDuplicateTable = false, .usingDuplicateTable = false,
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false) .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)};
};
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册