提交 8513dc10 编写于 作者: X Xiaoyu Wang

enh: insert optimize

上级 bb490f07
...@@ -186,6 +186,8 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, ...@@ -186,6 +186,8 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
*/ */
int32_t catalogGetTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, int32_t catalogGetTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName,
STableMeta** pTableMeta); STableMeta** pTableMeta);
int32_t catalogGetCachedTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
STableMeta** pTableMeta);
/** /**
* Get a super table's meta data. * Get a super table's meta data.
...@@ -198,6 +200,8 @@ int32_t catalogGetTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const S ...@@ -198,6 +200,8 @@ int32_t catalogGetTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const S
*/ */
int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName,
STableMeta** pTableMeta); STableMeta** pTableMeta);
int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
STableMeta** pTableMeta);
int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg); int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
...@@ -261,7 +265,8 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, SRequestConnInfo* pConn, c ...@@ -261,7 +265,8 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, SRequestConnInfo* pConn, c
* @return error code * @return error code
*/ */
int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pName, SVgroupInfo* vgInfo); int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pName, SVgroupInfo* vgInfo);
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
SVgroupInfo* pVgroup, bool* exists);
/** /**
* Get all meta data required in pReq. * Get all meta data required in pReq.
* @param pCatalog (input, got with catalogGetHandle) * @param pCatalog (input, got with catalogGetHandle)
......
...@@ -354,12 +354,33 @@ typedef struct SVgDataBlocks { ...@@ -354,12 +354,33 @@ typedef struct SVgDataBlocks {
void* pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ... void* pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks; } SVgDataBlocks;
typedef void (*FFreeDataBlockHash)(SHashObj*);
typedef void (*FFreeDataBlockArray)(SArray*);
typedef struct SVnodeModifOpStmt { typedef struct SVnodeModifOpStmt {
ENodeType nodeType; ENodeType nodeType;
ENodeType sqlNodeType; ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>. SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint32_t insertType; // insert data from [file|sql statement| bound statement] uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position const char* pSql; // current sql statement position
int32_t totalRowsNum;
int32_t totalTbNum;
SName targetTableName;
SName usingTableName;
const char* pBoundCols;
struct STableMeta* pTableMeta;
SHashObj* pVgroupsHashObj;
SHashObj* pTableBlockHashObj;
SHashObj* pSubTableHashObj;
SHashObj* pTableNameHashObj;
SHashObj* pDbFNameHashObj;
SArray* pVgDataBlocks;
SVCreateTbReq createTblReq;
TdFilePtr fp;
FFreeDataBlockHash freeHashFunc;
FFreeDataBlockArray freeArrayFunc;
bool usingTableProcessing;
bool fileProcessing;
} SVnodeModifOpStmt; } SVnodeModifOpStmt;
typedef struct SExplainOptions { typedef struct SExplainOptions {
...@@ -389,24 +410,32 @@ typedef enum EQueryExecMode { ...@@ -389,24 +410,32 @@ typedef enum EQueryExecMode {
QUERY_EXEC_MODE_EMPTY_RESULT QUERY_EXEC_MODE_EMPTY_RESULT
} EQueryExecMode; } EQueryExecMode;
typedef enum EQueryExecStage {
QUERY_EXEC_STAGE_PARSE = 1,
QUERY_EXEC_STAGE_ANALYSE,
QUERY_EXEC_STAGE_SCHEDULE,
QUERY_EXEC_STAGE_END
} EQueryExecStage;
typedef struct SQuery { typedef struct SQuery {
ENodeType type; ENodeType type;
EQueryExecMode execMode; EQueryExecStage execStage;
bool haveResultSet; EQueryExecMode execMode;
SNode* pRoot; bool haveResultSet;
int32_t numOfResCols; SNode* pRoot;
SSchema* pResSchema; int32_t numOfResCols;
int8_t precision; SSchema* pResSchema;
SCmdMsgInfo* pCmdMsg; int8_t precision;
int32_t msgType; SCmdMsgInfo* pCmdMsg;
SArray* pTargetTableList; int32_t msgType;
SArray* pTableList; SArray* pTargetTableList;
SArray* pDbList; SArray* pTableList;
bool showRewrite; SArray* pDbList;
int32_t placeholderNum; bool showRewrite;
SArray* pPlaceholderValues; int32_t placeholderNum;
SNode* pPrepareRoot; SArray* pPlaceholderValues;
bool stableQuery; SNode* pPrepareRoot;
bool stableQuery;
} SQuery; } SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
......
...@@ -64,8 +64,6 @@ typedef struct SParseContext { ...@@ -64,8 +64,6 @@ typedef struct SParseContext {
SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos
int64_t allocatorId; int64_t allocatorId;
bool needMultiParse;
SParseCsvCxt csvCxt;
} SParseContext; } SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
...@@ -75,6 +73,8 @@ bool qIsInsertValuesSql(const char* pStr, size_t length); ...@@ -75,6 +73,8 @@ bool qIsInsertValuesSql(const char* pStr, size_t length);
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq); int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery); const struct SMetaData* pMetaData, SQuery* pQuery);
int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData,
SQuery* pQuery);
void qDestroyParseContext(SParseContext* pCxt); void qDestroyParseContext(SParseContext* pCxt);
......
...@@ -379,7 +379,6 @@ void hbMgrInitMqHbRspHandle(); ...@@ -379,7 +379,6 @@ void hbMgrInitMqHbRspHandle();
typedef struct SSqlCallbackWrapper { typedef struct SSqlCallbackWrapper {
SParseContext* pParseCtx; SParseContext* pParseCtx;
SCatalogReq* pCatalogReq; SCatalogReq* pCatalogReq;
SMetaData* pResultMeta;
SRequestObj* pRequest; SRequestObj* pRequest;
} SSqlCallbackWrapper; } SSqlCallbackWrapper;
...@@ -393,7 +392,7 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); ...@@ -393,7 +392,7 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList);
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog); int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
bool qnodeRequired(SRequestObj* pRequest); bool qnodeRequired(SRequestObj* pRequest);
int32_t continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest); void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest);
void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper); void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -867,6 +867,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { ...@@ -867,6 +867,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
return code; return code;
} }
static bool incompletaFileParsing(SNode* pStmt) {
return QUERY_NODE_VNODE_MODIF_STMT != nodeType(pStmt) ? false : ((SVnodeModifOpStmt*)pStmt)->fileProcessing;
}
// todo refacto the error code mgmt // todo refacto the error code mgmt
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
SSqlCallbackWrapper* pWrapper = param; SSqlCallbackWrapper* pWrapper = param;
...@@ -921,11 +925,9 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { ...@@ -921,11 +925,9 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
pRequest->code = code1; pRequest->code = code1;
} }
if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pWrapper->pParseCtx && pWrapper->pParseCtx->needMultiParse) { if (pRequest->code == TSDB_CODE_SUCCESS && incompletaFileParsing(pRequest->pQuery->pRoot)) {
code = continueInsertFromCsv(pWrapper, pRequest); continueInsertFromCsv(pWrapper, pRequest);
if (TSDB_CODE_SUCCESS == code) { return;
return;
}
} }
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
...@@ -1049,7 +1051,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat ...@@ -1049,7 +1051,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
} }
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL; SArray* pNodeList = NULL;
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta); if (QUERY_NODE_VNODE_MODIF_STMT != nodeType(pRequest->pQuery->pRoot)) {
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
}
SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter, SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
......
...@@ -682,11 +682,10 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) { ...@@ -682,11 +682,10 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) {
} }
destoryCatalogReq(pWrapper->pCatalogReq); destoryCatalogReq(pWrapper->pCatalogReq);
qDestroyParseContext(pWrapper->pParseCtx); qDestroyParseContext(pWrapper->pParseCtx);
catalogFreeMetaData(pWrapper->pResultMeta);
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
} }
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) {
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param;
SRequestObj *pRequest = pWrapper->pRequest; SRequestObj *pRequest = pWrapper->pRequest;
SQuery *pQuery = pRequest->pQuery; SQuery *pQuery = pRequest->pQuery;
...@@ -704,13 +703,6 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { ...@@ -704,13 +703,6 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
pRequest->metric.semanticEnd = taosGetTimestampUs(); pRequest->metric.semanticEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS && pWrapper->pParseCtx->needMultiParse) {
pWrapper->pResultMeta = catalogCloneMetaData(pResultMeta);
if (NULL == pWrapper->pResultMeta) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (pQuery->haveResultSet) { if (pQuery->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols);
...@@ -747,14 +739,83 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { ...@@ -747,14 +739,83 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
} }
} }
int32_t continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest) { static int32_t getAllMetaAsync(SSqlCallbackWrapper *pWrapper, catalogCallback fp) {
qDestroyQuery(pRequest->pQuery); SRequestConnInfo conn = {.pTrans = pWrapper->pParseCtx->pTransporter,
pRequest->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); .requestId = pWrapper->pParseCtx->requestId,
if (NULL == pRequest->pQuery) { .requestObjRefId = pWrapper->pParseCtx->requestRid,
return TSDB_CODE_OUT_OF_MEMORY; .mgmtEps = pWrapper->pParseCtx->mgmtEpSet};
pWrapper->pRequest->metric.ctgStart = taosGetTimestampUs();
return catalogAsyncGetAllMeta(pWrapper->pParseCtx->pCatalog, &conn, pWrapper->pCatalogReq, fp, pWrapper,
&pWrapper->pRequest->body.queryJob);
}
static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t code);
static int32_t phaseAsyncQuery(SSqlCallbackWrapper *pWrapper) {
int32_t code = TSDB_CODE_SUCCESS;
switch (pWrapper->pRequest->pQuery->execStage) {
case QUERY_EXEC_STAGE_PARSE: {
// continue parse after get metadata
code = getAllMetaAsync(pWrapper, doAsyncQueryFromParse);
break;
}
case QUERY_EXEC_STAGE_ANALYSE: {
// analysis after get metadata
code = getAllMetaAsync(pWrapper, doAsyncQueryFromAnalyse);
break;
}
case QUERY_EXEC_STAGE_SCHEDULE: {
launchAsyncQuery(pWrapper->pRequest, pWrapper->pRequest->pQuery, NULL, pWrapper);
break;
}
default:
break;
}
return code;
}
static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t code) {
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param;
SRequestObj *pRequest = pWrapper->pRequest;
SQuery *pQuery = pRequest->pQuery;
pRequest->metric.ctgEnd = taosGetTimestampUs();
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
if (code == TSDB_CODE_SUCCESS) {
code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = phaseAsyncQuery(pWrapper);
}
if (TSDB_CODE_SUCCESS != code) {
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code,
tstrerror(code), pWrapper->pRequest->requestId);
destorySqlCallbackWrapper(pWrapper);
terrno = code;
pWrapper->pRequest->code = code;
pWrapper->pRequest->body.queryFp(pWrapper->pRequest->body.param, pWrapper->pRequest, code);
}
}
void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest) {
int32_t code = qParseSqlSyntax(pWrapper->pParseCtx, &pRequest->pQuery, pWrapper->pCatalogReq);
if (TSDB_CODE_SUCCESS == code) {
code = phaseAsyncQuery(pWrapper);
}
if (TSDB_CODE_SUCCESS != code) {
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code,
tstrerror(code), pWrapper->pRequest->requestId);
destorySqlCallbackWrapper(pWrapper);
terrno = code;
pWrapper->pRequest->code = code;
pWrapper->pRequest->body.queryFp(pWrapper->pRequest->body.param, pWrapper->pRequest, code);
} }
retrieveMetaCallback(pWrapper->pResultMeta, pWrapper, TSDB_CODE_SUCCESS);
return TSDB_CODE_SUCCESS;
} }
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
...@@ -836,26 +897,16 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -836,26 +897,16 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
if (TSDB_CODE_SUCCESS == code && !updateMetaForce) { if (TSDB_CODE_SUCCESS == code && !updateMetaForce) {
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
if (NULL == pRequest->pQuery->pRoot) { if (QUERY_NODE_INSERT_STMT == nodeType(pRequest->pQuery->pRoot)) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == pRequest->pQuery->pRoot->type) { } else if (QUERY_NODE_SELECT_STMT == nodeType(pRequest->pQuery->pRoot)) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SRequestConnInfo conn = {.pTrans = pWrapper->pParseCtx->pTransporter, phaseAsyncQuery(pWrapper);
.requestId = pWrapper->pParseCtx->requestId, } else {
.requestObjRefId = pWrapper->pParseCtx->requestRid,
.mgmtEps = pWrapper->pParseCtx->mgmtEpSet};
pRequest->metric.ctgStart = taosGetTimestampUs();
code = catalogAsyncGetAllMeta(pWrapper->pParseCtx->pCatalog, &conn, pWrapper->pCatalogReq, retrieveMetaCallback,
pWrapper, &pRequest->body.queryJob);
}
if (TSDB_CODE_SUCCESS != code) {
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId); pRequest->requestId);
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
......
...@@ -791,9 +791,24 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -791,9 +791,24 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode((SNode*)pStmt->pSlimit); nodesDestroyNode((SNode*)pStmt->pSlimit);
break; break;
} }
case QUERY_NODE_VNODE_MODIF_STMT: case QUERY_NODE_VNODE_MODIF_STMT: {
destroyVgDataBlockArray(((SVnodeModifOpStmt*)pNode)->pDataBlocks); SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pNode;
destroyVgDataBlockArray(pStmt->pDataBlocks);
taosMemoryFreeClear(pStmt->pTableMeta);
taosHashCleanup(pStmt->pVgroupsHashObj);
taosHashCleanup(pStmt->pSubTableHashObj);
taosHashCleanup(pStmt->pTableNameHashObj);
taosHashCleanup(pStmt->pDbFNameHashObj);
if (pStmt->freeHashFunc) {
pStmt->freeHashFunc(pStmt->pTableBlockHashObj);
}
if (pStmt->freeArrayFunc) {
pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
}
tdDestroySVCreateTbReq(&pStmt->createTblReq);
taosCloseFile(&pStmt->fp);
break; break;
}
case QUERY_NODE_CREATE_DATABASE_STMT: case QUERY_NODE_CREATE_DATABASE_STMT:
nodesDestroyNode((SNode*)((SCreateDatabaseStmt*)pNode)->pOptions); nodesDestroyNode((SNode*)((SCreateDatabaseStmt*)pNode)->pOptions);
break; break;
......
...@@ -79,29 +79,6 @@ typedef struct SInsertParseBaseContext { ...@@ -79,29 +79,6 @@ typedef struct SInsertParseBaseContext {
SMsgBuf msg; SMsgBuf msg;
} SInsertParseBaseContext; } SInsertParseBaseContext;
typedef struct SInsertParseContext {
SParseContext *pComCxt; // input
char *pSql; // input
SMsgBuf msg; // input
STableMeta *pTableMeta; // each table
SParsedDataColInfo tags; // each table
SVCreateTbReq createTblReq; // each table
SHashObj *pVgroupsHashObj; // global
SHashObj *pTableBlockHashObj; // global
SHashObj *pSubTableHashObj; // global
SArray *pVgDataBlocks; // global
SHashObj *pTableNameHashObj; // global
SHashObj *pDbFNameHashObj; // global
int32_t totalNum;
SVnodeModifOpStmt *pOutput;
SStmtCallback *pStmtCb;
SParseMetaCache *pMetaCache;
char sTableName[TSDB_TABLE_NAME_LEN];
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
int64_t memElapsed;
int64_t parRowElapsed;
} SInsertParseContext;
typedef struct SInsertParseSyntaxCxt { typedef struct SInsertParseSyntaxCxt {
SParseContext *pComCxt; SParseContext *pComCxt;
char *pSql; char *pSql;
...@@ -142,7 +119,7 @@ typedef struct STableDataBlocks { ...@@ -142,7 +119,7 @@ typedef struct STableDataBlocks {
int32_t insGetExtendedRowSize(STableDataBlocks *pBlock); int32_t insGetExtendedRowSize(STableDataBlocks *pBlock);
void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset, col_id_t *colIdx); void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset, col_id_t *colIdx);
int32_t insSetBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows); int32_t insSetBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows, SMsgBuf *pMsg);
int32_t insSchemaIdxCompar(const void *lhs, const void *rhs); int32_t insSchemaIdxCompar(const void *lhs, const void *rhs);
int32_t insBoundIdxCompar(const void *lhs, const void *rhs); int32_t insBoundIdxCompar(const void *lhs, const void *rhs);
void insSetBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols); void insSetBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
...@@ -161,7 +138,7 @@ void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag ...@@ -161,7 +138,7 @@ void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag
SArray *tagName, uint8_t tagNum); SArray *tagName, uint8_t tagNum);
int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param); int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param);
int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start); int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
int32_t insBuildOutput(SInsertParseContext *pCxt); int32_t insBuildOutput(SVnodeModifOpStmt *pStmt);
void insDestroyDataBlock(STableDataBlocks *pDataBlock); void insDestroyDataBlock(STableDataBlocks *pDataBlock);
#endif // TDENGINE_PAR_INSERT_UTIL_H #endif // TDENGINE_PAR_INSERT_UTIL_H
...@@ -27,8 +27,7 @@ extern "C" { ...@@ -27,8 +27,7 @@ extern "C" {
#define QUERY_SMA_OPTIMIZE_DISABLE 0 #define QUERY_SMA_OPTIMIZE_DISABLE 0
#define QUERY_SMA_OPTIMIZE_ENABLE 1 #define QUERY_SMA_OPTIMIZE_ENABLE 1
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache); int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData);
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache); int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache); int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
......
...@@ -60,22 +60,17 @@ typedef struct SInsertTablesMetaReq { ...@@ -60,22 +60,17 @@ typedef struct SInsertTablesMetaReq {
} SInsertTablesMetaReq; } SInsertTablesMetaReq;
typedef struct SParseMetaCache { typedef struct SParseMetaCache {
SHashObj* pTableMeta; // key is tbFName, element is STableMeta* SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>* SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo* SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo* SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo* SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
SHashObj* pUdf; // key is funcName, element is SFuncInfo* SHashObj* pUdf; // key is funcName, element is SFuncInfo*
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>* SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
SHashObj* pTableCfg; // key is tbFName, element is STableCfg* SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
SArray* pDnodes; // element is SEpSet SArray* pDnodes; // element is SEpSet
bool dnodeRequired; bool dnodeRequired;
SHashObj* pInsertTables; // key is dbName, element is SInsertTablesMetaReq*, for insert
const char* pUser;
const SArray* pTableMetaData; // pRes = STableMeta*
const SArray* pTableVgroupData; // pRes = SVgroupInfo*
int32_t sqlTableNum;
} SParseMetaCache; } SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...); int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
...@@ -93,9 +88,8 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta); ...@@ -93,9 +88,8 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t getInsTagsTableTargetName(int32_t acctId, SNode* pWhere, SName* pName); int32_t getInsTagsTableTargetName(int32_t acctId, SNode* pWhere, SName* pName);
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq); int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache, int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
bool insertValuesStmt);
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache); int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
...@@ -122,12 +116,6 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun ...@@ -122,12 +116,6 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes); int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput); int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes); int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
STableMeta** pMeta);
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
SVgroupInfo* pVgroup);
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request); void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -84,6 +84,7 @@ abort_parse: ...@@ -84,6 +84,7 @@ abort_parse:
(*pQuery)->pRoot = cxt.pRootNode; (*pQuery)->pRoot = cxt.pRootNode;
(*pQuery)->placeholderNum = cxt.placeholderNo; (*pQuery)->placeholderNum = cxt.placeholderNo;
TSWAP((*pQuery)->pPlaceholderValues, cxt.pPlaceholderValues); TSWAP((*pQuery)->pPlaceholderValues, cxt.pPlaceholderValues);
(*pQuery)->execStage = QUERY_EXEC_STAGE_ANALYSE;
} }
taosArrayDestroy(cxt.pPlaceholderValues); taosArrayDestroy(cxt.pPlaceholderValues);
return cxt.errCode; return cxt.errCode;
......
...@@ -333,11 +333,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols ...@@ -333,11 +333,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
} }
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, rowNum)) { return insSetBlockInfo(pBlocks, pDataBlock, rowNum, &pBuf);
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;
} }
void* smlInitHandle(SQuery* pQuery) { void* smlInitHandle(SQuery* pQuery) {
......
...@@ -30,22 +30,17 @@ typedef struct SKvParam { ...@@ -30,22 +30,17 @@ typedef struct SKvParam {
} SKvParam; } SKvParam;
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
SVnodeModifOpStmt* modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot; SVnodeModifOpStmt* modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
int32_t code = 0; int32_t code = 0;
SInsertParseContext insertCtx = {
.pVgroupsHashObj = pVgHash,
.pTableBlockHashObj = pBlockHash,
.pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
};
// merge according to vgId // merge according to vgId
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { if (taosHashGetSize(pBlockHash) > 0) {
CHECK_CODE(insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, &insertCtx.pVgDataBlocks)); CHECK_CODE(insMergeTableDataBlocks(pBlockHash, &modifyNode->pVgDataBlocks));
} }
CHECK_CODE(insBuildOutput(&insertCtx)); CHECK_CODE(insBuildOutput(modifyNode));
insDestroyBlockArrayList(insertCtx.pVgDataBlocks); insDestroyBlockArrayList(modifyNode->pVgDataBlocks);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -222,11 +217,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in ...@@ -222,11 +217,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
} }
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, bind->num)) { return insSetBlockInfo(pBlocks, pDataBlock, bind->num, &pBuf);
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;
} }
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
...@@ -308,10 +299,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu ...@@ -308,10 +299,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
pDataBlock->size += extendedRowSize * bind->num; pDataBlock->size += extendedRowSize * bind->num;
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, bind->num)) { CHECK_CODE(insSetBlockInfo(pBlocks, pDataBlock, bind->num, &pBuf));
return buildInvalidOperationMsg(&pBuf,
"too many rows in sql, total number of rows should be less than INT32_MAX");
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -110,18 +110,17 @@ void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo* spd, col_id_t i ...@@ -110,18 +110,17 @@ void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo* spd, col_id_t i
} }
} }
int32_t insSetBlockInfo(SSubmitBlk* pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows) { int32_t insSetBlockInfo(SSubmitBlk* pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows, SMsgBuf* pMsg) {
pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid); pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid);
pBlocks->uid = dataBuf->pTableMeta->uid; pBlocks->uid = dataBuf->pTableMeta->uid;
pBlocks->sversion = dataBuf->pTableMeta->sversion; pBlocks->sversion = dataBuf->pTableMeta->sversion;
pBlocks->schemaLen = dataBuf->createTbReqLen; pBlocks->schemaLen = dataBuf->createTbReqLen;
if (pBlocks->numOfRows + numOfRows >= INT32_MAX) { if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION; return buildInvalidOperationMsg(pMsg, "too many rows in sql, total number of rows should be less than INT32_MAX");
} else {
pBlocks->numOfRows += numOfRows;
return TSDB_CODE_SUCCESS;
} }
pBlocks->numOfRows += numOfRows;
return TSDB_CODE_SUCCESS;
} }
void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) { void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
...@@ -271,12 +270,8 @@ void insDestroyDataBlock(STableDataBlocks* pDataBlock) { ...@@ -271,12 +270,8 @@ void insDestroyDataBlock(STableDataBlocks* pDataBlock) {
} }
taosMemoryFreeClear(pDataBlock->pData); taosMemoryFreeClear(pDataBlock->pData);
// if (!pDataBlock->cloned) {
// free the refcount for metermeta
taosMemoryFreeClear(pDataBlock->pTableMeta); taosMemoryFreeClear(pDataBlock->pTableMeta);
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo); destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
// }
taosMemoryFreeClear(pDataBlock); taosMemoryFreeClear(pDataBlock);
} }
...@@ -312,20 +307,6 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in ...@@ -312,20 +307,6 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
int32_t columns = getNumOfColumns(pTableMeta);
SSchema* pSchema = getTableColumnSchema(pTableMeta);
for (int32_t i = 0; i < columns; ++i) {
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
}
}
result += (int32_t)TD_BITMAP_BYTES(columns - 1);
return result;
}
#endif
void insDestroyBlockArrayList(SArray* pDataBlockList) { void insDestroyBlockArrayList(SArray* pDataBlockList) {
if (pDataBlockList == NULL) { if (pDataBlockList == NULL) {
...@@ -357,51 +338,6 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) { ...@@ -357,51 +338,6 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
taosHashCleanup(pDataBlockHash); taosHashCleanup(pDataBlockHash);
} }
#if 0
// data block is disordered, sort it in ascending order
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
// size is less than the total size, since duplicated rows may be removed yet.
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
if (!dataBuf->ordered) {
char* pBlockData = pBlocks->data;
// todo. qsort is unstable, if timestamp is same, should get the last one
taosSort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
int32_t i = 0;
int32_t j = 1;
// delete rows with timestamp conflicts
while (j < pBlocks->numOfRows) {
TSKEY ti = *(TSKEY*)(pBlockData + dataBuf->rowSize * i);
TSKEY tj = *(TSKEY*)(pBlockData + dataBuf->rowSize * j);
if (ti == tj) {
++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
}
++j;
}
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
}
dataBuf->prevTS = INT64_MIN;
}
#endif
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) { static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData; SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
...@@ -994,24 +930,24 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) { ...@@ -994,24 +930,24 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
} }
} }
int32_t insBuildOutput(SInsertParseContext* pCxt) { int32_t insBuildOutput(SVnodeModifOpStmt* pStmt) {
size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks); size_t numOfVg = taosArrayGetSize(pStmt->pVgDataBlocks);
pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); pStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
if (NULL == pCxt->pOutput->pDataBlocks) { if (NULL == pStmt->pDataBlocks) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
for (size_t i = 0; i < numOfVg; ++i) { for (size_t i = 0; i < numOfVg; ++i) {
STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i); STableDataBlocks* src = taosArrayGetP(pStmt->pVgDataBlocks, i);
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) { if (NULL == dst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); taosHashGetDup(pStmt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
dst->numOfTables = src->numOfTables; dst->numOfTables = src->numOfTables;
dst->size = src->size; dst->size = src->size;
TSWAP(dst->pData, src->pData); TSWAP(dst->pData, src->pData);
buildMsgHeader(src, dst); buildMsgHeader(src, dst);
taosArrayPush(pCxt->pOutput->pDataBlocks, &dst); taosArrayPush(pStmt->pDataBlocks, &dst);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -612,62 +612,7 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) { ...@@ -612,62 +612,7 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t buildCatalogReqForInsert(SParseContext* pCxt, const SParseMetaCache* pMetaCache, int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
SCatalogReq* pCatalogReq) {
int32_t ndbs = taosHashGetSize(pMetaCache->pInsertTables);
pCatalogReq->pTableMeta = taosArrayInit(ndbs, sizeof(STablesReq));
if (NULL == pCatalogReq->pTableMeta) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCatalogReq->pTableHash = taosArrayInit(ndbs, sizeof(STablesReq));
if (NULL == pCatalogReq->pTableHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCatalogReq->pUser = taosArrayInit(ndbs, sizeof(SUserAuthInfo));
if (NULL == pCatalogReq->pUser) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pTableMetaPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
pCxt->pTableVgroupPos = taosArrayInit(pMetaCache->sqlTableNum, sizeof(int32_t));
int32_t metaReqNo = 0;
int32_t vgroupReqNo = 0;
SInsertTablesMetaReq* p = taosHashIterate(pMetaCache->pInsertTables, NULL);
while (NULL != p) {
STablesReq req = {0};
strcpy(req.dbFName, p->dbFName);
TSWAP(req.pTables, p->pTableMetaReq);
taosArrayPush(pCatalogReq->pTableMeta, &req);
req.pTables = NULL;
TSWAP(req.pTables, p->pTableVgroupReq);
taosArrayPush(pCatalogReq->pTableHash, &req);
int32_t ntables = taosArrayGetSize(p->pTableMetaPos);
for (int32_t i = 0; i < ntables; ++i) {
taosArrayInsert(pCxt->pTableMetaPos, *(int32_t*)taosArrayGet(p->pTableMetaPos, i), &metaReqNo);
++metaReqNo;
}
ntables = taosArrayGetSize(p->pTableVgroupPos);
for (int32_t i = 0; i < ntables; ++i) {
taosArrayInsert(pCxt->pTableVgroupPos, *(int32_t*)taosArrayGet(p->pTableVgroupPos, i), &vgroupReqNo);
++vgroupReqNo;
}
SUserAuthInfo auth = {0};
snprintf(auth.user, sizeof(auth.user), "%s", pCxt->pUser);
snprintf(auth.dbFName, sizeof(auth.dbFName), "%s", p->dbFName);
auth.type = AUTH_TYPE_WRITE;
taosArrayPush(pCatalogReq->pUser, &auth);
p = taosHashIterate(pMetaCache->pInsertTables, p);
}
return TSDB_CODE_SUCCESS;
}
int32_t buildCatalogReqForQuery(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta); int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup); code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
...@@ -697,13 +642,6 @@ int32_t buildCatalogReqForQuery(const SParseMetaCache* pMetaCache, SCatalogReq* ...@@ -697,13 +642,6 @@ int32_t buildCatalogReqForQuery(const SParseMetaCache* pMetaCache, SCatalogReq*
return code; return code;
} }
int32_t buildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
if (NULL != pMetaCache->pInsertTables) {
return buildCatalogReqForInsert(pCxt, pMetaCache, pCatalogReq);
}
return buildCatalogReqForQuery(pMetaCache, pCatalogReq);
}
static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) { static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) {
if (NULL == *pHash) { if (NULL == *pHash) {
*pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); *pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
...@@ -791,8 +729,7 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas ...@@ -791,8 +729,7 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t putMetaDataToCacheForQuery(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
SParseMetaCache* pMetaCache) {
int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta); int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup); code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup);
...@@ -822,30 +759,6 @@ int32_t putMetaDataToCacheForQuery(const SCatalogReq* pCatalogReq, const SMetaDa ...@@ -822,30 +759,6 @@ int32_t putMetaDataToCacheForQuery(const SCatalogReq* pCatalogReq, const SMetaDa
return code; return code;
} }
int32_t putMetaDataToCacheForInsert(const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t ndbs = taosArrayGetSize(pMetaData->pUser);
for (int32_t i = 0; i < ndbs; ++i) {
SMetaRes* pRes = taosArrayGet(pMetaData->pUser, i);
if (TSDB_CODE_SUCCESS != pRes->code) {
return pRes->code;
}
if (!(*(bool*)pRes->pRes)) {
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
}
pMetaCache->pTableMetaData = pMetaData->pTableMeta;
pMetaCache->pTableVgroupData = pMetaData->pTableHash;
return TSDB_CODE_SUCCESS;
}
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache,
bool insertValuesStmt) {
if (insertValuesStmt) {
return putMetaDataToCacheForInsert(pMetaData, pMetaCache);
}
return putMetaDataToCacheForQuery(pCatalogReq, pMetaData, pMetaCache);
}
static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) { static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHashObj** pTables) {
if (NULL == *pTables) { if (NULL == *pTables) {
*pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); *pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
...@@ -1146,82 +1059,6 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) { ...@@ -1146,82 +1059,6 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t reserveTableReqInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SInsertTablesMetaReq* pReq) {
switch (reqType) {
case CATALOG_REQ_TYPE_META:
taosArrayPush(pReq->pTableMetaReq, pName);
taosArrayPush(pReq->pTableMetaPos, &tableNo);
break;
case CATALOG_REQ_TYPE_VGROUP:
taosArrayPush(pReq->pTableVgroupReq, pName);
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
break;
case CATALOG_REQ_TYPE_BOTH:
taosArrayPush(pReq->pTableMetaReq, pName);
taosArrayPush(pReq->pTableMetaPos, &tableNo);
taosArrayPush(pReq->pTableVgroupReq, pName);
taosArrayPush(pReq->pTableVgroupPos, &tableNo);
break;
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static int32_t reserveTableReqInDbCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SHashObj* pDbs) {
SInsertTablesMetaReq req = {.pTableMetaReq = taosArrayInit(4, sizeof(SName)),
.pTableMetaPos = taosArrayInit(4, sizeof(int32_t)),
.pTableVgroupReq = taosArrayInit(4, sizeof(SName)),
.pTableVgroupPos = taosArrayInit(4, sizeof(int32_t))};
tNameGetFullDbName(pName, req.dbFName);
int32_t code = reserveTableReqInCacheForInsert(pName, reqType, tableNo, &req);
if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pDbs, pName->dbname, strlen(pName->dbname), &req, sizeof(SInsertTablesMetaReq));
}
return code;
}
int32_t reserveTableMetaInCacheForInsert(const SName* pName, ECatalogReqType reqType, int32_t tableNo,
SParseMetaCache* pMetaCache) {
if (NULL == pMetaCache->pInsertTables) {
pMetaCache->pInsertTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pMetaCache->pInsertTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pMetaCache->sqlTableNum = tableNo;
SInsertTablesMetaReq* pReq = taosHashGet(pMetaCache->pInsertTables, pName->dbname, strlen(pName->dbname));
if (NULL == pReq) {
return reserveTableReqInDbCacheForInsert(pName, reqType, tableNo, pMetaCache->pInsertTables);
}
return reserveTableReqInCacheForInsert(pName, reqType, tableNo, pReq);
}
int32_t getTableMetaFromCacheForInsert(SArray* pTableMetaPos, SParseMetaCache* pMetaCache, int32_t tableNo,
STableMeta** pMeta) {
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableMetaPos, tableNo);
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableMetaData, reqIndex);
if (TSDB_CODE_SUCCESS == pRes->code) {
*pMeta = tableMetaDup((const STableMeta*)pRes->pRes);
if (NULL == *pMeta) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return pRes->code;
}
int32_t getTableVgroupFromCacheForInsert(SArray* pTableVgroupPos, SParseMetaCache* pMetaCache, int32_t tableNo,
SVgroupInfo* pVgroup) {
int32_t reqIndex = *(int32_t*)taosArrayGet(pTableVgroupPos, tableNo);
SMetaRes* pRes = taosArrayGet(pMetaCache->pTableVgroupData, reqIndex);
if (TSDB_CODE_SUCCESS == pRes->code) {
memcpy(pVgroup, pRes->pRes, sizeof(SVgroupInfo));
}
return pRes->code;
}
void destoryParseTablesMetaReqHash(SHashObj* pHash) { void destoryParseTablesMetaReqHash(SHashObj* pHash) {
SParseTablesMetaReq* p = taosHashIterate(pHash, NULL); SParseTablesMetaReq* p = taosHashIterate(pHash, NULL);
while (NULL != p) { while (NULL != p) {
...@@ -1239,16 +1076,6 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { ...@@ -1239,16 +1076,6 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
taosHashCleanup(pMetaCache->pTableMeta); taosHashCleanup(pMetaCache->pTableMeta);
taosHashCleanup(pMetaCache->pTableVgroup); taosHashCleanup(pMetaCache->pTableVgroup);
} }
SInsertTablesMetaReq* p = taosHashIterate(pMetaCache->pInsertTables, NULL);
while (NULL != p) {
taosArrayDestroy(p->pTableMetaPos);
taosArrayDestroy(p->pTableMetaReq);
taosArrayDestroy(p->pTableVgroupPos);
taosArrayDestroy(p->pTableVgroupReq);
p = taosHashIterate(pMetaCache->pInsertTables, p);
}
taosHashCleanup(pMetaCache->pInsertTables);
taosHashCleanup(pMetaCache->pDbVgroup); taosHashCleanup(pMetaCache->pDbVgroup);
taosHashCleanup(pMetaCache->pDbCfg); taosHashCleanup(pMetaCache->pDbCfg);
taosHashCleanup(pMetaCache->pDbInfo); taosHashCleanup(pMetaCache->pDbInfo);
......
...@@ -167,7 +167,7 @@ static void rewriteExprAlias(SNode* pRoot) { ...@@ -167,7 +167,7 @@ static void rewriteExprAlias(SNode* pRoot) {
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) { if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSql(pCxt, pQuery, NULL); code = parseInsertSql(pCxt, pQuery, NULL, NULL);
} else { } else {
code = parseSqlIntoAst(pCxt, pQuery); code = parseSqlIntoAst(pCxt, pQuery);
} }
...@@ -175,21 +175,26 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { ...@@ -175,21 +175,26 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
return code; return code;
} }
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) { static int32_t parseQuerySyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
SParseMetaCache metaCache = {0}; SParseMetaCache metaCache = {0};
int32_t code = nodesAcquireAllocator(pCxt->allocatorId); int32_t code = parseSqlSyntax(pCxt, pQuery, &metaCache);
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq(&metaCache, pCatalogReq);
}
destoryParseMetaCache(&metaCache, true);
return code;
}
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) { if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSyntax(pCxt, pQuery, &metaCache); code = parseInsertSql(pCxt, pQuery, pCatalogReq, NULL);
} else { } else {
code = parseSqlSyntax(pCxt, pQuery, &metaCache); code = parseQuerySyntax(pCxt, pQuery, pCatalogReq);
} }
} }
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq(pCxt, &metaCache, pCatalogReq);
}
nodesReleaseAllocator(pCxt->allocatorId); nodesReleaseAllocator(pCxt->allocatorId);
destoryParseMetaCache(&metaCache, true);
terrno = code; terrno = code;
return code; return code;
} }
...@@ -199,14 +204,10 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata ...@@ -199,14 +204,10 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
SParseMetaCache metaCache = {0}; SParseMetaCache metaCache = {0};
int32_t code = nodesAcquireAllocator(pCxt->allocatorId); int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot); code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (NULL == pQuery->pRoot) { code = analyseSemantic(pCxt, pQuery, &metaCache);
code = parseInsertSql(pCxt, &pQuery, &metaCache);
} else {
code = analyseSemantic(pCxt, pQuery, &metaCache);
}
} }
nodesReleaseAllocator(pCxt->allocatorId); nodesReleaseAllocator(pCxt->allocatorId);
destoryParseMetaCache(&metaCache, false); destoryParseMetaCache(&metaCache, false);
...@@ -214,6 +215,11 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata ...@@ -214,6 +215,11 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
return code; return code;
} }
int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData,
SQuery* pQuery) {
return parseInsertSql(pCxt, &pQuery, pCatalogReq, pMetaData);
}
void qDestroyParseContext(SParseContext* pCxt) { void qDestroyParseContext(SParseContext* pCxt) {
if (NULL == pCxt) { if (NULL == pCxt) {
return; return;
......
...@@ -228,11 +228,23 @@ int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, SRequestConnInfo* pConn ...@@ -228,11 +228,23 @@ int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, SRequestConnInfo* pConn
return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta); return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta);
} }
int32_t __catalogGetCachedTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
STableMeta** pTableMeta) {
return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta, true);
}
int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName,
SVgroupInfo* vgInfo) { SVgroupInfo* vgInfo) {
return g_mockCatalogService->catalogGetTableHashVgroup(pTableName, vgInfo); return g_mockCatalogService->catalogGetTableHashVgroup(pTableName, vgInfo);
} }
int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
SVgroupInfo* pVgroup, bool* exists) {
int32_t code = g_mockCatalogService->catalogGetTableHashVgroup(pTableName, pVgroup, true);
*exists = 0 != pVgroup->vgId;
return code;
}
int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
SArray** pVgList) { SArray** pVgList) {
return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList); return g_mockCatalogService->catalogGetTableDistVgInfo(pTableName, pVgList);
...@@ -289,8 +301,11 @@ void initMetaDataEnv() { ...@@ -289,8 +301,11 @@ void initMetaDataEnv() {
static Stub stub; static Stub stub;
stub.set(catalogGetHandle, __catalogGetHandle); stub.set(catalogGetHandle, __catalogGetHandle);
stub.set(catalogGetTableMeta, __catalogGetTableMeta); stub.set(catalogGetTableMeta, __catalogGetTableMeta);
stub.set(catalogGetCachedTableMeta, __catalogGetCachedTableMeta);
stub.set(catalogGetSTableMeta, __catalogGetTableMeta); stub.set(catalogGetSTableMeta, __catalogGetTableMeta);
stub.set(catalogGetCachedSTableMeta, __catalogGetCachedTableMeta);
stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup); stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup);
stub.set(catalogGetCachedTableHashVgroup, __catalogGetCachedTableHashVgroup);
stub.set(catalogGetTableDistVgInfo, __catalogGetTableDistVgInfo); stub.set(catalogGetTableDistVgInfo, __catalogGetTableDistVgInfo);
stub.set(catalogGetDBVgVersion, __catalogGetDBVgVersion); stub.set(catalogGetDBVgVersion, __catalogGetDBVgVersion);
stub.set(catalogGetDBVgList, __catalogGetDBVgList); stub.set(catalogGetDBVgList, __catalogGetDBVgList);
......
...@@ -91,7 +91,7 @@ class MockCatalogServiceImpl { ...@@ -91,7 +91,7 @@ class MockCatalogServiceImpl {
public: public:
static const int32_t numOfDataTypes = sizeof(tDataTypes) / sizeof(tDataTypes[0]); static const int32_t numOfDataTypes = sizeof(tDataTypes) / sizeof(tDataTypes[0]);
MockCatalogServiceImpl() : id_(1) {} MockCatalogServiceImpl() : id_(1), havaCache_(true) {}
~MockCatalogServiceImpl() { ~MockCatalogServiceImpl() {
for (auto& cfg : dbCfg_) { for (auto& cfg : dbCfg_) {
...@@ -106,7 +106,11 @@ class MockCatalogServiceImpl { ...@@ -106,7 +106,11 @@ class MockCatalogServiceImpl {
int32_t catalogGetHandle() const { return 0; } int32_t catalogGetHandle() const { return 0; }
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const { int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta, bool onlyCache = false) const {
if (onlyCache && !havaCache_) {
return TSDB_CODE_SUCCESS;
}
std::unique_ptr<STableMeta> table; std::unique_ptr<STableMeta> table;
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
...@@ -121,7 +125,12 @@ class MockCatalogServiceImpl { ...@@ -121,7 +125,12 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo, bool onlyCache = false) const {
if (onlyCache && !havaCache_) {
vgInfo->vgId = 0;
return TSDB_CODE_SUCCESS;
}
vgInfo->vgId = 1; vgInfo->vgId = 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -618,6 +627,7 @@ class MockCatalogServiceImpl { ...@@ -618,6 +627,7 @@ class MockCatalogServiceImpl {
IndexMetaCache index_; IndexMetaCache index_;
DnodeCache dnode_; DnodeCache dnode_;
DbCfgCache dbCfg_; DbCfgCache dbCfg_;
bool havaCache_;
}; };
MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {} MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {}
...@@ -651,12 +661,14 @@ void MockCatalogService::createDatabase(const std::string& db, bool rollup, int8 ...@@ -651,12 +661,14 @@ void MockCatalogService::createDatabase(const std::string& db, bool rollup, int8
impl_->createDatabase(db, rollup, cacheLast); impl_->createDatabase(db, rollup, cacheLast);
} }
int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const { int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta,
return impl_->catalogGetTableMeta(pTableName, pTableMeta); bool onlyCache) const {
return impl_->catalogGetTableMeta(pTableName, pTableMeta, onlyCache);
} }
int32_t MockCatalogService::catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { int32_t MockCatalogService::catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo,
return impl_->catalogGetTableHashVgroup(pTableName, vgInfo); bool onlyCache) const {
return impl_->catalogGetTableHashVgroup(pTableName, vgInfo, onlyCache);
} }
int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const { int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const {
......
...@@ -67,8 +67,8 @@ class MockCatalogService { ...@@ -67,8 +67,8 @@ class MockCatalogService {
void createDnode(int32_t dnodeId, const std::string& host, int16_t port); void createDnode(int32_t dnodeId, const std::string& host, int16_t port);
void createDatabase(const std::string& db, bool rollup = false, int8_t cacheLast = 0); void createDatabase(const std::string& db, bool rollup = false, int8_t cacheLast = 0);
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const; int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta, bool onlyCache = false) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const; int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo, bool onlyCache = false) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const; int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
int32_t catalogGetDBVgList(const char* pDbFName, SArray** pVgList) const; int32_t catalogGetDBVgList(const char* pDbFName, SArray** pVgList) const;
int32_t catalogGetDBCfg(const char* pDbFName, SDbCfgInfo* pDbCfg) const; int32_t catalogGetDBCfg(const char* pDbFName, SDbCfgInfo* pDbCfg) const;
......
...@@ -233,16 +233,15 @@ class ParserTestBaseImpl { ...@@ -233,16 +233,15 @@ class ParserTestBaseImpl {
} }
void doBuildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) { void doBuildCatalogReq(SParseContext* pCxt, const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
DO_WITH_THROW(buildCatalogReq, pCxt, pMetaCache, pCatalogReq); DO_WITH_THROW(buildCatalogReq, pMetaCache, pCatalogReq);
} }
void doGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) { void doGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) {
DO_WITH_THROW(g_mockCatalogService->catalogGetAllMeta, pCatalogReq, pMetaData); DO_WITH_THROW(g_mockCatalogService->catalogGetAllMeta, pCatalogReq, pMetaData);
} }
void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache, void doPutMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
bool isInsertValues) { DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache);
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache, isInsertValues);
} }
void doAuthenticate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) { void doAuthenticate(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
...@@ -280,15 +279,14 @@ class ParserTestBaseImpl { ...@@ -280,15 +279,14 @@ class ParserTestBaseImpl {
res_.calcConstAst_ = toString(pQuery->pRoot); res_.calcConstAst_ = toString(pQuery->pRoot);
} }
void doParseInsertSql(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) { void doParseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) {
DO_WITH_THROW(parseInsertSql, pCxt, pQuery, pMetaCache); DO_WITH_THROW(parseInsertSql, pCxt, pQuery, pCatalogReq, pMetaData);
ASSERT_NE(*pQuery, nullptr); ASSERT_NE(*pQuery, nullptr);
res_.parsedAst_ = toString((*pQuery)->pRoot); res_.parsedAst_ = toString((*pQuery)->pRoot);
} }
void doParseInsertSyntax(SParseContext* pCxt, SQuery** pQuery, SParseMetaCache* pMetaCache) { void doContinueParseSql(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SQuery* pQuery) {
DO_WITH_THROW(parseInsertSyntax, pCxt, pQuery, pMetaCache); DO_WITH_THROW(qContinueParseSql, pCxt, pCatalogReq, pMetaData, pQuery);
ASSERT_NE(*pQuery, nullptr);
} }
string toString(const SNode* pRoot) { string toString(const SNode* pRoot) {
...@@ -310,7 +308,7 @@ class ParserTestBaseImpl { ...@@ -310,7 +308,7 @@ class ParserTestBaseImpl {
if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) { if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseInsertSql(&cxt, query.get(), nullptr); doParseInsertSql(&cxt, query.get(), nullptr, nullptr);
} else { } else {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParse(&cxt, query.get()); doParse(&cxt, query.get());
...@@ -356,61 +354,102 @@ class ParserTestBaseImpl { ...@@ -356,61 +354,102 @@ class ParserTestBaseImpl {
} }
} }
void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) { void runQueryAsyncInternalFuncs(SParseContext* pParCxt) {
reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
try { bool request = true;
unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext); unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
setParseContext(sql, cxt.get(), true); new SParseMetaCache(), bind(destoryParseMetaCacheWarpper, _1, cref(request)));
doParse(pParCxt, query.get());
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery); doCollectMetaKey(pParCxt, *(query.get()), metaCache.get());
bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache( SQuery* pQuery = *(query.get());
new SParseMetaCache(), bind(destoryParseMetaCacheWarpper, _1, cref(request)));
bool isInsertValues = qIsInsertValuesSql(cxt->pSql, cxt->sqlLen); unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
if (isInsertValues) { MockCatalogService::destoryCatalogReq);
doParseInsertSyntax(cxt.get(), query.get(), metaCache.get()); doBuildCatalogReq(pParCxt, metaCache.get(), catalogReq.get());
} else {
doParse(cxt.get(), query.get()); string err;
doCollectMetaKey(cxt.get(), *(query.get()), metaCache.get()); thread t1([&]() {
try {
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
doGetAllMeta(catalogReq.get(), metaData.get());
metaCache.reset(new SParseMetaCache());
request = false;
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
doAuthenticate(pParCxt, pQuery, metaCache.get());
doTranslate(pParCxt, pQuery, metaCache.get());
doCalculateConstant(pParCxt, pQuery);
} catch (const TerminateFlag& e) {
// success and terminate
} catch (const runtime_error& e) {
err = e.what();
} catch (...) {
err = "unknown error";
} }
});
SQuery* pQuery = *(query.get()); t1.join();
if (!err.empty()) {
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(), throw runtime_error(err);
MockCatalogService::destoryCatalogReq); }
doBuildCatalogReq(cxt.get(), metaCache.get(), catalogReq.get()); }
string err; void runInsertAsyncInternalFuncsImpl(SParseContext* pParCxt, SQuery** pQuery, SCatalogReq* pCatalogReq,
thread t1([&]() { SMetaData* pMetaData) {
try { doParseInsertSql(pParCxt, pQuery, pCatalogReq, pMetaData);
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
doGetAllMeta(catalogReq.get(), metaData.get());
metaCache.reset(new SParseMetaCache()); if (QUERY_EXEC_STAGE_SCHEDULE == (*pQuery)->execStage) {
request = false; return;
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues); }
if (isInsertValues) { string err;
doParseInsertSql(cxt.get(), query.get(), metaCache.get()); thread t1([&]() {
} else { try {
doAuthenticate(cxt.get(), pQuery, metaCache.get()); doGetAllMeta(pCatalogReq, pMetaData);
doTranslate(cxt.get(), pQuery, metaCache.get()); doParseInsertSql(pParCxt, pQuery, pCatalogReq, pMetaData);
doCalculateConstant(cxt.get(), pQuery); if (QUERY_EXEC_STAGE_SCHEDULE != (*pQuery)->execStage) {
} runInsertAsyncInternalFuncsImpl(pParCxt, pQuery, pCatalogReq, pMetaData);
} catch (const TerminateFlag& e) {
// success and terminate
} catch (const runtime_error& e) {
err = e.what();
} catch (...) {
err = "unknown error";
} }
}); } catch (const TerminateFlag& e) {
// success and terminate
} catch (const runtime_error& e) {
err = e.what();
} catch (...) {
err = "unknown error";
}
});
t1.join();
if (!err.empty()) {
throw runtime_error(err);
}
}
void runInsertAsyncInternalFuncs(SParseContext* pParCxt) {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq);
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
runInsertAsyncInternalFuncsImpl(pParCxt, query.get(), catalogReq.get(), metaData.get());
}
void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL);
try {
unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext);
setParseContext(sql, cxt.get(), true);
t1.join(); bool isInsertValues = qIsInsertValuesSql(cxt->pSql, cxt->sqlLen);
if (!err.empty()) { if (isInsertValues) {
throw runtime_error(err); runInsertAsyncInternalFuncs(cxt.get());
} else {
runQueryAsyncInternalFuncs(cxt.get());
} }
if (g_dump) { if (g_dump) {
...@@ -437,25 +476,39 @@ class ParserTestBaseImpl { ...@@ -437,25 +476,39 @@ class ParserTestBaseImpl {
doParseSqlSyntax(cxt.get(), query.get(), catalogReq.get()); doParseSqlSyntax(cxt.get(), query.get(), catalogReq.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
string err; switch (pQuery->execStage) {
thread t1([&]() { case QUERY_EXEC_STAGE_PARSE:
try { case QUERY_EXEC_STAGE_ANALYSE: {
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData); string err;
doGetAllMeta(catalogReq.get(), metaData.get()); thread t1([&]() {
try {
doAnalyseSqlSemantic(cxt.get(), catalogReq.get(), metaData.get(), pQuery); unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(),
} catch (const TerminateFlag& e) { MockCatalogService::destoryMetaData);
// success and terminate doGetAllMeta(catalogReq.get(), metaData.get());
} catch (const runtime_error& e) { if (QUERY_EXEC_STAGE_PARSE == pQuery->execStage) {
err = e.what(); doContinueParseSql(cxt.get(), catalogReq.get(), metaData.get(), pQuery);
} catch (...) { } else {
err = "unknown error"; doAnalyseSqlSemantic(cxt.get(), catalogReq.get(), metaData.get(), pQuery);
}
} catch (const TerminateFlag& e) {
// success and terminate
} catch (const runtime_error& e) {
err = e.what();
} catch (...) {
err = "unknown error";
}
});
t1.join();
if (!err.empty()) {
throw runtime_error(err);
}
break;
} }
}); case QUERY_EXEC_STAGE_SCHEDULE:
break;
t1.join(); default:
if (!err.empty()) { break;
throw runtime_error(err);
} }
if (g_dump) { if (g_dump) {
......
...@@ -1134,7 +1134,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1134,7 +1134,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
#if 1 #if 0
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL; msg = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册