diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 0f71f24b6b646d2a68858a00331936fcf0ea581d..a4cf89cbb778034dd97b6eaf2a2fe23830103d2a 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -299,6 +299,7 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta); uint32_t tscGetTableMetaMaxSize(); int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name); STableMeta* tscTableMetaDup(STableMeta* pTableMeta); +SQuery* tscCreateQueryFromQueryNodeInfo(SQueryNodeInfo* pQueryNodeInfo); void tsCreateSQLFunctionCtx(SQueryNodeInfo* pQueryInfo, SQLFunctionCtx* pCtx); void* createQueryInfoFromQueryNode(SQueryNodeInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d7a3aad5a17561682214801ef6183bdb48f9680a..80ed19239d1fac0d4df5b21c1f4c0841f2419374 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -214,8 +214,9 @@ typedef struct SQueryNodeInfo { int32_t bufLen; char* buf; - SArray* pDSOperator; - SArray* pPhyOperator; + SArray* pDSOperator; // data source operator + SArray* pPhyOperator; // physical query execution plan + SQuery* pQuery; // query object struct SQueryNodeInfo *sibling; // sibling SArray *pUpstream; // SArray diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 3387d6258377982d98c577d53872640fd64fba9f..8e1e0bd4f1aefaf6626b48ffb2fcc642d29db53b 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -126,7 +126,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index); -static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryNodeInfo* pQueryInfo, SArray* pCols, int64_t *uid); +static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryNodeInfo* pQueryInfo, SArray* pCols, uint64_t *uid); static bool validateDebugFlag(int32_t v); static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryNodeInfo* pQueryInfo); @@ -709,7 +709,7 @@ static bool isTopBottomQuery(SQueryNodeInfo* pQueryInfo) { // need to add timestamp column in result set, if it is a time window query static int32_t addPrimaryTsColumnForTimeWindowQuery(SQueryNodeInfo* pQueryInfo) { - uint64_t uid = tscSqlExprGet(pQueryInfo, 0)->uid; + uint64_t uid = tscSqlExprGet(pQueryInfo, 0)->base.uid; int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL; for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { @@ -1499,7 +1499,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t pArithExprInfo->base.numOfParams = 1; pArithExprInfo->base.resColId = getNewResColId(pQueryInfo); - int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &pArithExprInfo->uid); + int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &(pArithExprInfo->base.uid)); if (ret != TSDB_CODE_SUCCESS) { tExprTreeDestroy(pArithExprInfo->pExpr, NULL); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "invalid expression in select clause"); @@ -3476,8 +3476,8 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSqlExpr* pExpr, SQueryNodeInfo* p } if (i == 0) { - id = p1->uid; - } else if (id != p1->uid) { + id = p1->base.uid; + } else if (id != p1->base.uid) { return TSDB_CODE_TSC_INVALID_SQL; } } @@ -6126,7 +6126,8 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) { char tmpBuf[1024] = {0}; int32_t tmpLen = 0; tmpLen = - sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->base.functionId].name, pExpr->uid, pExpr->base.colInfo.colId); + sprintf(tmpBuf, "%s(uid:%" PRIu64 ", %d)", aAggs[pExpr->base.functionId].name, pExpr->base.uid, + pExpr->base.colInfo.colId); if (tmpLen + offset >= totalBufSize - 1) break; @@ -6842,7 +6843,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind return TSDB_CODE_SUCCESS; // Does not build query message here } -int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryNodeInfo* pQueryInfo, SArray* pCols, int64_t *uid) { +int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryNodeInfo* pQueryInfo, SArray* pCols, uint64_t *uid) { tExprNode* pLeft = NULL; tExprNode* pRight= NULL; @@ -6892,7 +6893,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pS (*pExpr)->pSchema->colId = p1->base.resColId; if (uid != NULL) { - *uid = p1->uid; + *uid = p1->base.uid; } break; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 18f44ee1aaaa6a341c8ab9c3260f7a20d22296a1..837dc630d3d2ba7a16e1f0f561bea2b0197fbebc 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -705,9 +705,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } SQueryNodeInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); + SQuery* pQuery = tscCreateQueryFromQueryNodeInfo(pQueryInfo); + UNUSED(pQuery); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; - +/* size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList); if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo) && !tscQueryBlockInfo(pQueryInfo)) { tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols, @@ -725,6 +728,311 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols); return TSDB_CODE_TSC_INVALID_SQL; } +*/ + + { + SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; + tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); + + int32_t numOfTags = pQuery->numOfTags; + int32_t sqlLen = (int32_t) strlen(pSql->sqlstr); + + if (pQuery->order.order == TSDB_ORDER_ASC) { + pQueryMsg->window.skey = htobe64(pQuery->window.skey); + pQueryMsg->window.ekey = htobe64(pQuery->window.ekey); + } else { + pQueryMsg->window.skey = htobe64(pQuery->window.ekey); + pQueryMsg->window.ekey = htobe64(pQuery->window.skey); + } + + pQueryMsg->order = htons(pQuery->order.order); + pQueryMsg->orderColId = htons(pQuery->order.orderColId); + pQueryMsg->fillType = htons(pQuery->fillType); + pQueryMsg->limit = htobe64(pQuery->limit.limit); + pQueryMsg->offset = htobe64(pQuery->limit.offset); + + pQueryMsg->numOfCols = htons(pQuery->numOfCols); + + pQueryMsg->interval.interval = htobe64(pQuery->interval.interval); + pQueryMsg->interval.sliding = htobe64(pQuery->interval.sliding); + pQueryMsg->interval.offset = htobe64(pQuery->interval.offset); + pQueryMsg->interval.intervalUnit = pQuery->interval.intervalUnit; + pQueryMsg->interval.slidingUnit = pQuery->interval.slidingUnit; + pQueryMsg->interval.offsetUnit = pQuery->interval.offsetUnit; + + pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); + pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); + pQueryMsg->tbnameCondLen = htonl(pQueryInfo->tagCond.tbnameCond.len); + pQueryMsg->numOfTags = htonl(numOfTags); + pQueryMsg->queryType = htonl(pQueryInfo->type); +// pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit); + pQueryMsg->sqlstrLen = htonl(sqlLen); + pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen); + pQueryMsg->sw.gap = htobe64(pQuery->sw.gap); + pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); + + pQueryMsg->numOfOutput = htons((int16_t)pQuery->numOfOutput); // this is the stage one output column number + + // set column list ids + size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); + char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo); + SSchema *pSchema = tscGetTableSchema(pTableMeta); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfo *pCol = &pQuery->colList[i]; + + pQueryMsg->colList[i].colId = htons(pCol->colId); + pQueryMsg->colList[i].bytes = htons(pCol->bytes); + pQueryMsg->colList[i].type = htons(pCol->type); + pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters); + + // append the filter information after the basic column information + for (int32_t f = 0; f < pCol->numOfFilters; ++f) { + SColumnFilterInfo *pColFilter = &pCol->filterInfo[f]; + + SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg; + pFilterMsg->filterstr = htons(pColFilter->filterstr); + + pMsg += sizeof(SColumnFilterInfo); + + if (pColFilter->filterstr) { + pFilterMsg->len = htobe64(pColFilter->len); + memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1)); + pMsg += (pColFilter->len + 1); // append the additional filter binary info + } else { + pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi); + pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi); + } + + pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr); + pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr); + + if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) { + tscError("invalid filter info"); + return TSDB_CODE_TSC_INVALID_SQL; + } + } + } + + SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg; + + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SSqlExpr *pExpr = &pQuery->pExpr1[i].base; + + // the queried table has been removed and a new table with the same name has already been created already + // return error msg + if (pExpr->uid != pTableMeta->id.uid) { + tscError("%p table has already been destroyed", pSql); + return TSDB_CODE_TSC_INVALID_TABLE_NAME; + } + + if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { + tscError("%p table schema is not matched with parsed sql", pSql); + return TSDB_CODE_TSC_INVALID_SQL; + } + + assert(pExpr->resColId < 0); + + pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId); + pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); + pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag); + + pSqlExpr->colType = htons(pExpr->colType); + pSqlExpr->colBytes = htons(pExpr->colBytes); + pSqlExpr->resType = htons(pExpr->resType); + pSqlExpr->resBytes = htons(pExpr->resBytes); + + pSqlExpr->functionId = htons(pExpr->functionId); + pSqlExpr->numOfParams = htons(pExpr->numOfParams); + pSqlExpr->resColId = htons(pExpr->resColId); + pMsg += sizeof(SSqlExpr); + + for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log + pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); + pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen); + + if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { + memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen); + pMsg += pExpr->param[j].nLen; + } else { + pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64); + } + } + + pSqlExpr = (SSqlExpr *)pMsg; + } + + if (pQuery->numOfExpr2 > 0) { + for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { + SSqlExpr *pExpr = &pQuery->pExpr2[i].base; + + // the queried table has been removed and a new table with the same name has already been created already + // return error msg + if (pExpr->uid != pTableMeta->id.uid) { + tscError("%p table has already been destroyed", pSql); + return TSDB_CODE_TSC_INVALID_TABLE_NAME; + } + + if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { + tscError("%p table schema is not matched with parsed sql", pSql); + return TSDB_CODE_TSC_INVALID_SQL; + } + + assert(pExpr->resColId < 0); + + pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId); + pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); + pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag); + + pSqlExpr->colType = htons(pExpr->colType); + pSqlExpr->colBytes = htons(pExpr->colBytes); + pSqlExpr->resType = htons(pExpr->resType); + pSqlExpr->resBytes = htons(pExpr->resBytes); + + pSqlExpr->functionId = htons(pExpr->functionId); + pSqlExpr->numOfParams = htons(pExpr->numOfParams); + pSqlExpr->resColId = htons(pExpr->resColId); + pMsg += sizeof(SSqlExpr); + + for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log + pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); + pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen); + + if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { + memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen); + pMsg += pExpr->param[j].nLen; + } else { + pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64); + } + } + + pSqlExpr = (SSqlExpr *)pMsg; + } + } else { + pQueryMsg->secondStageOutput = 0; + } + + // serialize the table info (sid, uid, tags) + pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); + + SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr; + if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) { + pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); + pQueryMsg->orderType = htons(pGroupbyExpr->orderType); + + for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) { + SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j); + + *((int16_t *)pMsg) = htons(pCol->colId); + pMsg += sizeof(pCol->colId); + + *((int16_t *)pMsg) += htons(pCol->colIndex); + pMsg += sizeof(pCol->colIndex); + + *((int16_t *)pMsg) += htons(pCol->flag); + pMsg += sizeof(pCol->flag); + + memcpy(pMsg, pCol->name, tListLen(pCol->name)); + pMsg += tListLen(pCol->name); + } + } + + if (pQuery->fillType != TSDB_FILL_NONE) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + *((int64_t *)pMsg) = htobe64(pQuery->fillVal[i]); + pMsg += sizeof(pQuery->fillVal[0]); + } + } + + if (numOfTags != 0) { + int32_t numOfColumns = tscGetNumOfColumns(pTableMeta); + int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta); + int32_t total = numOfTagColumns + numOfColumns; + + pSchema = tscGetTableTagSchema(pTableMeta); + + for (int32_t i = 0; i < numOfTags; ++i) { + SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i); + SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; + + if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) || + (!isValidDataType(pColSchema->type))) { + char n[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(&pTableMetaInfo->name, n); + + tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s", + pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, total, numOfTagColumns, pCol->colIndex.columnIndex, pColSchema->name); + + return TSDB_CODE_TSC_INVALID_SQL; + } + + SColumnInfo* pTagCol = (SColumnInfo*) pMsg; + + pTagCol->colId = htons(pColSchema->colId); + pTagCol->bytes = htons(pColSchema->bytes); + pTagCol->type = htons(pColSchema->type); + pTagCol->numOfFilters = 0; + + pMsg += sizeof(SColumnInfo); + } + } + + // serialize tag column query condition + if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) { + STagCond* pTagCond = &pQueryInfo->tagCond; + + SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid); + if (pCond != NULL && pCond->cond != NULL) { + pQueryMsg->tagCondLen = htons(pCond->len); + memcpy(pMsg, pCond->cond, pCond->len); + + pMsg += pCond->len; + } + } + + if (pQueryInfo->bufLen > 0) { + memcpy(pMsg, pQueryInfo->buf, pQueryInfo->bufLen); + pMsg += pQueryInfo->bufLen; + } + + SCond* pCond = &pQueryInfo->tagCond.tbnameCond; + if (pCond->len > 0) { + strncpy(pMsg, pCond->cond, pCond->len); + pMsg += pCond->len; + } + + // compressed ts block + pQueryMsg->tsBuf.tsOffset = htonl((int32_t)(pMsg - pCmd->payload)); + + if (pQueryInfo->tsBuf != NULL) { + // note: here used the index instead of actual vnode id. + int32_t vnodeIndex = pTableMetaInfo->vgroupIndex; + int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + pMsg += pQueryMsg->tsBuf.tsLen; + + pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); + pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen); + pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); + } + + memcpy(pMsg, pSql->sqlstr, sqlLen); + pMsg += sqlLen; + + int32_t msgLen = (int32_t)(pMsg - pCmd->payload); + + tscDebug("%p msg built success, len:%d bytes", pSql, msgLen); + pCmd->payloadLen = msgLen; + pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; + + pQueryMsg->head.contLen = htonl(msgLen); + assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize); + + return TSDB_CODE_SUCCESS; + } SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); @@ -818,14 +1126,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } - SSqlExpr *pSqlFuncExpr = (SSqlExpr *)pMsg; + SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg; for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); // the queried table has been removed and a new table with the same name has already been created already // return error msg - if (pExpr->uid != pTableMeta->id.uid) { + if (pExpr->base.uid != pTableMeta->id.uid) { tscError("%p table has already been destroyed", pSql); return TSDB_CODE_TSC_INVALID_TABLE_NAME; } @@ -837,45 +1145,45 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { assert(pExpr->base.resColId < 0); - pSqlFuncExpr->colInfo.colId = htons(pExpr->base.colInfo.colId); - pSqlFuncExpr->colInfo.colIndex = htons(pExpr->base.colInfo.colIndex); - pSqlFuncExpr->colInfo.flag = htons(pExpr->base.colInfo.flag); + pSqlExpr->colInfo.colId = htons(pExpr->base.colInfo.colId); + pSqlExpr->colInfo.colIndex = htons(pExpr->base.colInfo.colIndex); + pSqlExpr->colInfo.flag = htons(pExpr->base.colInfo.flag); - pSqlFuncExpr->colType = htons(pExpr->base.colType); - pSqlFuncExpr->colBytes = htons(pExpr->base.colBytes); + pSqlExpr->colType = htons(pExpr->base.colType); + pSqlExpr->colBytes = htons(pExpr->base.colBytes); if (TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag) || pExpr->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { - pSqlFuncExpr->resType = htons(pExpr->base.resType); - pSqlFuncExpr->resBytes = htons(pExpr->base.resBytes); + pSqlExpr->resType = htons(pExpr->base.resType); + pSqlExpr->resBytes = htons(pExpr->base.resBytes); } else if (pExpr->base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { SSchema s = tGetBlockDistColumnSchema(); - pSqlFuncExpr->resType = htons(s.type); - pSqlFuncExpr->resBytes = htons(s.bytes); + pSqlExpr->resType = htons(s.type); + pSqlExpr->resBytes = htons(s.bytes); } else { SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->base.colInfo.colId); - pSqlFuncExpr->resType = htons(s->type); - pSqlFuncExpr->resBytes = htons(s->bytes); + pSqlExpr->resType = htons(s->type); + pSqlExpr->resBytes = htons(s->bytes); } - pSqlFuncExpr->functionId = htons(pExpr->base.functionId); - pSqlFuncExpr->numOfParams = htons(pExpr->base.numOfParams); - pSqlFuncExpr->resColId = htons(pExpr->base.resColId); + pSqlExpr->functionId = htons(pExpr->base.functionId); + pSqlExpr->numOfParams = htons(pExpr->base.numOfParams); + pSqlExpr->resColId = htons(pExpr->base.resColId); pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { // todo add log - pSqlFuncExpr->param[j].nType = htons((uint16_t)pExpr->base.param[j].nType); - pSqlFuncExpr->param[j].nLen = htons(pExpr->base.param[j].nLen); + pSqlExpr->param[j].nType = htons((uint16_t)pExpr->base.param[j].nType); + pSqlExpr->param[j].nLen = htons(pExpr->base.param[j].nLen); if (pExpr->base.param[j].nType == TSDB_DATA_TYPE_BINARY) { memcpy(pMsg, pExpr->base.param[j].pz, pExpr->base.param[j].nLen); pMsg += pExpr->base.param[j].nLen; } else { - pSqlFuncExpr->param[j].i64 = htobe64(pExpr->base.param[j].i64); + pSqlExpr->param[j].i64 = htobe64(pExpr->base.param[j].i64); } } - pSqlFuncExpr = (SSqlExpr *)pMsg; + pSqlExpr = (SSqlExpr *)pMsg; } size_t output = tscNumOfFields(pQueryInfo); @@ -893,7 +1201,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if (pExpr->pExpr == NULL) { // the queried table has been removed and a new table with the same name has already been created already // return error msg - if (pExpr->uid != pTableMeta->id.uid) { + if (pExpr->base.uid != pTableMeta->id.uid) { tscError("%p table has already been destroyed", pSql); return TSDB_CODE_TSC_INVALID_TABLE_NAME; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index a3ca4dfb54545139f3dbf059a41609bf4da7976a..e7b1f36cfbd49cb657f8dd6de574178397a3a1a3 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1463,7 +1463,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { int32_t tableIndexOfSub = -1; for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j); - if (pTableMetaInfo->pTableMeta->id.uid == pExpr->uid) { + if (pTableMetaInfo->pTableMeta->id.uid == pExpr->base.uid) { tableIndexOfSub = j; break; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a6375e9b86aafe241aff0ece53465c7bb9972601..1c9fe776a7361457b26e5446ac20ce4e9807c3f8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1291,7 +1291,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { } static SExprInfo* doCreateSqlExpr(SQueryNodeInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, - int16_t size, int16_t resColId, int16_t interSize, int32_t colType) { + int16_t size, int16_t resColId, int16_t interSize, int32_t colType) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex); SExprInfo* pExpr = calloc(1, sizeof(SExprInfo)); @@ -1344,7 +1344,7 @@ static SExprInfo* doCreateSqlExpr(SQueryNodeInfo* pQueryInfo, int16_t functionId p->interBytes = interSize; if (pTableMetaInfo->pTableMeta) { - pExpr->uid = pTableMetaInfo->pTableMeta->id.uid; + p->uid = pTableMetaInfo->pTableMeta->id.uid; } return pExpr; @@ -1460,8 +1460,7 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepco for (int32_t i = 0; i < size; ++i) { SExprInfo* pExpr = taosArrayGetP(src, i); - if (pExpr->uid == uid) { - + if (pExpr->base.uid == uid) { if (deepcopy) { SExprInfo* p1 = calloc(1, sizeof(SExprInfo)); tscSqlExprAssign(p1, pExpr); @@ -1983,7 +1982,6 @@ void tscInitQueryInfo(SQueryNodeInfo* pQueryInfo) { pQueryInfo->slimit.offset = 0; pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES); - } int32_t tscAddQueryInfo(SSqlCmd* pCmd) { @@ -3103,7 +3101,7 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryNodeInfo* pQueryNodeInfo) { pQuery->vgId = 0; pQuery->stableQuery = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); pQuery->groupbyColumn = tscGroupbyColumn(pQueryNodeInfo); - pQuery->interBufSize = getOutputInterResultBufSize(pQuery); + pQuery->window = pQueryNodeInfo->window; { pQuery->numOfOutput = tscSqlExprNumOfExprs(pQueryNodeInfo); @@ -3113,6 +3111,12 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryNodeInfo* pQueryNodeInfo) { SExprInfo* pExpr = tscSqlExprGet(pQueryNodeInfo, i); tscSqlExprAssign(&pQuery->pExpr1[i], pExpr); } + + pQuery->colList = calloc(numOfCols, sizeof(SColumnInfo)); + for(int32_t i = 0; i < numOfCols; ++i) { + SColumn* pCol = taosArrayGetP(pQueryNodeInfo->colList, i); + pQuery->colList[i] = pCol->info; + } } {// for simple table, not for super table @@ -3154,6 +3158,7 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryNodeInfo* pQueryNodeInfo) { } } + pQuery->interBufSize = getOutputInterResultBufSize(pQuery); return pQuery; } diff --git a/src/common/inc/tname.h b/src/common/inc/tname.h index fb57565f84261118e05c11b3276f420ed893ad1e..34bcbf44305baa8c8adc227a1e58e01f52a6e82c 100644 --- a/src/common/inc/tname.h +++ b/src/common/inc/tname.h @@ -64,7 +64,6 @@ typedef struct SSqlExpr { typedef struct SExprInfo { SSqlExpr base; - int64_t uid; struct tExprNode* pExpr; } SExprInfo; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2f302ab0e5be158d8edcccebe6cd56b33961fb21..dd928980b0c0ac01ce0c4ec0dba27595155394cb 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -397,28 +397,6 @@ typedef struct SColIndex { char name[TSDB_COL_NAME_LEN]; // TODO remove it } SColIndex; -/* sql function msg, to describe the message to vnode about sql function - * operations in select */ -//typedef struct SSqlFuncMsg { -// int16_t functionId; -// int16_t numOfParams; -// -// int16_t resColId; // result column id, id of the current output column -// int16_t colType; -// int16_t colBytes; -// -// SColIndex colInfo; -// struct ArgElem { -// int16_t argType; -// int16_t argBytes; -// union { -// double d; -// int64_t i64; -// char * pz; -// } argValue; -// } arg[3]; -//} SSqlFuncMsg; - typedef struct SColumnFilterInfo { int16_t lowerRelOptr; int16_t upperRelOptr; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b4c3464131f204dfb39ae00e0c4d8cfde3508716..1ff58338d068adccbda163ffefbb00aa764034f7 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5869,7 +5869,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp int32_t param = (int32_t)pExprs[i].base.param[0].i64; if (pExprs[i].base.functionId != TSDB_FUNC_ARITHM && - (type != pExprs[i].base.resType || bytes != pExprs[i].base.resBytes)) { + (type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) { tfree(pExprs); return TSDB_CODE_QRY_INVALID_MSG; }