From 7d9ad2e9d4cbf9a790be3d7bb7391d5950da4f6b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Jan 2021 15:09:47 +0800 Subject: [PATCH] [TD-225]1. add sql string in qhandle, 2. enable column check during convert query message. --- src/client/src/tscServer.c | 24 ++++++++---- src/client/src/tscUtil.c | 2 +- src/inc/taosmsg.h | 1 + src/query/inc/qExecutor.h | 1 + src/query/src/qExecutor.c | 77 ++++++++++++++++++++++++-------------- 5 files changed, 67 insertions(+), 38 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 7d5d71bac3..b1050a9bff 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -564,7 +564,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { /* * for table query, simply return the size <= 1k */ -static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { +static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) { + SSqlCmd* pCmd = &pSql->cmd; + const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5; SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); @@ -574,6 +576,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2); int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0; + int32_t sqlLen = strlen(pSql->sqlstr) + 1; int32_t tableSerialize = 0; STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -590,7 +593,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { } return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize + - tableSerialize + 4096; + tableSerialize + sqlLen + 4096; } static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { @@ -670,7 +673,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex); + int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { tscError("%p failed to malloc for query msg", pSql); @@ -703,7 +706,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList); - + int32_t sqlLen = (int32_t) strlen(pSql->sqlstr); + if (pQueryInfo->order.order == TSDB_ORDER_ASC) { pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey); pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey); @@ -726,10 +730,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit; pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); + pQueryMsg->tagCondLen = htons((pQueryInfo->tagCond.tbnameCond.cond != NULL)? strlen(pQueryInfo->tagCond.tbnameCond.cond):0); pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->queryType = htonl(pQueryInfo->type); - pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit); - + pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit); + pQueryMsg->sqlstrLen = htonl(sqlLen); + size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number @@ -964,8 +970,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } if (pQueryInfo->tagCond.tbnameCond.cond == NULL) { - *pMsg = 0; - pMsg++; + assert(pQueryMsg->tagCondLen == 0); } else { strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond); pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1; @@ -989,6 +994,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks); } + memcpy(pMsg, pSql->sqlstr, sqlLen); + pMsg += sqlLen; + int32_t msgLen = (int32_t)(pMsg - pCmd->payload); tscDebug("%p msg built success, len:%d bytes", pSql, msgLen); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 0753f95dd2..2bd97dce9f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1995,7 +1995,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; - pNew->sqlstr = NULL; + pNew->sqlstr = strdup(pSql->sqlstr); SSqlCmd* pnCmd = &pNew->cmd; memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 3e696f645c..79cfc6f86c 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -494,6 +494,7 @@ typedef struct { int32_t tsNumOfBlocks; // ts comp block numbers int32_t tsOrder; // ts comp block order int32_t numOfTags; // number of tags columns involved + int32_t sqlstrLen; // sql query string SColumnInfo colList[]; } SQueryTableMsg; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index e41217af6e..205d857f3f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -239,6 +239,7 @@ typedef struct SQInfo { int32_t dataReady; // denote if query result is ready or not void* rspContext; // response context int64_t startExecTs; // start to exec timestamp + char* sql; // query sql string } SQInfo; #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 89060bbf4b..291f7ca337 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5911,13 +5911,13 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pE j += 1; } } - assert(0); - return -1; + + return INT32_MIN; // return a less than TSDB_TBNAME_COLUMN_INDEX value } bool validateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { int32_t j = getColumnIndexInSource(pQueryMsg, pExprMsg, pTagCols); - return j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags; + return j != INT32_MIN; } static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) { @@ -5944,12 +5944,14 @@ static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) { return true; } -static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pExprMsg) { +static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols) { int32_t numOfTotal = pQueryMsg->numOfCols + pQueryMsg->numOfTags; if (pQueryMsg->numOfCols < 0 || pQueryMsg->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) { qError("qmsg:%p illegal value of numOfCols %d numOfTags:%d", pQueryMsg, pQueryMsg->numOfCols, pQueryMsg->numOfTags); return false; - } else if (numOfTotal == 0) { + } + + if (numOfTotal == 0) { for(int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { SSqlFuncMsg* pFuncMsg = pExprMsg[i]; @@ -5963,6 +5965,12 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx } } + for(int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { + if (!validateExprColumnInfo(pQueryMsg, pExprMsg[i], pTagCols)) { + return TSDB_CODE_QRY_INVALID_MSG; + } + } + return true; } @@ -5994,7 +6002,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p * @return */ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, SSqlFuncMsg ***pSecStageExpr, - char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) { + char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols, char** sql) { int32_t code = TSDB_CODE_SUCCESS; if (taosCheckVersion(pQueryMsg->version, version, 3) != 0) { @@ -6026,7 +6034,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks); pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder); pQueryMsg->numOfTags = htonl(pQueryMsg->numOfTags); + pQueryMsg->tagCondLen = htonl(pQueryMsg->tagCondLen); pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput); + pQueryMsg->sqlstrLen = htonl(pQueryMsg->sqlstrLen); // query msg safety check if (!validateQueryMsg(pQueryMsg)) { @@ -6121,20 +6131,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, code = TSDB_CODE_QRY_INVALID_MSG; goto _cleanup; } - } else { -// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) { -// return TSDB_CODE_QRY_INVALID_MSG; -// } } pExprMsg = (SSqlFuncMsg *)pMsg; } - if (!validateQuerySourceCols(pQueryMsg, *pExpr)) { - code = TSDB_CODE_QRY_INVALID_MSG; - goto _cleanup; - } - if (pQueryMsg->secondStageOutput) { pExprMsg = (SSqlFuncMsg *)pMsg; *pSecStageExpr = calloc(pQueryMsg->secondStageOutput, POINTER_BYTES); @@ -6168,10 +6169,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, code = TSDB_CODE_QRY_INVALID_MSG; goto _cleanup; } - } else { -// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) { -// return TSDB_CODE_QRY_INVALID_MSG; -// } } pExprMsg = (SSqlFuncMsg *)pMsg; @@ -6250,17 +6247,22 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pMsg += pQueryMsg->tagCondLen; } - if (*pMsg != 0) { - size_t len = strlen(pMsg) + 1; - - *tbnameCond = malloc(len); + if (pQueryMsg->tagCondLen != 0) { + *tbnameCond = calloc(1, pQueryMsg->tagCondLen + 1); if (*tbnameCond == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _cleanup; } - strcpy(*tbnameCond, pMsg); - pMsg += len; + strncpy(*tbnameCond, pMsg, pQueryMsg->tagCondLen); + pMsg += pQueryMsg->tagCondLen; + } + + *sql = strndup(pMsg, pQueryMsg->sqlstrLen); + + if (!validateQuerySourceCols(pQueryMsg, *pExpr, *tagCols)) { + code = TSDB_CODE_QRY_INVALID_MSG; + goto _cleanup; } qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " @@ -6269,6 +6271,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->interval.interval, pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset); + qDebug("qmsg:%p, sql:%s", pQueryMsg, *sql); return TSDB_CODE_SUCCESS; _cleanup: @@ -6279,6 +6282,7 @@ _cleanup: tfree(*groupbyCols); tfree(*tagCols); tfree(*tagCond); + tfree(*sql); return code; } @@ -6351,7 +6355,15 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num } } else { int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); - assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags); + if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) { + if (j < TSDB_TBNAME_COLUMN_INDEX || j >= pQueryMsg->numOfTags) { + return TSDB_CODE_QRY_INVALID_MSG; + } + } else { + if (j < PRIMARYKEY_TIMESTAMP_COL_INDEX || j >= pQueryMsg->numOfCols) { + return TSDB_CODE_QRY_INVALID_MSG; + } + } if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) { SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j]; @@ -6375,6 +6387,7 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num if (pExprs[i].base.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].base.functionId == TSDB_FUNC_TS_DUMMY) { tagLen += pExprs[i].bytes; } + assert(isValidDataType(pExprs[i].type)); } @@ -6577,7 +6590,7 @@ static void calResultBufSize(SQuery* pQuery) { } static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, - SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery) { + SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql) { int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfOutput = pQueryMsg->numOfOutput; @@ -6708,6 +6721,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->arrTableIdInfo = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->rspContext = NULL; + pQInfo->sql = sql; pthread_mutex_init(&pQInfo->lock, NULL); tsem_init(&pQInfo->ready, 0, 0); @@ -6951,6 +6965,8 @@ static void freeQInfo(SQInfo *pQInfo) { doDestroyTableQueryInfo(&pQInfo->tableqinfoGroupInfo); tfree(pQInfo->pBuf); + tfree(pQInfo->sql); + tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); taosHashCleanup(pQInfo->arrTableIdInfo); @@ -7049,6 +7065,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi int32_t code = TSDB_CODE_SUCCESS; + char *sql = NULL; char *tagCond = NULL; char *tbnameCond = NULL; SArray *pTableIdList = NULL; @@ -7061,7 +7078,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi SColumnInfo *pTagColumnInfo = NULL; SSqlGroupbyExpr *pGroupbyExpr = NULL; - code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &pSecExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo); + code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &pSecExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo, &sql); if (code != TSDB_CODE_SUCCESS) { goto _over; } @@ -7145,8 +7162,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pSecExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery); + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pSecExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery, sql); + sql = NULL; pExprs = NULL; pSecExprs = NULL; pGroupbyExpr = NULL; @@ -7170,6 +7188,7 @@ _over: } free(pTagColumnInfo); + free(sql); free(pExprs); free(pSecExprs); -- GitLab