提交 a36099d8 编写于 作者: H Haojun Liao

[td-2819] refactor codes and fix memory leak

上级 1f1d4946
...@@ -692,6 +692,54 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -692,6 +692,54 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
return pMsg; return pMsg;
} }
static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, char** pMsg, void* addr) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
if (pExpr->uid != pTableMeta->id.uid) {
tscError("%p table has already been destroyed", addr);
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
tscError("%p table schema is not matched with parsed sql", addr);
return TSDB_CODE_TSC_INVALID_SQL;
}
assert(pExpr->resColId < 0);
SSqlExpr* pSqlExpr = (SSqlExpr *)(*pMsg);
SColIndex* pIndex = &pSqlExpr->colInfo;
pIndex->colId = htons(pExpr->colInfo.colId);
pIndex->colIndex = htons(pExpr->colInfo.colIndex);
pIndex->flag = htons(pExpr->colInfo.flag);
pSqlExpr->uid = htobe64(pExpr->uid);
pSqlExpr->colType = htons(pExpr->colType);
pSqlExpr->colBytes = htons(pExpr->colBytes);
pSqlExpr->resType = htons(pExpr->resType);
pSqlExpr->resBytes = htons(pExpr->resBytes);
pSqlExpr->functionId = htons(pExpr->functionId);
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
pSqlExpr->resColId = htons(pExpr->resColId);
(*pMsg) += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType);
pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy((*pMsg), pExpr->param[j].pz, pExpr->param[j].nLen);
(*pMsg) += pExpr->param[j].nLen;
} else {
pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64);
}
}
return TSDB_CODE_SUCCESS;
}
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -703,386 +751,92 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -703,386 +751,92 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
SQueryAttr query = {0}; SQueryAttr query = {0};
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
SArray* tableScanOperator = createTableScanPlan(&query); SArray* tableScanOperator = createTableScanPlan(&query);
SArray* queryOperator = createExecOperatorPlan(&query); SArray* queryOperator = createExecOperatorPlan(&query);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
{
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
int32_t numOfTags = query.numOfTags;
int32_t sqlLen = (int32_t) strlen(pSql->sqlstr);
if (taosArrayGetSize(tableScanOperator) == 0) {
pQueryMsg->tableScanOperator = htonl(-1);
} else {
int32_t* tablescanOp = taosArrayGet(tableScanOperator, 0);
pQueryMsg->tableScanOperator = htonl(*tablescanOp);
}
if (query.order.order == TSDB_ORDER_ASC) {
pQueryMsg->window.skey = htobe64(query.window.skey);
pQueryMsg->window.ekey = htobe64(query.window.ekey);
} else {
pQueryMsg->window.skey = htobe64(query.window.ekey);
pQueryMsg->window.ekey = htobe64(query.window.skey);
}
pQueryMsg->order = htons(query.order.order);
pQueryMsg->orderColId = htons(query.order.orderColId);
pQueryMsg->fillType = htons(query.fillType);
pQueryMsg->limit = htobe64(query.limit.limit);
pQueryMsg->offset = htobe64(query.limit.offset);
pQueryMsg->numOfCols = htons(query.numOfCols);
pQueryMsg->interval.interval = htobe64(query.interval.interval);
pQueryMsg->interval.sliding = htobe64(query.interval.sliding);
pQueryMsg->interval.offset = htobe64(query.interval.offset);
pQueryMsg->interval.intervalUnit = query.interval.intervalUnit;
pQueryMsg->interval.slidingUnit = query.interval.slidingUnit;
pQueryMsg->interval.offsetUnit = query.interval.offsetUnit;
pQueryMsg->stableQuery = query.stableQuery;
pQueryMsg->topBotQuery = query.topBotQuery;
pQueryMsg->groupbyColumn = query.groupbyColumn;
pQueryMsg->hasTagResults = query.hasTagResults;
pQueryMsg->timeWindowInterpo = query.timeWindowInterpo;
pQueryMsg->queryBlockDist = query.queryBlockDist;
pQueryMsg->stabledev = query.stabledev;
pQueryMsg->tsCompQuery = query.tsCompQuery;
pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needReverseScan = query.needReverseScan;
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->sw.gap = htobe64(query.sw.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
pQueryMsg->secondStageOutput = htonl(query.numOfExpr2);
pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->tbnameCondLen = htonl(pQueryInfo->tagCond.tbnameCond.len);
pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
// set column list ids
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfo *pCol = &query.colList[i];
pQueryMsg->colList[i].colId = htons(pCol->colId);
pQueryMsg->colList[i].bytes = htons(pCol->bytes);
pQueryMsg->colList[i].type = htons(pCol->type);
pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
// append the filter information after the basic column information
for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
pFilterMsg->filterstr = htons(pColFilter->filterstr);
pMsg += sizeof(SColumnFilterInfo);
if (pColFilter->filterstr) {
pFilterMsg->len = htobe64(pColFilter->len);
memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
pMsg += (pColFilter->len + 1); // append the additional filter binary info
} else {
pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
}
pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
tscError("invalid filter info");
return TSDB_CODE_TSC_INVALID_SQL;
}
}
}
SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg;
for (int32_t i = 0; i < query.numOfOutput; ++i) {
SSqlExpr *pExpr = &query.pExpr1[i].base;
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
if (pExpr->uid != pTableMeta->id.uid) {
tscError("%p table has already been destroyed", pSql);
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
assert(pExpr->resColId < 0);
pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlExpr->uid = htobe64(pExpr->uid);
pSqlExpr->colType = htons(pExpr->colType);
pSqlExpr->colBytes = htons(pExpr->colBytes);
pSqlExpr->resType = htons(pExpr->resType);
pSqlExpr->resBytes = htons(pExpr->resBytes);
pSqlExpr->functionId = htons(pExpr->functionId);
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
pSqlExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType);
pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
} else {
pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64);
}
}
pSqlExpr = (SSqlExpr *)pMsg;
}
if (query.numOfExpr2 > 0) {
for (int32_t i = 0; i < query.numOfExpr2; ++i) {
SSqlExpr *pExpr = &query.pExpr2[i].base;
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
if (pExpr->uid != pTableMeta->id.uid) {
tscError("%p table has already been destroyed", pSql);
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
assert(pExpr->resColId < 0);
pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlExpr->uid = htobe64(pExpr->uid);
pSqlExpr->colType = htons(pExpr->colType);
pSqlExpr->colBytes = htons(pExpr->colBytes);
pSqlExpr->resType = htons(pExpr->resType);
pSqlExpr->resBytes = htons(pExpr->resBytes);
pSqlExpr->functionId = htons(pExpr->functionId);
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
pSqlExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType);
pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
} else {
pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64);
}
}
pSqlExpr = (SSqlExpr *)pMsg;
}
}
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
SSqlGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr;
if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
*((int16_t *)pMsg) = htons(pCol->colId);
pMsg += sizeof(pCol->colId);
*((int16_t *)pMsg) += htons(pCol->colIndex);
pMsg += sizeof(pCol->colIndex);
*((int16_t *)pMsg) += htons(pCol->flag);
pMsg += sizeof(pCol->flag);
memcpy(pMsg, pCol->name, tListLen(pCol->name));
pMsg += tListLen(pCol->name);
}
}
if (query.fillType != TSDB_FILL_NONE) {
for (int32_t i = 0; i < query.numOfOutput; ++i) {
*((int64_t *)pMsg) = htobe64(query.fillVal[i]);
pMsg += sizeof(query.fillVal[0]);
}
}
if (query.numOfTags > 0) {
for (int32_t i = 0; i < query.numOfTags; ++i) {
SColumnInfo* pTag = &query.tagColList[i];
SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
pTagCol->colId = htons(pTag->colId);
pTagCol->bytes = htons(pTag->bytes);
pTagCol->type = htons(pTag->type);
pTagCol->numOfFilters = 0;
pMsg += sizeof(SColumnInfo);
}
}
// serialize tag column query condition
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len;
}
}
if (pQueryInfo->bufLen > 0) {
memcpy(pMsg, pQueryInfo->buf, pQueryInfo->bufLen);
pMsg += pQueryInfo->bufLen;
}
SCond* pCond = &pQueryInfo->tagCond.tbnameCond;
if (pCond->len > 0) {
strncpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len;
}
// compressed ts block
pQueryMsg->tsBuf.tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
if (pQueryInfo->tsBuf != NULL) {
// note: here used the index instead of actual vnode id.
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pMsg += pQueryMsg->tsBuf.tsLen;
pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen);
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
}
int32_t numOfOperator = taosArrayGetSize(queryOperator);
pQueryMsg->numOfOperator = htonl(numOfOperator);
for(int32_t i = 0; i < numOfOperator; ++i) {
int32_t *operator = taosArrayGet(queryOperator, i);
*(int32_t*)pMsg = htonl(*operator);
pMsg += sizeof(int32_t);
}
memcpy(pMsg, pSql->sqlstr, sqlLen);
pMsg += sqlLen;
int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
tscDebug("%p msg built success, len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->head.contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
return TSDB_CODE_SUCCESS;
}
/*
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList); int32_t numOfTags = query.numOfTags;
int32_t sqlLen = (int32_t) strlen(pSql->sqlstr); int32_t sqlLen = (int32_t) strlen(pSql->sqlstr);
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { if (taosArrayGetSize(tableScanOperator) == 0) {
pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey); pQueryMsg->tableScanOperator = htonl(-1);
pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey); } else {
int32_t* tablescanOp = taosArrayGet(tableScanOperator, 0);
pQueryMsg->tableScanOperator = htonl(*tablescanOp);
}
if (query.order.order == TSDB_ORDER_ASC) {
pQueryMsg->window.skey = htobe64(query.window.skey);
pQueryMsg->window.ekey = htobe64(query.window.ekey);
} else { } else {
pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey); pQueryMsg->window.skey = htobe64(query.window.ekey);
pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey); pQueryMsg->window.ekey = htobe64(query.window.skey);
} }
pQueryMsg->order = htons(pQueryInfo->order.order); pQueryMsg->order = htons(query.order.order);
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); pQueryMsg->orderColId = htons(query.order.orderColId);
pQueryMsg->fillType = htons(pQueryInfo->fillType); pQueryMsg->fillType = htons(query.fillType);
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->limit = htobe64(query.limit.limit);
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); pQueryMsg->offset = htobe64(query.limit.offset);
pQueryMsg->numOfCols = htons((int16_t)taosArrayGetSize(pQueryInfo->colList)); pQueryMsg->numOfCols = htons(query.numOfCols);
pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
pQueryMsg->interval.sliding = htobe64(pQueryInfo->interval.sliding); pQueryMsg->interval.interval = htobe64(query.interval.interval);
pQueryMsg->interval.offset = htobe64(pQueryInfo->interval.offset); pQueryMsg->interval.sliding = htobe64(query.interval.sliding);
pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit; pQueryMsg->interval.offset = htobe64(query.interval.offset);
pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit; pQueryMsg->interval.intervalUnit = query.interval.intervalUnit;
pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit; pQueryMsg->interval.slidingUnit = query.interval.slidingUnit;
pQueryMsg->interval.offsetUnit = query.interval.offsetUnit;
pQueryMsg->stableQuery = query.stableQuery;
pQueryMsg->topBotQuery = query.topBotQuery;
pQueryMsg->groupbyColumn = query.groupbyColumn;
pQueryMsg->hasTagResults = query.hasTagResults;
pQueryMsg->timeWindowInterpo = query.timeWindowInterpo;
pQueryMsg->queryBlockDist = query.queryBlockDist;
pQueryMsg->stabledev = query.stabledev;
pQueryMsg->tsCompQuery = query.tsCompQuery;
pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needReverseScan = query.needReverseScan;
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->sw.gap = htobe64(query.sw.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
pQueryMsg->secondStageOutput = htonl(query.numOfExpr2);
pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->tbnameCondLen = htonl(pQueryInfo->tagCond.tbnameCond.len); pQueryMsg->tbnameCondLen = htonl(pQueryInfo->tagCond.tbnameCond.len);
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen); pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
pQueryMsg->sw.gap = htobe64(pQueryInfo->sessionWindow.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
// set column list ids // set column list ids
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo); char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
SSchema *pSchema = tscGetTableSchema(pTableMeta);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || !isValidDataType(pColSchema->type)) { for (int32_t i = 0; i < numOfCols; ++i) {
char n[TSDB_TABLE_FNAME_LEN] = {0}; SColumnInfo *pCol = &query.colList[i];
tNameExtractFullName(&pTableMetaInfo->name, n);
tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL;
}
pQueryMsg->colList[i].colId = htons(pCol->info.colId); pQueryMsg->colList[i].colId = htons(pCol->colId);
pQueryMsg->colList[i].bytes = htons(pCol->info.bytes); pQueryMsg->colList[i].bytes = htons(pCol->bytes);
pQueryMsg->colList[i].type = htons(pCol->info.type); pQueryMsg->colList[i].type = htons(pCol->type);
pQueryMsg->colList[i].numOfFilters = htons(pCol->info.numOfFilters); pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
// append the filter information after the basic column information // append the filter information after the basic column information
for (int32_t f = 0; f < pCol->info.numOfFilters; ++f) { for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
SColumnFilterInfo *pColFilter = &pCol->info.filterInfo[f]; SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg; SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
pFilterMsg->filterstr = htons(pColFilter->filterstr); pFilterMsg->filterstr = htons(pColFilter->filterstr);
...@@ -1108,158 +862,31 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1108,158 +862,31 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
} }
SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg; for (int32_t i = 0; i < query.numOfOutput; ++i) {
int32_t code = serializeSqlExpr(&query.pExpr1[i].base, pTableMetaInfo, &pMsg, pSql);
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { if (code != TSDB_CODE_SUCCESS) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); return code;
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
if (pExpr->base.uid != pTableMeta->id.uid) {
tscError("%p table has already been destroyed", pSql);
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
if (!tscValidateColumnId(pTableMetaInfo, pExpr->base.colInfo.colId, pExpr->base.numOfParams)) {
tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
assert(pExpr->base.resColId < 0);
pSqlExpr->colInfo.colId = htons(pExpr->base.colInfo.colId);
pSqlExpr->colInfo.colIndex = htons(pExpr->base.colInfo.colIndex);
pSqlExpr->colInfo.flag = htons(pExpr->base.colInfo.flag);
pSqlExpr->colType = htons(pExpr->base.colType);
pSqlExpr->colBytes = htons(pExpr->base.colBytes);
if (TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag) || pExpr->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
pSqlExpr->resType = htons(pExpr->base.resType);
pSqlExpr->resBytes = htons(pExpr->base.resBytes);
} else if (pExpr->base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
pSqlExpr->resType = htons(s.type);
pSqlExpr->resBytes = htons(s.bytes);
} else {
SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->base.colInfo.colId);
pSqlExpr->resType = htons(s->type);
pSqlExpr->resBytes = htons(s->bytes);
}
pSqlExpr->functionId = htons(pExpr->base.functionId);
pSqlExpr->numOfParams = htons(pExpr->base.numOfParams);
pSqlExpr->resColId = htons(pExpr->base.resColId);
pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { // todo add log
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->base.param[j].nType);
pSqlExpr->param[j].nLen = htons(pExpr->base.param[j].nLen);
if (pExpr->base.param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->base.param[j].pz, pExpr->base.param[j].nLen);
pMsg += pExpr->base.param[j].nLen;
} else {
pSqlExpr->param[j].i64 = htobe64(pExpr->base.param[j].i64);
}
} }
}
pSqlExpr = (SSqlExpr *)pMsg; for (int32_t i = 0; i < query.numOfExpr2; ++i) {
} int32_t code = serializeSqlExpr(&query.pExpr2[i].base, pTableMetaInfo, &pMsg, pSql);
if (code != TSDB_CODE_SUCCESS) {
size_t output = tscNumOfFields(pQueryInfo); return code;
if (tscIsSecondStageQuery(pQueryInfo)) {
pQueryMsg->secondStageOutput = htonl((int32_t) output);
SSqlExpr *pExpr1 = (SSqlExpr *)pMsg;
for (int32_t i = 0; i < output; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SExprInfo *pExpr = pField->pExpr;
// this should be switched to projection query
if (pExpr->pExpr == NULL) {
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
if (pExpr->base.uid != pTableMeta->id.uid) {
tscError("%p table has already been destroyed", pSql);
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
if (!tscValidateColumnId(pTableMetaInfo, pExpr->base.colInfo.colId, pExpr->base.numOfParams)) {
tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
pExpr1->numOfParams = 0; // no params for projection query
pExpr1->functionId = htons(TSDB_FUNC_PRJ);
pExpr1->colInfo.colId = htons(pExpr->base.resColId);
pExpr1->colInfo.flag = htons(TSDB_COL_NORMAL);
pExpr1->colType = htons(pExpr->base.resType);
pExpr1->colBytes = htons(pExpr->base.resBytes);
bool assign = false;
for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) {
SExprInfo *pe = tscSqlExprGet(pQueryInfo, f);
if (pe == pExpr) {
pExpr1->colInfo.colIndex = htons(f);
pExpr1->resType = htons(pe->base.resType);
pExpr1->resBytes = htons(pe->base.resBytes);
assign = true;
break;
}
}
assert(assign);
pMsg += sizeof(SSqlExpr);
pExpr1 = (SSqlExpr *)pMsg;
} else {
assert(pField->pExpr != NULL);
SExprInfo* pExprInfo = pField->pExpr;
pExpr1->colInfo.colId = htons(pExprInfo->base.colInfo.colId);
pExpr1->colType = htons(pExprInfo->base.colType);
pExpr1->colBytes = htons(pExprInfo->base.colBytes);
pExpr1->resBytes = htons(sizeof(double));
pExpr1->resType = htons(TSDB_DATA_TYPE_DOUBLE);
pExpr1->functionId = htons(pExprInfo->base.functionId);
pExpr1->numOfParams = htons(pExprInfo->base.numOfParams);
pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
// todo add log
pExpr1->param[j].nType = htons((uint16_t)pExprInfo->base.param[j].nType);
pExpr1->param[j].nLen = htons(pExprInfo->base.param[j].nLen);
if (pExprInfo->base.param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExprInfo->base.param[j].pz, pExprInfo->base.param[j].nLen);
pMsg += pExprInfo->base.param[j].nLen;
} else {
pExpr1->param[j].i64 = htobe64(pExprInfo->base.param[j].i64);
}
}
pExpr1 = (SSqlExpr *)pMsg;
}
} }
} else {
pQueryMsg->secondStageOutput = 0;
} }
// serialize the table info (sid, uid, tags) // serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; SSqlGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr;
if (pGroupbyExpr->numOfGroupCols > 0) { if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->orderType = htons(pGroupbyExpr->orderType); pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) { for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j); SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
*((int16_t *)pMsg) = htons(pCol->colId); *((int16_t *)pMsg) = htons(pCol->colId);
pMsg += sizeof(pCol->colId); pMsg += sizeof(pCol->colId);
...@@ -1268,48 +895,29 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1268,48 +895,29 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
*((int16_t *)pMsg) += htons(pCol->flag); *((int16_t *)pMsg) += htons(pCol->flag);
pMsg += sizeof(pCol->flag); pMsg += sizeof(pCol->flag);
memcpy(pMsg, pCol->name, tListLen(pCol->name)); memcpy(pMsg, pCol->name, tListLen(pCol->name));
pMsg += tListLen(pCol->name); pMsg += tListLen(pCol->name);
} }
} }
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (query.fillType != TSDB_FILL_NONE) {
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { for (int32_t i = 0; i < query.numOfOutput; ++i) {
*((int64_t *)pMsg) = htobe64(pQueryInfo->fillVal[i]); *((int64_t *)pMsg) = htobe64(query.fillVal[i]);
pMsg += sizeof(pQueryInfo->fillVal[0]); pMsg += sizeof(query.fillVal[0]);
} }
} }
if (numOfTags != 0) {
int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
int32_t total = numOfTagColumns + numOfColumns;
pSchema = tscGetTableTagSchema(pTableMeta);
for (int32_t i = 0; i < numOfTags; ++i) {
SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) || if (query.numOfTags > 0) {
(!isValidDataType(pColSchema->type))) { for (int32_t i = 0; i < query.numOfTags; ++i) {
char n[TSDB_TABLE_FNAME_LEN] = {0}; SColumnInfo* pTag = &query.tagColList[i];
tNameExtractFullName(&pTableMetaInfo->name, n);
tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, total, numOfTagColumns, pCol->colIndex.columnIndex, pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL;
}
SColumnInfo* pTagCol = (SColumnInfo*) pMsg; SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
pTagCol->colId = htons(pTag->colId);
pTagCol->colId = htons(pColSchema->colId); pTagCol->bytes = htons(pTag->bytes);
pTagCol->bytes = htons(pColSchema->bytes); pTagCol->type = htons(pTag->type);
pTagCol->type = htons(pColSchema->type);
pTagCol->numOfFilters = 0; pTagCol->numOfFilters = 0;
pMsg += sizeof(SColumnInfo); pMsg += sizeof(SColumnInfo);
} }
} }
...@@ -1317,12 +925,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1317,12 +925,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// serialize tag column query condition // serialize tag column query condition
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) { if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond; STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid); SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
if (pCond != NULL && pCond->cond != NULL) { if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htons(pCond->len); pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len); memcpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len; pMsg += pCond->len;
} }
} }
...@@ -1356,6 +964,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1356,6 +964,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
} }
int32_t numOfOperator = taosArrayGetSize(queryOperator);
pQueryMsg->numOfOperator = htonl(numOfOperator);
for(int32_t i = 0; i < numOfOperator; ++i) {
int32_t *operator = taosArrayGet(queryOperator, i);
*(int32_t*)pMsg = htonl(*operator);
pMsg += sizeof(int32_t);
}
memcpy(pMsg, pSql->sqlstr, sqlLen); memcpy(pMsg, pSql->sqlstr, sqlLen);
pMsg += sqlLen; pMsg += sqlLen;
...@@ -1364,11 +981,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1364,11 +981,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscDebug("%p msg built success, len:%d bytes", pSql, msgLen); tscDebug("%p msg built success, len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->head.contLen = htonl(msgLen); pQueryMsg->head.contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize); assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
return TSDB_CODE_SUCCESS;*/ freeQueryAttr(&query);
taosArrayDestroy(tableScanOperator);
taosArrayDestroy(queryOperator);
return TSDB_CODE_SUCCESS;
} }
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
......
...@@ -2027,6 +2027,9 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { ...@@ -2027,6 +2027,9 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
tfree(pQueryInfo->fillVal); tfree(pQueryInfo->fillVal);
tfree(pQueryInfo->buf); tfree(pQueryInfo->buf);
taosArrayDestroy(pQueryInfo->pUpstream);
taosArrayDestroy(pQueryInfo->pDownstream);
} }
void tscClearSubqueryInfo(SSqlCmd* pCmd) { void tscClearSubqueryInfo(SSqlCmd* pCmd) {
...@@ -3183,7 +3186,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -3183,7 +3186,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
pQueryAttr->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr)); pQueryAttr->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr));
*(pQueryAttr->pGroupbyExpr) = pQueryInfo->groupbyExpr; *(pQueryAttr->pGroupbyExpr) = pQueryInfo->groupbyExpr;
pQueryAttr->pExpr1 = calloc(pQueryAttr->numOfOutput, sizeof(SExprInfo)); pQueryAttr->pExpr1 = calloc(pQueryAttr->numOfOutput, sizeof(SExprInfo));
......
...@@ -467,6 +467,7 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); ...@@ -467,6 +467,7 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo); void setQueryKilled(SQInfo *pQInfo);
void queryCostStatis(SQInfo *pQInfo); void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo); void freeQInfo(SQInfo *pQInfo);
void freeQueryAttr(SQueryAttr *pQuery);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
......
...@@ -6486,13 +6486,13 @@ void freeQInfo(SQInfo *pQInfo) { ...@@ -6486,13 +6486,13 @@ void freeQInfo(SQInfo *pQInfo) {
} }
} }
tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo);
doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo); doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo);
tfree(pQInfo->pBuf); tfree(pQInfo->pBuf);
tfree(pQInfo->sql); tfree(pQInfo->sql);
tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo);
taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows);
pQInfo->signature = 0; pQInfo->signature = 0;
...@@ -6639,3 +6639,38 @@ void releaseQueryBuf(size_t numOfTables) { ...@@ -6639,3 +6639,38 @@ void releaseQueryBuf(size_t numOfTables) {
// restore value is not enough buffer available // restore value is not enough buffer available
atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
} }
void freeQueryAttr(SQueryAttr* pQueryAttr) {
if (pQueryAttr != NULL) {
if (pQueryAttr->fillVal != NULL) {
tfree(pQueryAttr->fillVal);
}
for (int32_t i = 0; i < pQueryAttr->numOfFilterCols; ++i) {
SSingleColumnFilterInfo* pColFilter = &pQueryAttr->pFilterInfo[i];
if (pColFilter->numOfFilters > 0) {
tfree(pColFilter->pFilters);
}
}
pQueryAttr->pExpr1 = destroyQueryFuncExpr(pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
pQueryAttr->pExpr2 = destroyQueryFuncExpr(pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
tfree(pQueryAttr->tagColList);
tfree(pQueryAttr->pFilterInfo);
if (pQueryAttr->colList != NULL) {
for (int32_t i = 0; i < pQueryAttr->numOfCols; i++) {
SColumnInfo* column = pQueryAttr->colList + i;
freeColumnFilterInfo(column->filterInfo, column->numOfFilters);
}
tfree(pQueryAttr->colList);
}
if (pQueryAttr->pGroupbyExpr != NULL) {
taosArrayDestroy(pQueryAttr->pGroupbyExpr->columnInfo);
tfree(pQueryAttr->pGroupbyExpr);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册