提交 fb901d95 编写于 作者: X Xiaoyu Wang

fix: some problems of parser for stmt mode

上级 9218c257
......@@ -325,6 +325,7 @@ typedef struct SQuery {
bool showRewrite;
int32_t placeholderNum;
SArray* pPlaceholderValues;
SNode* pContainPlaceholderRoot;
} SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
......
......@@ -62,7 +62,7 @@ int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t
void qDestroyStmtDataBlock(void* pBlock);
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock);
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId);
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx);
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
......@@ -77,8 +77,8 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void* smlInitHandle(SQuery* pQuery);
void smlDestroyHandle(void* pHandle);
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format,
STableMeta* pTableMeta, char* tableName, char* msgBuf, int16_t msgBufLen);
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
char* tableName, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
#ifdef __cplusplus
......
......@@ -34,7 +34,6 @@ typedef struct SPlanContext {
bool showRewrite;
int8_t triggerType;
int64_t watermark;
int32_t placeholderNum;
char* pMsg;
int32_t msgLen;
} SPlanContext;
......@@ -48,9 +47,6 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
// @pSource one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId,
bool* pEmptyResult);
// Convert to subplan to string for the scheduler to send to the executor
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan);
......
......@@ -233,8 +233,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.placeholderNum = pQuery->placeholderNum};
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
......@@ -518,7 +517,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char* errorMsg =
(pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
fprintf(stderr,"failed to connect to server, reason: %s\n\n", errorMsg);
fprintf(stderr, "failed to connect to server, reason: %s\n\n", errorMsg);
terrno = pRequest->code;
destroyRequest(pRequest);
......@@ -949,7 +948,8 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
// TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows;
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
convertUcs4);
}
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
......
......@@ -146,7 +146,8 @@ int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockH
return TSDB_CODE_SUCCESS;
}
int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, bool autoCreateTbl, SHashObj* pVgHash, SHashObj* pBlockHash) {
int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, bool autoCreateTbl,
SHashObj* pVgHash, SHashObj* pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName));
......@@ -157,7 +158,6 @@ int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
......@@ -259,10 +259,10 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
}
size_t keyLen = 0;
void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) {
STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
char *key = taosHashGetKey(pIter, &keyLen);
char* key = taosHashGetKey(pIter, &keyLen);
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) {
......@@ -320,12 +320,14 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks *pDataBlock, STableDataBlocks **newBlock, uint64_t uid) {
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STableDataBlocks** newBlock, uint64_t uid) {
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
SVgroupInfo vgInfo = {0};
STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &vgInfo));
STMT_ERR_RET(taosHashPut(pStmt->exec.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname,
&vgInfo));
STMT_ERR_RET(
taosHashPut(pStmt->exec.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId));
......@@ -336,7 +338,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.needParse = true;
pStmt->bInfo.inExecCache = false;
STableDataBlocks *pBlockInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
STableDataBlocks* pBlockInExec =
taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (pBlockInExec) {
pStmt->bInfo.needParse = false;
pStmt->bInfo.inExecCache = true;
......@@ -371,7 +374,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STableDataBlocks* pNewBlock = NULL;
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, 0));
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) {
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
POINTER_BYTES)) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -381,10 +385,10 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STMT_RET(stmtCleanBindInfo(pStmt));
}
STableMeta *pTableMeta = NULL;
STableMeta* pTableMeta = NULL;
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
int32_t code = catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta);
int32_t code =
catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
......@@ -408,7 +412,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
if (pStmt->bInfo.inExecCache) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
if (NULL == pCache) {
tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash", pStmt->bInfo.tbFName, uid, cacheUid);
tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
pStmt->bInfo.tbFName, uid, cacheUid);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
......@@ -437,7 +442,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STableDataBlocks* pNewBlock = NULL;
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, uid));
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) {
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
POINTER_BYTES)) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -522,7 +528,8 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName);
STMT_ERR_RET(stmtGetFromCache(pStmt));
......@@ -548,7 +555,8 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
return TSDB_CODE_SUCCESS;
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
STableDataBlocks** pDataBlock =
(STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -566,7 +574,8 @@ int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
STableDataBlocks** pDataBlock =
(STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -583,7 +592,8 @@ int32_t stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
STableDataBlocks** pDataBlock =
(STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -618,7 +628,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_ERR_RET(qStmtBindParams(pStmt->sql.pQuery, bind, colIdx, pStmt->exec.pRequest->requestId));
STMT_ERR_RET(qStmtBindParams(pStmt->sql.pQuery, bind, colIdx));
SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
.acctId = pStmt->taos->acctId,
......@@ -637,23 +647,25 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));
if (pStmt->sql.pQuery->haveResultSet) {
setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema, pStmt->sql.pQuery->numOfResCols);
setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
pStmt->sql.pQuery->numOfResCols);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
}
TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
//if (STMT_TYPE_QUERY == pStmt->sql.queryRes) {
// if (STMT_TYPE_QUERY == pStmt->sql.queryRes) {
// STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
//}
// }
//STMT_ERR_RET(stmtBackupQueryFields(pStmt));
// STMT_ERR_RET(stmtBackupQueryFields(pStmt));
return TSDB_CODE_SUCCESS;
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
STableDataBlocks** pDataBlock =
(STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -694,19 +706,19 @@ int stmtAddBatch(TAOS_STMT* stmt) {
return TSDB_CODE_SUCCESS;
}
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp *pRsp) {
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
if (pRsp->nBlocks <= 0) {
tscError("invalid submit resp block number %d", pRsp->nBlocks);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
size_t keyLen = 0;
STableDataBlocks **pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) {
STableDataBlocks *pBlock = *pIter;
char *key = taosHashGetKey(pIter, &keyLen);
STableDataBlocks* pBlock = *pIter;
char* key = taosHashGetKey(pIter, &keyLen);
STableMeta *pMeta = qGetTableMetaInDataBlock(pBlock);
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
if (pMeta->uid != pStmt->bInfo.tbUid) {
tscError("table uid %" PRIx64 " mis-match with current table uid %" PRIx64, pMeta->uid, pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
......@@ -717,7 +729,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp *pRsp) {
continue;
}
SSubmitBlkRsp *blkRsp = NULL;
SSubmitBlkRsp* blkRsp = NULL;
int32_t i = 0;
for (; i < pRsp->nBlocks; ++i) {
blkRsp = pRsp->pBlocks + i;
......@@ -733,7 +745,8 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp *pRsp) {
}
if (i < pRsp->nBlocks) {
tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid, blkRsp->uid);
tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
blkRsp->uid);
pMeta->uid = blkRsp->uid;
pStmt->bInfo.tbUid = blkRsp->uid;
......@@ -748,10 +761,10 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp *pRsp) {
return TSDB_CODE_SUCCESS;
}
int stmtExec(TAOS_STMT *stmt) {
int stmtExec(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
SSubmitRsp *pRsp = NULL;
SSubmitRsp* pRsp = NULL;
bool autoCreateTbl = pStmt->exec.autoCreateTbl;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
......@@ -760,7 +773,8 @@ int stmtExec(TAOS_STMT *stmt) {
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, NULL);
} else {
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, (autoCreateTbl ? (void**)&pRsp : NULL));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true,
(autoCreateTbl ? (void**)&pRsp : NULL));
}
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
......
......@@ -529,6 +529,18 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyNode(pStmt->pTbNamePattern);
break;
}
case QUERY_NODE_QUERY: {
SQuery* pQuery = (SQuery*)pNode;
nodesDestroyNode(pQuery->pRoot);
taosMemoryFreeClear(pQuery->pResSchema);
if (NULL != pQuery->pCmdMsg) {
taosMemoryFreeClear(pQuery->pCmdMsg->pMsg);
taosMemoryFreeClear(pQuery->pCmdMsg);
}
taosArrayDestroy(pQuery->pDbList);
taosArrayDestroy(pQuery->pTableList);
break;
}
case QUERY_NODE_LOGIC_PLAN_SCAN: {
SScanLogicNode* pLogicNode = (SScanLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);
......
......@@ -39,10 +39,16 @@ static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) {
if (TSDB_CODE_SUCCESS == code) {
code = authenticate(pCxt, *pQuery);
}
if (TSDB_CODE_SUCCESS == code && 0 == (*pQuery)->placeholderNum) {
if (TSDB_CODE_SUCCESS == code && (*pQuery)->placeholderNum > 0) {
// TSWAP((*pQuery)->pContainPlaceholderRoot, (*pQuery)->pRoot);
return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SUCCESS == code) {
code = translate(pCxt, *pQuery);
}
if (TSDB_CODE_SUCCESS == code && 0 == (*pQuery)->placeholderNum) {
if (TSDB_CODE_SUCCESS == code) {
code = calculateConstant(pCxt, *pQuery);
}
return code;
......@@ -142,26 +148,13 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
return code;
}
void qDestroyQuery(SQuery* pQueryNode) {
if (NULL == pQueryNode) {
return;
}
nodesDestroyNode(pQueryNode->pRoot);
taosMemoryFreeClear(pQueryNode->pResSchema);
if (NULL != pQueryNode->pCmdMsg) {
taosMemoryFreeClear(pQueryNode->pCmdMsg->pMsg);
taosMemoryFreeClear(pQueryNode->pCmdMsg);
}
taosArrayDestroy(pQueryNode->pDbList);
taosArrayDestroy(pQueryNode->pTableList);
taosMemoryFreeClear(pQueryNode);
}
void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode(pQueryNode); }
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
return extractResultSchema(pRoot, numOfCols, pSchema);
}
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId) {
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx) {
int32_t code = TSDB_CODE_SUCCESS;
if (colIdx < 0) {
......
......@@ -18,28 +18,6 @@
#include "planInt.h"
#include "scalar.h"
typedef struct SCollectPlaceholderValuesCxt {
int32_t errCode;
SArray* pValues;
} SCollectPlaceholderValuesCxt;
static EDealRes collectPlaceholderValuesImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_VALUE == nodeType(pNode) && ((SValueNode*)pNode)->placeholderNo > 0) {
SCollectPlaceholderValuesCxt* pCxt = pContext;
taosArrayInsert(pCxt->pValues, ((SValueNode*)pNode)->placeholderNo - 1, &pNode);
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
static int32_t collectPlaceholderValues(SPlanContext* pCxt, SQueryPlan* pPlan) {
pPlan->pPlaceholderValues = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
SCollectPlaceholderValuesCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pValues = pPlan->pPlaceholderValues};
nodesWalkPhysiPlan((SNode*)pPlan, collectPlaceholderValuesImpl, &cxt);
return cxt.errCode;
}
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
SLogicNode* pLogicNode = NULL;
SLogicSubplan* pLogicSubplan = NULL;
......@@ -58,9 +36,6 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
if (TSDB_CODE_SUCCESS == code) {
code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
}
if (TSDB_CODE_SUCCESS == code && pCxt->placeholderNum > 0) {
code = collectPlaceholderValues(pCxt, *pPlan);
}
nodesDestroyNode(pLogicNode);
nodesDestroyNode(pLogicSubplan);
......@@ -99,249 +74,6 @@ int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstream
return setSubplanExecutionNode(subplan->pNode, groupId, pSource);
}
static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
if (pParam->is_null && 1 == *(pParam->is_null)) {
pVal->node.resType.type = TSDB_DATA_TYPE_NULL;
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
return TSDB_CODE_SUCCESS;
}
int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes);
pVal->node.resType.type = pParam->buffer_type;
pVal->node.resType.bytes = inputSize;
switch (pParam->buffer_type) {
case TSDB_DATA_TYPE_BOOL:
pVal->datum.b = *((bool*)pParam->buffer);
break;
case TSDB_DATA_TYPE_TINYINT:
pVal->datum.i = *((int8_t*)pParam->buffer);
break;
case TSDB_DATA_TYPE_SMALLINT:
pVal->datum.i = *((int16_t*)pParam->buffer);
break;
case TSDB_DATA_TYPE_INT:
pVal->datum.i = *((int32_t*)pParam->buffer);
break;
case TSDB_DATA_TYPE_BIGINT:
pVal->datum.i = *((int64_t*)pParam->buffer);
break;
case TSDB_DATA_TYPE_FLOAT:
pVal->datum.d = *((float*)pParam->buffer);
break;
case TSDB_DATA_TYPE_DOUBLE:
pVal->datum.d = *((double*)pParam->buffer);
break;
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pVal->datum.p) {
return TSDB_CODE_OUT_OF_MEMORY;
}
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
strncpy(varDataVal(pVal->datum.p), (const char*)pParam->buffer, pVal->node.resType.bytes);
break;
case TSDB_DATA_TYPE_NCHAR: {
pVal->node.resType.bytes *= TSDB_NCHAR_SIZE;
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pVal->datum.p) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t output = 0;
if (!taosMbsToUcs4(pParam->buffer, inputSize, (TdUcs4*)varDataVal(pVal->datum.p), pVal->node.resType.bytes,
&output)) {
return errno;
}
varDataSetLen(pVal->datum.p, output);
pVal->node.resType.bytes = output;
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
pVal->datum.i = *((int64_t*)pParam->buffer);
break;
case TSDB_DATA_TYPE_UTINYINT:
pVal->datum.u = *((uint8_t*)pParam->buffer);
break;
case TSDB_DATA_TYPE_USMALLINT:
pVal->datum.u = *((uint16_t*)pParam->buffer);
break;
case TSDB_DATA_TYPE_UINT:
pVal->datum.u = *((uint32_t*)pParam->buffer);
break;
case TSDB_DATA_TYPE_UBIGINT:
pVal->datum.u = *((uint64_t*)pParam->buffer);
break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_MEDIUMBLOB:
// todo
default:
break;
}
pVal->translate = true;
return TSDB_CODE_SUCCESS;
}
static EDealRes updatePlanQueryId(SNode* pNode, void* pContext) {
int64_t queryId = *(uint64_t*)pContext;
if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pNode)) {
SQueryPlan* planNode = (SQueryPlan*)pNode;
planNode->queryId = queryId;
} else if (QUERY_NODE_PHYSICAL_SUBPLAN == nodeType(pNode)) {
SSubplan* subplanNode = (SSubplan*)pNode;
subplanNode->id.queryId = queryId;
}
return DEAL_RES_CONTINUE;
}
static int32_t calcConstNode(SNode** pNode) {
if (NULL == *pNode) {
return TSDB_CODE_SUCCESS;
}
SNode* pNew = NULL;
int32_t code = scalarCalculateConstants(*pNode, &pNew);
if (TSDB_CODE_SUCCESS == code) {
*pNode = pNew;
}
return code;
}
static int32_t calcConstList(SNodeList* pList) {
SNode* pNode = NULL;
FOREACH(pNode, pList) {
SNode* pNew = NULL;
int32_t code = scalarCalculateConstants(pNode, &pNew);
if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(pNew);
} else {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static bool isEmptyResultCond(SNode** pCond) {
if (NULL == *pCond || QUERY_NODE_VALUE != nodeType(*pCond)) {
return false;
}
if (((SValueNode*)*pCond)->datum.b) {
nodesDestroyNode(*pCond);
*pCond = NULL;
return false;
}
return true;
}
static int32_t calcConstSpecificPhysiNode(SPhysiNode* pPhyNode) {
switch (nodeType(pPhyNode)) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
case QUERY_NODE_PHYSICAL_PLAN_FILL:
return TSDB_CODE_SUCCESS;
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return calcConstList(((SProjectPhysiNode*)pPhyNode)->pProjections);
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
return calcConstNode(&(((SJoinPhysiNode*)pPhyNode)->pOnConditions));
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return calcConstList(((SAggPhysiNode*)pPhyNode)->pExprs);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return calcConstList(((SSortPhysiNode*)pPhyNode)->pExprs);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return calcConstList(((SWinodwPhysiNode*)pPhyNode)->pExprs);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return calcConstList(((SPartitionPhysiNode*)pPhyNode)->pExprs);
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static int32_t calcConstSubplan(SPhysiNode* pPhyNode, bool* pEmptyResult) {
int32_t code = calcConstNode(&pPhyNode->pConditions);
if (TSDB_CODE_SUCCESS == code) {
code = calcConstSpecificPhysiNode(pPhyNode);
}
if (TSDB_CODE_SUCCESS != code) {
return code;
}
*pEmptyResult = isEmptyResultCond(&pPhyNode->pConditions);
if (*pEmptyResult) {
return TSDB_CODE_SUCCESS;
}
*pEmptyResult = true;
bool subEmptyResult = false;
SNode* pChild = NULL;
FOREACH(pChild, pPhyNode->pChildren) {
code = calcConstSubplan((SPhysiNode*)pChild, &subEmptyResult);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (!subEmptyResult) {
*pEmptyResult = false;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t calcConstPhysiPlan(SQueryPlan* pPlan, bool* pEmptyResult) {
*pEmptyResult = true;
bool subEmptyResult = false;
SNodeListNode* pNode = nodesListGetNode(pPlan->pSubplans, 0);
SNode* pSubplan = NULL;
FOREACH(pSubplan, pNode->pNodeList) {
int32_t code = calcConstSubplan(((SSubplan*)pSubplan)->pNode, pEmptyResult);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (!subEmptyResult) {
*pEmptyResult = false;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId,
bool* pEmptyResult) {
int32_t size = taosArrayGetSize(pPlan->pPlaceholderValues);
int32_t code = 0;
if (colIdx < 0) {
for (int32_t i = 0; i < size; ++i) {
code = setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, i), pParams + i);
if (code) {
return code;
}
}
} else {
code = setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, colIdx), pParams);
if (code) {
return code;
}
}
if (colIdx < 0 || ((colIdx + 1) == size)) {
nodesWalkPhysiPlan((SNode*)pPlan, updatePlanQueryId, &queryId);
code = calcConstPhysiPlan(pPlan, pEmptyResult);
}
return code;
}
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) {
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
......
......@@ -20,35 +20,49 @@ using namespace std;
class PlanStmtTest : public PlannerTestBase {
public:
void prepare(const string& sql) {
run(sql);
// todo calloc pBindParams_
}
void buildParam(TAOS_MULTI_BIND* pBindParams, int32_t index, void* pVal, int32_t type, int32_t bytes = 0) {
TAOS_MULTI_BIND* pBindParam = pBindParams + index;
pBindParam->buffer_type = type;
pBindParam->num = 1;
pBindParam->buffer_length = bytes > 0 ? bytes : tDataTypes[type].bytes;
pBindParam->buffer = taosMemoryCalloc(1, pBindParam->buffer_length);
pBindParam->length = (int32_t*)taosMemoryCalloc(1, sizeof(int32_t));
pBindParam->is_null = (char*)taosMemoryCalloc(1, sizeof(char));
*(pBindParam->length) = bytes > 0 ? bytes : tDataTypes[type].bytes;
*(pBindParam->is_null) = 0;
void bindParam(int32_t val) {
TAOS_MULTI_BIND* pBind = pBindParams_ + paramNo_++;
pBind->buffer_type = TSDB_DATA_TYPE_INT;
pBind->num = 1;
pBind->buffer_length = sizeof(int32_t);
pBind->buffer = taosMemoryCalloc(1, pBind->buffer_length);
pBind->length = (int32_t*)taosMemoryCalloc(1, sizeof(int32_t));
pBind->is_null = (char*)taosMemoryCalloc(1, sizeof(char));
*((int32_t*)pBind->buffer) = val;
*(pBind->length) = sizeof(int32_t);
*(pBind->is_null) = 0;
switch (type) {
case TSDB_DATA_TYPE_BOOL:
*((bool*)pBindParam->buffer) = *(bool*)pVal;
break;
case TSDB_DATA_TYPE_TINYINT:
*((int8_t*)pBindParam->buffer) = *(int64_t*)pVal;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_MEDIUMBLOB:
default:
break;
}
void exec() {
// todo
}
private:
TAOS_MULTI_BIND* pBindParams_;
int32_t paramNo_;
};
TEST_F(PlanStmtTest, stmt) {
useDb("root", "test");
// run("select * from t1 where c1 = ?");
prepare("SELECT * FROM t1 WHERE c1 = ?");
}
......@@ -108,6 +108,57 @@ class PlannerTestBaseImpl {
}
}
void prepare(const string& sql) {
reset();
try {
doParseSql(sql, &stmtEnv_.pQuery_, true);
dump(g_dumpModule);
} catch (...) {
dump(DUMP_MODULE_ALL);
throw;
}
}
void bindParams(TAOS_MULTI_BIND* pParams, int32_t colIdx) {
try {
doBindParams(stmtEnv_.pQuery_, pParams, colIdx);
SPlanContext cxt = {0};
setPlanContext(stmtEnv_.pQuery_, &cxt);
SLogicNode* pLogicNode = nullptr;
doCreateLogicPlan(&cxt, &pLogicNode);
doOptimizeLogicPlan(&cxt, pLogicNode);
SLogicSubplan* pLogicSubplan = nullptr;
doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan);
SQueryLogicPlan* pLogicPlan = nullptr;
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
SQueryPlan* pPlan = nullptr;
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
dump(g_dumpModule);
} catch (...) {
dump(DUMP_MODULE_ALL);
throw;
}
}
void exec() {
try {
doParseBoundSql(stmtEnv_.pQuery_);
dump(g_dumpModule);
} catch (...) {
dump(DUMP_MODULE_ALL);
throw;
}
}
private:
struct caseEnv {
string acctId_;
......@@ -117,10 +168,15 @@ class PlannerTestBaseImpl {
struct stmtEnv {
string sql_;
array<char, 1024> msgBuf_;
SQuery* pQuery_;
~stmtEnv() { qDestroyQuery(pQuery_); }
};
struct stmtRes {
string ast_;
string prepareAst_;
string boundAst_;
string rawLogicPlan_;
string optimizedLogicPlan_;
string splitLogicPlan_;
......@@ -132,8 +188,10 @@ class PlannerTestBaseImpl {
void reset() {
stmtEnv_.sql_.clear();
stmtEnv_.msgBuf_.fill(0);
qDestroyQuery(stmtEnv_.pQuery_);
res_.ast_.clear();
res_.boundAst_.clear();
res_.rawLogicPlan_.clear();
res_.optimizedLogicPlan_.clear();
res_.splitLogicPlan_.clear();
......@@ -152,6 +210,9 @@ class PlannerTestBaseImpl {
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
cout << "syntax tree : " << endl;
cout << res_.ast_ << endl;
cout << "bound syntax tree : " << endl;
cout << res_.boundAst_ << endl;
}
if (DUMP_MODULE_ALL == module || DUMP_MODULE_LOGIC == module) {
......@@ -187,7 +248,7 @@ class PlannerTestBaseImpl {
}
}
void doParseSql(const string& sql, SQuery** pQuery) {
void doParseSql(const string& sql, SQuery** pQuery, bool prepare = false) {
stmtEnv_.sql_ = sql;
transform(stmtEnv_.sql_.begin(), stmtEnv_.sql_.end(), stmtEnv_.sql_.begin(), ::tolower);
......@@ -200,8 +261,32 @@ class PlannerTestBaseImpl {
cxt.msgLen = stmtEnv_.msgBuf_.max_size();
DO_WITH_THROW(qParseSql, &cxt, pQuery);
if (prepare) {
res_.prepareAst_ = toString((*pQuery)->pRoot);
} else {
res_.ast_ = toString((*pQuery)->pRoot);
}
}
void doBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx) {
DO_WITH_THROW(qStmtBindParams, pQuery, pParams, colIdx);
if (colIdx < 0 || pQuery->placeholderNum == colIdx + 1) {
res_.boundAst_ = toString(pQuery->pRoot);
}
}
void doParseBoundSql(SQuery* pQuery) {
SParseContext cxt = {0};
cxt.acctId = atoi(caseEnv_.acctId_.c_str());
cxt.db = caseEnv_.db_.c_str();
cxt.pSql = stmtEnv_.sql_.c_str();
cxt.sqlLen = stmtEnv_.sql_.length();
cxt.pMsg = stmtEnv_.msgBuf_.data();
cxt.msgLen = stmtEnv_.msgBuf_.max_size();
DO_WITH_THROW(qStmtParseQuerySql, &cxt, pQuery);
res_.ast_ = toString(pQuery->pRoot);
}
void doCreateLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) {
DO_WITH_THROW(createLogicPlan, pCxt, pLogicNode);
......@@ -275,3 +360,11 @@ PlannerTestBase::~PlannerTestBase() {}
void PlannerTestBase::useDb(const std::string& acctId, const std::string& db) { impl_->useDb(acctId, db); }
void PlannerTestBase::run(const std::string& sql) { return impl_->run(sql); }
void PlannerTestBase::prepare(const std::string& sql) { return impl_->prepare(sql); }
void PlannerTestBase::bindParams(TAOS_MULTI_BIND* pParams, int32_t colIdx) {
return impl_->bindParams(pParams, colIdx);
}
void PlannerTestBase::exec() { return impl_->exec(); }
......@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
class PlannerTestBaseImpl;
struct TAOS_MULTI_BIND;
class PlannerTestBase : public testing::Test {
public:
......@@ -27,6 +28,10 @@ class PlannerTestBase : public testing::Test {
void useDb(const std::string& acctId, const std::string& db);
void run(const std::string& sql);
// stmt mode APIs
void prepare(const std::string& sql);
void bindParams(TAOS_MULTI_BIND* pParams, int32_t colIdx);
void exec();
private:
std::unique_ptr<PlannerTestBaseImpl> impl_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册