diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0bf25a3e5f40d1185747313e8e4d8a585bf8c843..4fcb6914c705957f38c69c47435e4364d835ec44 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -692,6 +692,54 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char return pMsg; } +static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, char** pMsg, void* addr) { + STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; + + // the queried table has been removed and a new table with the same name has already been created already + // return error msg + if (pExpr->uid != pTableMeta->id.uid) { + tscError("%p table has already been destroyed", addr); + return TSDB_CODE_TSC_INVALID_TABLE_NAME; + } + + if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { + tscError("%p table schema is not matched with parsed sql", addr); + return TSDB_CODE_TSC_INVALID_SQL; + } + + assert(pExpr->resColId < 0); + SSqlExpr* pSqlExpr = (SSqlExpr *)(*pMsg); + + SColIndex* pIndex = &pSqlExpr->colInfo; + + pIndex->colId = htons(pExpr->colInfo.colId); + pIndex->colIndex = htons(pExpr->colInfo.colIndex); + pIndex->flag = htons(pExpr->colInfo.flag); + pSqlExpr->uid = htobe64(pExpr->uid); + pSqlExpr->colType = htons(pExpr->colType); + pSqlExpr->colBytes = htons(pExpr->colBytes); + pSqlExpr->resType = htons(pExpr->resType); + pSqlExpr->resBytes = htons(pExpr->resBytes); + pSqlExpr->functionId = htons(pExpr->functionId); + pSqlExpr->numOfParams = htons(pExpr->numOfParams); + pSqlExpr->resColId = htons(pExpr->resColId); + + (*pMsg) += sizeof(SSqlExpr); + for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log + pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); + pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen); + + if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { + memcpy((*pMsg), pExpr->param[j].pz, pExpr->param[j].nLen); + (*pMsg) += pExpr->param[j].nLen; + } else { + pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64); + } + } + + return TSDB_CODE_SUCCESS; +} + int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; @@ -703,386 +751,92 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); + SQueryAttr query = {0}; tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); + SArray* tableScanOperator = createTableScanPlan(&query); SArray* queryOperator = createExecOperatorPlan(&query); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; - { - SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; - tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); - - int32_t numOfTags = query.numOfTags; - int32_t sqlLen = (int32_t) strlen(pSql->sqlstr); - - if (taosArrayGetSize(tableScanOperator) == 0) { - pQueryMsg->tableScanOperator = htonl(-1); - } else { - int32_t* tablescanOp = taosArrayGet(tableScanOperator, 0); - pQueryMsg->tableScanOperator = htonl(*tablescanOp); - } - - if (query.order.order == TSDB_ORDER_ASC) { - pQueryMsg->window.skey = htobe64(query.window.skey); - pQueryMsg->window.ekey = htobe64(query.window.ekey); - } else { - pQueryMsg->window.skey = htobe64(query.window.ekey); - pQueryMsg->window.ekey = htobe64(query.window.skey); - } - - pQueryMsg->order = htons(query.order.order); - pQueryMsg->orderColId = htons(query.order.orderColId); - pQueryMsg->fillType = htons(query.fillType); - pQueryMsg->limit = htobe64(query.limit.limit); - pQueryMsg->offset = htobe64(query.limit.offset); - pQueryMsg->numOfCols = htons(query.numOfCols); - - pQueryMsg->interval.interval = htobe64(query.interval.interval); - pQueryMsg->interval.sliding = htobe64(query.interval.sliding); - pQueryMsg->interval.offset = htobe64(query.interval.offset); - pQueryMsg->interval.intervalUnit = query.interval.intervalUnit; - pQueryMsg->interval.slidingUnit = query.interval.slidingUnit; - pQueryMsg->interval.offsetUnit = query.interval.offsetUnit; - - pQueryMsg->stableQuery = query.stableQuery; - pQueryMsg->topBotQuery = query.topBotQuery; - pQueryMsg->groupbyColumn = query.groupbyColumn; - pQueryMsg->hasTagResults = query.hasTagResults; - pQueryMsg->timeWindowInterpo = query.timeWindowInterpo; - pQueryMsg->queryBlockDist = query.queryBlockDist; - pQueryMsg->stabledev = query.stabledev; - pQueryMsg->tsCompQuery = query.tsCompQuery; - pQueryMsg->simpleAgg = query.simpleAgg; - pQueryMsg->pointInterpQuery = query.pointInterpQuery; - pQueryMsg->needReverseScan = query.needReverseScan; - - pQueryMsg->numOfTags = htonl(numOfTags); - pQueryMsg->sqlstrLen = htonl(sqlLen); - pQueryMsg->sw.gap = htobe64(query.sw.gap); - pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); - - pQueryMsg->secondStageOutput = htonl(query.numOfExpr2); - pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number - - pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); - pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); - pQueryMsg->tbnameCondLen = htonl(pQueryInfo->tagCond.tbnameCond.len); - pQueryMsg->queryType = htonl(pQueryInfo->type); - pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen); - - // set column list ids - size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); - char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo); - - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfo *pCol = &query.colList[i]; - - pQueryMsg->colList[i].colId = htons(pCol->colId); - pQueryMsg->colList[i].bytes = htons(pCol->bytes); - pQueryMsg->colList[i].type = htons(pCol->type); - pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters); - - // append the filter information after the basic column information - for (int32_t f = 0; f < pCol->numOfFilters; ++f) { - SColumnFilterInfo *pColFilter = &pCol->filterInfo[f]; - - SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg; - pFilterMsg->filterstr = htons(pColFilter->filterstr); - - pMsg += sizeof(SColumnFilterInfo); - - if (pColFilter->filterstr) { - pFilterMsg->len = htobe64(pColFilter->len); - memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1)); - pMsg += (pColFilter->len + 1); // append the additional filter binary info - } else { - pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi); - pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi); - } - - pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr); - pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr); - - if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) { - tscError("invalid filter info"); - return TSDB_CODE_TSC_INVALID_SQL; - } - } - } - - SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg; - - for (int32_t i = 0; i < query.numOfOutput; ++i) { - SSqlExpr *pExpr = &query.pExpr1[i].base; - - // the queried table has been removed and a new table with the same name has already been created already - // return error msg - if (pExpr->uid != pTableMeta->id.uid) { - tscError("%p table has already been destroyed", pSql); - return TSDB_CODE_TSC_INVALID_TABLE_NAME; - } - - if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { - tscError("%p table schema is not matched with parsed sql", pSql); - return TSDB_CODE_TSC_INVALID_SQL; - } - - assert(pExpr->resColId < 0); - - pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId); - pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); - pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag); - pSqlExpr->uid = htobe64(pExpr->uid); - pSqlExpr->colType = htons(pExpr->colType); - pSqlExpr->colBytes = htons(pExpr->colBytes); - pSqlExpr->resType = htons(pExpr->resType); - pSqlExpr->resBytes = htons(pExpr->resBytes); - pSqlExpr->functionId = htons(pExpr->functionId); - pSqlExpr->numOfParams = htons(pExpr->numOfParams); - pSqlExpr->resColId = htons(pExpr->resColId); - - pMsg += sizeof(SSqlExpr); - for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log - pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); - pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen); - - if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { - memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen); - pMsg += pExpr->param[j].nLen; - } else { - pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64); - } - } - - pSqlExpr = (SSqlExpr *)pMsg; - } - - if (query.numOfExpr2 > 0) { - for (int32_t i = 0; i < query.numOfExpr2; ++i) { - SSqlExpr *pExpr = &query.pExpr2[i].base; - - // the queried table has been removed and a new table with the same name has already been created already - // return error msg - if (pExpr->uid != pTableMeta->id.uid) { - tscError("%p table has already been destroyed", pSql); - return TSDB_CODE_TSC_INVALID_TABLE_NAME; - } - - assert(pExpr->resColId < 0); - - pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId); - pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); - pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag); - pSqlExpr->uid = htobe64(pExpr->uid); - - pSqlExpr->colType = htons(pExpr->colType); - pSqlExpr->colBytes = htons(pExpr->colBytes); - pSqlExpr->resType = htons(pExpr->resType); - pSqlExpr->resBytes = htons(pExpr->resBytes); - - pSqlExpr->functionId = htons(pExpr->functionId); - pSqlExpr->numOfParams = htons(pExpr->numOfParams); - pSqlExpr->resColId = htons(pExpr->resColId); - pMsg += sizeof(SSqlExpr); - - for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log - pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); - pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen); - - if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { - memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen); - pMsg += pExpr->param[j].nLen; - } else { - pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64); - } - } - - pSqlExpr = (SSqlExpr *)pMsg; - } - } - - // serialize the table info (sid, uid, tags) - pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); - - SSqlGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr; - if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) { - pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); - pQueryMsg->orderType = htons(pGroupbyExpr->orderType); - - for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) { - SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j); - - *((int16_t *)pMsg) = htons(pCol->colId); - pMsg += sizeof(pCol->colId); - - *((int16_t *)pMsg) += htons(pCol->colIndex); - pMsg += sizeof(pCol->colIndex); - - *((int16_t *)pMsg) += htons(pCol->flag); - pMsg += sizeof(pCol->flag); - - memcpy(pMsg, pCol->name, tListLen(pCol->name)); - pMsg += tListLen(pCol->name); - } - } - - if (query.fillType != TSDB_FILL_NONE) { - for (int32_t i = 0; i < query.numOfOutput; ++i) { - *((int64_t *)pMsg) = htobe64(query.fillVal[i]); - pMsg += sizeof(query.fillVal[0]); - } - } - - if (query.numOfTags > 0) { - for (int32_t i = 0; i < query.numOfTags; ++i) { - SColumnInfo* pTag = &query.tagColList[i]; - - SColumnInfo* pTagCol = (SColumnInfo*) pMsg; - pTagCol->colId = htons(pTag->colId); - pTagCol->bytes = htons(pTag->bytes); - pTagCol->type = htons(pTag->type); - pTagCol->numOfFilters = 0; - - pMsg += sizeof(SColumnInfo); - } - } - - // serialize tag column query condition - if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) { - STagCond* pTagCond = &pQueryInfo->tagCond; - - SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid); - if (pCond != NULL && pCond->cond != NULL) { - pQueryMsg->tagCondLen = htons(pCond->len); - memcpy(pMsg, pCond->cond, pCond->len); - - pMsg += pCond->len; - } - } - - if (pQueryInfo->bufLen > 0) { - memcpy(pMsg, pQueryInfo->buf, pQueryInfo->bufLen); - pMsg += pQueryInfo->bufLen; - } - - SCond* pCond = &pQueryInfo->tagCond.tbnameCond; - if (pCond->len > 0) { - strncpy(pMsg, pCond->cond, pCond->len); - pMsg += pCond->len; - } - - // compressed ts block - pQueryMsg->tsBuf.tsOffset = htonl((int32_t)(pMsg - pCmd->payload)); - - if (pQueryInfo->tsBuf != NULL) { - // note: here used the index instead of actual vnode id. - int32_t vnodeIndex = pTableMetaInfo->vgroupIndex; - int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - pMsg += pQueryMsg->tsBuf.tsLen; - - pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); - pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen); - pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); - } - - int32_t numOfOperator = taosArrayGetSize(queryOperator); - pQueryMsg->numOfOperator = htonl(numOfOperator); - for(int32_t i = 0; i < numOfOperator; ++i) { - int32_t *operator = taosArrayGet(queryOperator, i); - *(int32_t*)pMsg = htonl(*operator); - - pMsg += sizeof(int32_t); - } - - memcpy(pMsg, pSql->sqlstr, sqlLen); - pMsg += sqlLen; - - int32_t msgLen = (int32_t)(pMsg - pCmd->payload); - - tscDebug("%p msg built success, len:%d bytes", pSql, msgLen); - pCmd->payloadLen = msgLen; - pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; - - pQueryMsg->head.contLen = htonl(msgLen); - assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize); - - return TSDB_CODE_SUCCESS; - } -/* SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); - int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList); + int32_t numOfTags = query.numOfTags; int32_t sqlLen = (int32_t) strlen(pSql->sqlstr); - if (pQueryInfo->order.order == TSDB_ORDER_ASC) { - pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey); - pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey); + if (taosArrayGetSize(tableScanOperator) == 0) { + pQueryMsg->tableScanOperator = htonl(-1); + } else { + int32_t* tablescanOp = taosArrayGet(tableScanOperator, 0); + pQueryMsg->tableScanOperator = htonl(*tablescanOp); + } + + if (query.order.order == TSDB_ORDER_ASC) { + pQueryMsg->window.skey = htobe64(query.window.skey); + pQueryMsg->window.ekey = htobe64(query.window.ekey); } else { - pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey); - pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey); - } - - pQueryMsg->order = htons(pQueryInfo->order.order); - pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); - pQueryMsg->fillType = htons(pQueryInfo->fillType); - pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); - pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); - pQueryMsg->numOfCols = htons((int16_t)taosArrayGetSize(pQueryInfo->colList)); - pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval); - pQueryMsg->interval.sliding = htobe64(pQueryInfo->interval.sliding); - pQueryMsg->interval.offset = htobe64(pQueryInfo->interval.offset); - pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit; - pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit; - pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit; + pQueryMsg->window.skey = htobe64(query.window.ekey); + pQueryMsg->window.ekey = htobe64(query.window.skey); + } + + pQueryMsg->order = htons(query.order.order); + pQueryMsg->orderColId = htons(query.order.orderColId); + pQueryMsg->fillType = htons(query.fillType); + pQueryMsg->limit = htobe64(query.limit.limit); + pQueryMsg->offset = htobe64(query.limit.offset); + pQueryMsg->numOfCols = htons(query.numOfCols); + + pQueryMsg->interval.interval = htobe64(query.interval.interval); + pQueryMsg->interval.sliding = htobe64(query.interval.sliding); + pQueryMsg->interval.offset = htobe64(query.interval.offset); + pQueryMsg->interval.intervalUnit = query.interval.intervalUnit; + pQueryMsg->interval.slidingUnit = query.interval.slidingUnit; + pQueryMsg->interval.offsetUnit = query.interval.offsetUnit; + + pQueryMsg->stableQuery = query.stableQuery; + pQueryMsg->topBotQuery = query.topBotQuery; + pQueryMsg->groupbyColumn = query.groupbyColumn; + pQueryMsg->hasTagResults = query.hasTagResults; + pQueryMsg->timeWindowInterpo = query.timeWindowInterpo; + pQueryMsg->queryBlockDist = query.queryBlockDist; + pQueryMsg->stabledev = query.stabledev; + pQueryMsg->tsCompQuery = query.tsCompQuery; + pQueryMsg->simpleAgg = query.simpleAgg; + pQueryMsg->pointInterpQuery = query.pointInterpQuery; + pQueryMsg->needReverseScan = query.needReverseScan; + + pQueryMsg->numOfTags = htonl(numOfTags); + pQueryMsg->sqlstrLen = htonl(sqlLen); + pQueryMsg->sw.gap = htobe64(query.sw.gap); + pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); + + pQueryMsg->secondStageOutput = htonl(query.numOfExpr2); + pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number + pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->tbnameCondLen = htonl(pQueryInfo->tagCond.tbnameCond.len); - pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->queryType = htonl(pQueryInfo->type); - pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit); - pQueryMsg->sqlstrLen = htonl(sqlLen); pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen); - pQueryMsg->sw.gap = htobe64(pQueryInfo->sessionWindow.gap); - pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); - - size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); - pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number // set column list ids size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo); - SSchema *pSchema = tscGetTableSchema(pTableMeta); - - for (int32_t i = 0; i < numOfCols; ++i) { - SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i); - SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; - if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || !isValidDataType(pColSchema->type)) { - char n[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, n); - - tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", - pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex, - pColSchema->name); - return TSDB_CODE_TSC_INVALID_SQL; - } + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfo *pCol = &query.colList[i]; - pQueryMsg->colList[i].colId = htons(pCol->info.colId); - pQueryMsg->colList[i].bytes = htons(pCol->info.bytes); - pQueryMsg->colList[i].type = htons(pCol->info.type); - pQueryMsg->colList[i].numOfFilters = htons(pCol->info.numOfFilters); + pQueryMsg->colList[i].colId = htons(pCol->colId); + pQueryMsg->colList[i].bytes = htons(pCol->bytes); + pQueryMsg->colList[i].type = htons(pCol->type); + pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters); // append the filter information after the basic column information - for (int32_t f = 0; f < pCol->info.numOfFilters; ++f) { - SColumnFilterInfo *pColFilter = &pCol->info.filterInfo[f]; + for (int32_t f = 0; f < pCol->numOfFilters; ++f) { + SColumnFilterInfo *pColFilter = &pCol->filterInfo[f]; SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg; pFilterMsg->filterstr = htons(pColFilter->filterstr); @@ -1108,158 +862,31 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } - SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg; - - for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { - SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); - - // the queried table has been removed and a new table with the same name has already been created already - // return error msg - if (pExpr->base.uid != pTableMeta->id.uid) { - tscError("%p table has already been destroyed", pSql); - return TSDB_CODE_TSC_INVALID_TABLE_NAME; - } - - if (!tscValidateColumnId(pTableMetaInfo, pExpr->base.colInfo.colId, pExpr->base.numOfParams)) { - tscError("%p table schema is not matched with parsed sql", pSql); - return TSDB_CODE_TSC_INVALID_SQL; - } - - assert(pExpr->base.resColId < 0); - - pSqlExpr->colInfo.colId = htons(pExpr->base.colInfo.colId); - pSqlExpr->colInfo.colIndex = htons(pExpr->base.colInfo.colIndex); - pSqlExpr->colInfo.flag = htons(pExpr->base.colInfo.flag); - - pSqlExpr->colType = htons(pExpr->base.colType); - pSqlExpr->colBytes = htons(pExpr->base.colBytes); - - if (TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag) || pExpr->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { - pSqlExpr->resType = htons(pExpr->base.resType); - pSqlExpr->resBytes = htons(pExpr->base.resBytes); - } else if (pExpr->base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { - SSchema s = tGetBlockDistColumnSchema(); - - pSqlExpr->resType = htons(s.type); - pSqlExpr->resBytes = htons(s.bytes); - } else { - SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->base.colInfo.colId); - pSqlExpr->resType = htons(s->type); - pSqlExpr->resBytes = htons(s->bytes); - } - - pSqlExpr->functionId = htons(pExpr->base.functionId); - pSqlExpr->numOfParams = htons(pExpr->base.numOfParams); - pSqlExpr->resColId = htons(pExpr->base.resColId); - pMsg += sizeof(SSqlExpr); - - for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { // todo add log - pSqlExpr->param[j].nType = htons((uint16_t)pExpr->base.param[j].nType); - pSqlExpr->param[j].nLen = htons(pExpr->base.param[j].nLen); - - if (pExpr->base.param[j].nType == TSDB_DATA_TYPE_BINARY) { - memcpy(pMsg, pExpr->base.param[j].pz, pExpr->base.param[j].nLen); - pMsg += pExpr->base.param[j].nLen; - } else { - pSqlExpr->param[j].i64 = htobe64(pExpr->base.param[j].i64); - } + for (int32_t i = 0; i < query.numOfOutput; ++i) { + int32_t code = serializeSqlExpr(&query.pExpr1[i].base, pTableMetaInfo, &pMsg, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } + } - pSqlExpr = (SSqlExpr *)pMsg; - } - - size_t output = tscNumOfFields(pQueryInfo); - - if (tscIsSecondStageQuery(pQueryInfo)) { - pQueryMsg->secondStageOutput = htonl((int32_t) output); - - SSqlExpr *pExpr1 = (SSqlExpr *)pMsg; - - for (int32_t i = 0; i < output; ++i) { - SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); - SExprInfo *pExpr = pField->pExpr; - - // this should be switched to projection query - if (pExpr->pExpr == NULL) { - // the queried table has been removed and a new table with the same name has already been created already - // return error msg - if (pExpr->base.uid != pTableMeta->id.uid) { - tscError("%p table has already been destroyed", pSql); - return TSDB_CODE_TSC_INVALID_TABLE_NAME; - } - - if (!tscValidateColumnId(pTableMetaInfo, pExpr->base.colInfo.colId, pExpr->base.numOfParams)) { - tscError("%p table schema is not matched with parsed sql", pSql); - return TSDB_CODE_TSC_INVALID_SQL; - } - - pExpr1->numOfParams = 0; // no params for projection query - pExpr1->functionId = htons(TSDB_FUNC_PRJ); - pExpr1->colInfo.colId = htons(pExpr->base.resColId); - pExpr1->colInfo.flag = htons(TSDB_COL_NORMAL); - pExpr1->colType = htons(pExpr->base.resType); - pExpr1->colBytes = htons(pExpr->base.resBytes); - - bool assign = false; - for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) { - SExprInfo *pe = tscSqlExprGet(pQueryInfo, f); - if (pe == pExpr) { - pExpr1->colInfo.colIndex = htons(f); - pExpr1->resType = htons(pe->base.resType); - pExpr1->resBytes = htons(pe->base.resBytes); - assign = true; - break; - } - } - - assert(assign); - pMsg += sizeof(SSqlExpr); - pExpr1 = (SSqlExpr *)pMsg; - } else { - assert(pField->pExpr != NULL); - SExprInfo* pExprInfo = pField->pExpr; - - pExpr1->colInfo.colId = htons(pExprInfo->base.colInfo.colId); - pExpr1->colType = htons(pExprInfo->base.colType); - pExpr1->colBytes = htons(pExprInfo->base.colBytes); - pExpr1->resBytes = htons(sizeof(double)); - pExpr1->resType = htons(TSDB_DATA_TYPE_DOUBLE); - - pExpr1->functionId = htons(pExprInfo->base.functionId); - pExpr1->numOfParams = htons(pExprInfo->base.numOfParams); - pMsg += sizeof(SSqlExpr); - - for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { - // todo add log - pExpr1->param[j].nType = htons((uint16_t)pExprInfo->base.param[j].nType); - pExpr1->param[j].nLen = htons(pExprInfo->base.param[j].nLen); - - if (pExprInfo->base.param[j].nType == TSDB_DATA_TYPE_BINARY) { - memcpy(pMsg, pExprInfo->base.param[j].pz, pExprInfo->base.param[j].nLen); - pMsg += pExprInfo->base.param[j].nLen; - } else { - pExpr1->param[j].i64 = htobe64(pExprInfo->base.param[j].i64); - } - } - - pExpr1 = (SSqlExpr *)pMsg; - } + for (int32_t i = 0; i < query.numOfExpr2; ++i) { + int32_t code = serializeSqlExpr(&query.pExpr2[i].base, pTableMetaInfo, &pMsg, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } - } else { - pQueryMsg->secondStageOutput = 0; } // serialize the table info (sid, uid, tags) pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); - - SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; - if (pGroupbyExpr->numOfGroupCols > 0) { + + SSqlGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr; + if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) { pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); pQueryMsg->orderType = htons(pGroupbyExpr->orderType); for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) { SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j); - + *((int16_t *)pMsg) = htons(pCol->colId); pMsg += sizeof(pCol->colId); @@ -1268,48 +895,29 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { *((int16_t *)pMsg) += htons(pCol->flag); pMsg += sizeof(pCol->flag); - + memcpy(pMsg, pCol->name, tListLen(pCol->name)); pMsg += tListLen(pCol->name); } } - if (pQueryInfo->fillType != TSDB_FILL_NONE) { - for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { - *((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]); - pMsg += sizeof(pQueryInfo->fillVal[0]); + if (query.fillType != TSDB_FILL_NONE) { + for (int32_t i = 0; i < query.numOfOutput; ++i) { + *((int64_t *)pMsg) = htobe64(query.fillVal[i]); + pMsg += sizeof(query.fillVal[0]); } } - - if (numOfTags != 0) { - int32_t numOfColumns = tscGetNumOfColumns(pTableMeta); - int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta); - int32_t total = numOfTagColumns + numOfColumns; - - pSchema = tscGetTableTagSchema(pTableMeta); - - for (int32_t i = 0; i < numOfTags; ++i) { - SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i); - SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; - if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) || - (!isValidDataType(pColSchema->type))) { - char n[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, n); + if (query.numOfTags > 0) { + for (int32_t i = 0; i < query.numOfTags; ++i) { + SColumnInfo* pTag = &query.tagColList[i]; - tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s", - pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, total, numOfTagColumns, pCol->colIndex.columnIndex, pColSchema->name); - - return TSDB_CODE_TSC_INVALID_SQL; - } - SColumnInfo* pTagCol = (SColumnInfo*) pMsg; - - pTagCol->colId = htons(pColSchema->colId); - pTagCol->bytes = htons(pColSchema->bytes); - pTagCol->type = htons(pColSchema->type); + pTagCol->colId = htons(pTag->colId); + pTagCol->bytes = htons(pTag->bytes); + pTagCol->type = htons(pTag->type); pTagCol->numOfFilters = 0; - + pMsg += sizeof(SColumnInfo); } } @@ -1317,12 +925,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // serialize tag column query condition if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) { STagCond* pTagCond = &pQueryInfo->tagCond; - + SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid); if (pCond != NULL && pCond->cond != NULL) { pQueryMsg->tagCondLen = htons(pCond->len); memcpy(pMsg, pCond->cond, pCond->len); - + pMsg += pCond->len; } } @@ -1356,6 +964,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); } + int32_t numOfOperator = taosArrayGetSize(queryOperator); + pQueryMsg->numOfOperator = htonl(numOfOperator); + for(int32_t i = 0; i < numOfOperator; ++i) { + int32_t *operator = taosArrayGet(queryOperator, i); + *(int32_t*)pMsg = htonl(*operator); + + pMsg += sizeof(int32_t); + } + memcpy(pMsg, pSql->sqlstr, sqlLen); pMsg += sqlLen; @@ -1364,11 +981,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscDebug("%p msg built success, len:%d bytes", pSql, msgLen); pCmd->payloadLen = msgLen; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; - + pQueryMsg->head.contLen = htonl(msgLen); assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize); - return TSDB_CODE_SUCCESS;*/ + freeQueryAttr(&query); + taosArrayDestroy(tableScanOperator); + taosArrayDestroy(queryOperator); + + return TSDB_CODE_SUCCESS; } int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 02b82cfe01afaaa95a842034425a16b74857b5c5..4e600f42fe8e7efa45f24615fdfd41c62671e873 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2027,6 +2027,9 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { tfree(pQueryInfo->fillVal); tfree(pQueryInfo->buf); + + taosArrayDestroy(pQueryInfo->pUpstream); + taosArrayDestroy(pQueryInfo->pDownstream); } void tscClearSubqueryInfo(SSqlCmd* pCmd) { @@ -3183,7 +3186,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; - pQueryAttr->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr)); + pQueryAttr->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr)); *(pQueryAttr->pGroupbyExpr) = pQueryInfo->groupbyExpr; pQueryAttr->pExpr1 = calloc(pQueryAttr->numOfOutput, sizeof(SExprInfo)); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 4786ff7cbcb0df6ce7f29d23bfbae677e4cd1659..5bbd40a861d134b65ea10336c7ea764d98498482 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -467,6 +467,7 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); void setQueryKilled(SQInfo *pQInfo); void queryCostStatis(SQInfo *pQInfo); void freeQInfo(SQInfo *pQInfo); +void freeQueryAttr(SQueryAttr *pQuery); int32_t getMaximumIdleDurationSec(); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index f9ac48de6eff6692e7b88d504885d11468c07834..7016d5f07fb310d30b7f3ed18c03843f29d71ce2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6486,13 +6486,13 @@ void freeQInfo(SQInfo *pQInfo) { } } + tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo); + doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo); tfree(pQInfo->pBuf); tfree(pQInfo->sql); - tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo); - taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); pQInfo->signature = 0; @@ -6639,3 +6639,38 @@ void releaseQueryBuf(size_t numOfTables) { // restore value is not enough buffer available atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); } + +void freeQueryAttr(SQueryAttr* pQueryAttr) { + if (pQueryAttr != NULL) { + if (pQueryAttr->fillVal != NULL) { + tfree(pQueryAttr->fillVal); + } + + for (int32_t i = 0; i < pQueryAttr->numOfFilterCols; ++i) { + SSingleColumnFilterInfo* pColFilter = &pQueryAttr->pFilterInfo[i]; + if (pColFilter->numOfFilters > 0) { + tfree(pColFilter->pFilters); + } + } + + pQueryAttr->pExpr1 = destroyQueryFuncExpr(pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + pQueryAttr->pExpr2 = destroyQueryFuncExpr(pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); + + tfree(pQueryAttr->tagColList); + tfree(pQueryAttr->pFilterInfo); + + if (pQueryAttr->colList != NULL) { + for (int32_t i = 0; i < pQueryAttr->numOfCols; i++) { + SColumnInfo* column = pQueryAttr->colList + i; + freeColumnFilterInfo(column->filterInfo, column->numOfFilters); + } + tfree(pQueryAttr->colList); + } + + if (pQueryAttr->pGroupbyExpr != NULL) { + taosArrayDestroy(pQueryAttr->pGroupbyExpr->columnInfo); + tfree(pQueryAttr->pGroupbyExpr); + } + } +} +