提交 1850dc03 编写于 作者: D dapan1121

stmt query

上级 c7464a7c
......@@ -50,7 +50,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
// @pSource one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx);
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId);
// Convert to subplan to string for the scheduler to send to the executor
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
......
......@@ -46,6 +46,12 @@ typedef struct SStmtTableCache {
void* boundTags;
} SStmtTableCache;
typedef struct SQueryFields {
TAOS_FIELD* fields;
TAOS_FIELD* userFields;
uint32_t numOfCols;
} SQueryFields;
typedef struct SStmtBindInfo {
bool needParse;
uint64_t tbUid;
......@@ -66,16 +72,17 @@ typedef struct SStmtExecInfo {
} SStmtExecInfo;
typedef struct SStmtSQLInfo {
STMT_TYPE type;
STMT_STATUS status;
bool autoCreate;
uint64_t runTimes;
SHashObj* pTableCache; //SHash<SStmtTableCache>
SQuery* pQuery;
char* sqlStr;
int32_t sqlLen;
SArray* nodeList;
SQueryPlan* pQueryPlan;
STMT_TYPE type;
STMT_STATUS status;
bool autoCreate;
uint64_t runTimes;
SHashObj* pTableCache; //SHash<SStmtTableCache>
SQuery* pQuery;
char* sqlStr;
int32_t sqlLen;
SArray* nodeList;
SQueryPlan* pQueryPlan;
SQueryFields fields;
} SStmtSQLInfo;
typedef struct STscStmt {
......
......@@ -73,6 +73,22 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtBackupQueryFields(STscStmt* pStmt) {
SQueryFields *pFields = &pStmt->sql.fields;
int32_t size = pFields->numOfCols * sizeof(TAOS_FIELD);
pFields->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
pFields->fields = taosMemoryMalloc(size);
pFields->userFields = taosMemoryMalloc(size);
if (NULL == pFields->fields || NULL == pFields->userFields) {
STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
memcpy(pFields->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
memcpy(pFields->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
return TSDB_CODE_SUCCESS;
}
int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
STscStmt* pStmt = (STscStmt*)stmt;
......@@ -258,37 +274,42 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta));
if (pTableMeta->uid == pStmt->bInfo.tbUid) {
uint64_t uid = pTableMeta->uid;
uint64_t suid = pTableMeta->suid;
int8_t tableType = pTableMeta->tableType;
taosMemoryFree(pTableMeta);
if (uid == pStmt->bInfo.tbUid) {
pStmt->bInfo.needParse = false;
return TSDB_CODE_SUCCESS;
}
if (taosHashGet(pStmt->exec.pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid))) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
if (taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid))) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));
if (NULL == pCache) {
tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", pTableMeta->uid);
tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", uid);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
pStmt->bInfo.needParse = false;
pStmt->bInfo.tbUid = pTableMeta->uid;
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.tbUid = uid;
pStmt->bInfo.tbSuid = suid;
pStmt->bInfo.tbType = tableType;
pStmt->bInfo.boundTags = pCache->boundTags;
return TSDB_CODE_SUCCESS;
}
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));
if (pCache) {
pStmt->bInfo.needParse = false;
pStmt->bInfo.tbUid = pTableMeta->uid;
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.tbUid = uid;
pStmt->bInfo.tbSuid = suid;
pStmt->bInfo.tbType = tableType;
pStmt->bInfo.boundTags = pCache->boundTags;
STableDataBlocks* pNewBlock = NULL;
......@@ -475,9 +496,10 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
pStmt->exec.pRequest->body.pDag = NULL;
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
}
STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx));
STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId));
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
......@@ -549,6 +571,8 @@ int stmtClose(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_RET(stmtCleanSQLInfo(pStmt));
taosMemoryFree(stmt);
}
const char *stmtErrstr(TAOS_STMT *stmt) {
......
......@@ -1521,8 +1521,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
} else if (isRow1DataRow) {
colIdOfRow1 = pSchema1->columns[j].colId;
} else {
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row1, j);
colIdOfRow1 = pColIdx->colId;
colIdOfRow1 = tdKvRowColIdAt(row1, j);
}
int32_t colIdOfRow2;
......@@ -1531,8 +1530,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
} else if (isRow2DataRow) {
colIdOfRow2 = pSchema2->columns[k].colId;
} else {
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row2, k);
colIdOfRow2 = pColIdx->colId;
colIdOfRow2 = tdKvRowColIdAt(row2, j);
}
if (colIdOfRow1 == colIdOfRow2) {
......
......@@ -374,6 +374,11 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d
CHECK_OUT_OF_MEM(func);
strcpy(func->functionName, "cast");
func->node.resType = dt;
if (TSDB_DATA_TYPE_BINARY == dt.type) {
func->node.resType.bytes += 2;
} else if (TSDB_DATA_TYPE_NCHAR == dt.type) {
func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + 2;
}
nodesListMakeAppend(&func->pParameterList, pExpr);
return (SNode*)func;
}
......
......@@ -165,15 +165,35 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
return TSDB_CODE_SUCCESS;
}
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx) {
static EDealRes updatePlanQueryId(SNode* pNode, void* pContext) {
int64_t queryId = *(uint64_t *)pContext;
if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pNode)) {
SQueryPlan* planNode = (SQueryPlan*)pNode;
planNode->queryId = queryId;
} else if (QUERY_NODE_PHYSICAL_SUBPLAN == nodeType(pNode)) {
SSubplan* subplanNode = (SSubplan*)pNode;
subplanNode->id.queryId = queryId;
}
return DEAL_RES_CONTINUE;
}
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId) {
int32_t size = taosArrayGetSize(pPlan->pPlaceholderValues);
if (colIdx < 0) {
int32_t size = taosArrayGetSize(pPlan->pPlaceholderValues);
for (int32_t i = 0; i < size; ++i) {
setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, i), pParams + i);
}
} else {
setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, colIdx), pParams);
}
if (colIdx < 0 || ((colIdx + 1) == size)) {
nodesWalkPhysiPlan((SNode*)pPlan, updatePlanQueryId, &queryId);
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -590,7 +590,10 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
if (colDataIsNull_s(output.columnData, 0)) {
res->node.resType.type = TSDB_DATA_TYPE_NULL;
} else {
res->node.resType = node->node.resType;
res->node.resType.type = output.columnData->info.type;
res->node.resType.bytes = output.columnData->info.bytes;
res->node.resType.scale = output.columnData->info.scale;
res->node.resType.precision = output.columnData->info.precision;
int32_t type = output.columnData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
......
......@@ -648,11 +648,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);
if (IS_VAR_DATA_TYPE(outputType)) {
int32_t factor = (TSDB_DATA_TYPE_NCHAR == outputType) ? TSDB_NCHAR_SIZE : 1;
outputLen = outputLen * factor + VARSTR_HEADER_SIZE;
}
char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1);
char *output = outputBuf;
......
......@@ -168,7 +168,7 @@ typedef struct {
int32_t caseRunNum; // total run case num
} CaseCtrl;
#if 0
#if 1
CaseCtrl gCaseCtrl = {
.bindNullNum = 0,
.prepareStb = false,
......@@ -190,8 +190,7 @@ CaseCtrl gCaseCtrl = {
.bindColTypeNum = tListLen(bindColTypeList),
.bindColTypeList = bindColTypeList,
// .caseIdx = 22,
.caseIdx = 2,
.caseIdx = 22,
.caseNum = 1,
.caseRunNum = 1,
......@@ -211,10 +210,10 @@ CaseCtrl gCaseCtrl = {
.checkParamNum = false,
.printRes = true,
.runTimes = 0,
.caseIdx = 2,
.caseNum = 1,
.caseIdx = -1,
.caseNum = -1,
.caseRunIdx = -1,
.caseRunNum = 1,
.caseRunNum = -1,
};
#endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册