未验证 提交 ecad71a1 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1708 from taosdata/feature/query

Feature/query
...@@ -113,7 +113,8 @@ bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd); ...@@ -113,7 +113,8 @@ bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex, void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
SSchema* pColSchema, int16_t isTag); SSchema* pColSchema, int16_t isTag);
void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex); //void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex);
void addRequiredTagColumn(STableMetaInfo* pTableMetaInfo, SColumnIndex* index);
int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql); int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo); void tscClearInterpInfo(SQueryInfo* pQueryInfo);
...@@ -148,10 +149,10 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, ...@@ -148,10 +149,10 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes,
int32_t tscGetResRowLength(SArray* pExprList); int32_t tscGetResRowLength(SArray* pExprList);
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize); int16_t size, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize); int16_t size, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size); int16_t size);
...@@ -198,7 +199,7 @@ int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQuer ...@@ -198,7 +199,7 @@ int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQuer
void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache); void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, int16_t numOfTags, int16_t* tags); SVgroupsInfo* vgroupList, SArray* pTagCols);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo); STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
......
...@@ -78,8 +78,7 @@ typedef struct STableMetaInfo { ...@@ -78,8 +78,7 @@ typedef struct STableMetaInfo {
*/ */
int32_t vgroupIndex; int32_t vgroupIndex;
char name[TSDB_TABLE_ID_LEN]; // (super) table name char name[TSDB_TABLE_ID_LEN]; // (super) table name
int16_t numOfTags; // total required tags in query, including groupby tags SArray* tagColList; // involved tag columns
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
} STableMetaInfo; } STableMetaInfo;
/* the structure for sql function in select clause */ /* the structure for sql function in select clause */
......
...@@ -247,7 +247,8 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -247,7 +247,8 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
strncpy(f.name, "Field", TSDB_COL_NAME_LEN); strncpy(f.name, "Field", TSDB_COL_NAME_LEN);
SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, TSDB_COL_NAME_LEN, TSDB_COL_NAME_LEN); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, TSDB_COL_NAME_LEN,
TSDB_COL_NAME_LEN, false);
rowLen += TSDB_COL_NAME_LEN; rowLen += TSDB_COL_NAME_LEN;
...@@ -256,7 +257,8 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -256,7 +257,8 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
strncpy(f.name, "Type", TSDB_COL_NAME_LEN); strncpy(f.name, "Type", TSDB_COL_NAME_LEN);
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, typeColLength, typeColLength); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, typeColLength,
typeColLength, false);
rowLen += typeColLength; rowLen += typeColLength;
...@@ -265,7 +267,8 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -265,7 +267,8 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
strncpy(f.name, "Length", TSDB_COL_NAME_LEN); strncpy(f.name, "Length", TSDB_COL_NAME_LEN);
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t), sizeof(int32_t)); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
sizeof(int32_t), false);
rowLen += sizeof(int32_t); rowLen += sizeof(int32_t);
...@@ -274,7 +277,8 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -274,7 +277,8 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
strncpy(f.name, "Note", TSDB_COL_NAME_LEN); strncpy(f.name, "Note", TSDB_COL_NAME_LEN);
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, noteColLength, noteColLength); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, noteColLength,
noteColLength, false);
rowLen += noteColLength; rowLen += noteColLength;
return rowLen; return rowLen;
......
此差异已折叠。
...@@ -382,6 +382,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -382,6 +382,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
int doProcessSql(SSqlObj *pSql) { int doProcessSql(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
int32_t code = TSDB_CODE_SUCCESS;
if (pCmd->command == TSDB_SQL_SELECT || if (pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_FETCH ||
...@@ -391,10 +392,15 @@ int doProcessSql(SSqlObj *pSql) { ...@@ -391,10 +392,15 @@ int doProcessSql(SSqlObj *pSql) {
pCmd->command == TSDB_SQL_HB || pCmd->command == TSDB_SQL_HB ||
pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_META ||
pCmd->command == TSDB_SQL_STABLEVGROUP) { pCmd->command == TSDB_SQL_STABLEVGROUP) {
tscBuildMsg[pCmd->command](pSql, NULL); pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
} }
int32_t code = tscSendMsgToServer(pSql); if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
return pRes->code;
}
code = tscSendMsgToServer(pSql);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRes->code = code; pRes->code = code;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
...@@ -640,6 +646,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -640,6 +646,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t numOfTables = 0; int32_t numOfTables = 0;
int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
numOfTables = 1; numOfTables = 1;
...@@ -648,7 +655,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -648,7 +655,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query super table } else { // query super table
int32_t index = pTableMetaInfo->vgroupIndex; int32_t index = pTableMetaInfo->vgroupIndex;
if (index < 0) { if (index < 0) {
tscError("%p error vgroupIndex:%d", pSql, index); tscError("%p error vgroupIndex:%d", pSql, index);
return -1; return -1;
...@@ -689,6 +695,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -689,6 +695,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit; pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->queryType = htons(pQueryInfo->type); pQueryMsg->queryType = htons(pQueryInfo->type);
...@@ -714,7 +721,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -714,7 +721,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex, pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
pColSchema->name); pColSchema->name);
return -1; // 0 means build msg failed return TSDB_CODE_INVALID_SQL;
} }
pQueryMsg->colList[i].colId = htons(pColSchema->colId); pQueryMsg->colList[i].colId = htons(pColSchema->colId);
...@@ -761,7 +768,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -761,7 +768,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) { if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
/* column id is not valid according to the cached metermeta, the table meta is expired */ /* column id is not valid according to the cached table meta, the table meta is expired */
tscError("%p table schema is not matched with parsed sql", pSql); tscError("%p table schema is not matched with parsed sql", pSql);
return -1; return -1;
} }
...@@ -817,7 +824,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -817,7 +824,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->orderType = htons(pGroupbyExpr->orderType); pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) { for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
SColIndex *pCol = &pGroupbyExpr->columnInfo[j]; SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
*((int16_t *)pMsg) = pCol->colId; *((int16_t *)pMsg) = pCol->colId;
pMsg += sizeof(pCol->colId); pMsg += sizeof(pCol->colId);
...@@ -840,6 +847,37 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -840,6 +847,37 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
} }
if (numOfTags != 0) {
int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
int32_t total = numOfTagColumns + numOfColumns;
pSchema = tscGetTableTagSchema(pTableMeta);
for (int32_t i = 0; i < numOfTags; ++i) {
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
(pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
pCol->colIndex, pColSchema->name);
return TSDB_CODE_INVALID_SQL;
}
SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
pTagCol->colId = htons(pColSchema->colId);
pTagCol->bytes = htons(pColSchema->bytes);
pTagCol->type = htons(pColSchema->type);
pTagCol->numOfFilters = 0;
pMsg += sizeof(SColumnInfo);
}
}
// compressed ts block // compressed ts block
pQueryMsg->tsOffset = htonl(pMsg - pStart); pQueryMsg->tsOffset = htonl(pMsg - pStart);
int32_t tsLen = 0; int32_t tsLen = 0;
...@@ -2195,7 +2233,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -2195,7 +2233,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f); SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes); pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false);
} }
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput; pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
...@@ -2538,7 +2576,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2538,7 +2576,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
STableMetaInfo *pMMInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo *pMMInfo = tscGetMetaInfo(pQueryInfo, i);
STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name); STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
tscAddTableMetaInfo(pNewQueryInfo, pMMInfo->name, pTableMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex); tscAddTableMetaInfo(pNewQueryInfo, pMMInfo->name, pTableMeta, NULL, pMMInfo->tagColList);
} }
if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
......
...@@ -947,7 +947,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { ...@@ -947,7 +947,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
} }
static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize) { int16_t size, int16_t interSize, bool isTagCol) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr)); SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
...@@ -957,23 +957,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol ...@@ -957,23 +957,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
// set the correct column index // set the correct column index
if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX; pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
} else {
if (isTagCol) {
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
pExpr->colInfo.colId = pSchema[pColIndex->columnIndex].colId;
strncpy(pExpr->colInfo.name, pSchema[pColIndex->columnIndex].name, TSDB_COL_NAME_LEN);
} else { } else {
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->columnIndex); SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->columnIndex);
pExpr->colInfo.colId = pSchema->colId; pExpr->colInfo.colId = pSchema->colId;
} strncpy(pExpr->colInfo.name, pSchema->name, TSDB_COL_NAME_LEN);
// tag columns require the column index revised.
int16_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
if (pColIndex->columnIndex >= numOfCols) {
pExpr->colInfo.flag = TSDB_COL_TAG;
} else {
if (pColIndex->columnIndex != TSDB_TBNAME_COLUMN_INDEX) {
pExpr->colInfo.flag = TSDB_COL_NORMAL;
} else {
pExpr->colInfo.flag = TSDB_COL_TAG;
} }
} }
pExpr->colInfo.flag = isTagCol? TSDB_COL_TAG:TSDB_COL_NORMAL;
pExpr->colInfo.colIndex = pColIndex->columnIndex; pExpr->colInfo.colIndex = pColIndex->columnIndex;
pExpr->resType = type; pExpr->resType = type;
pExpr->resBytes = size; pExpr->resBytes = size;
...@@ -984,20 +981,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol ...@@ -984,20 +981,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
} }
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize) { int16_t size, int16_t interSize, bool isTagCol) {
int32_t num = taosArrayGetSize(pQueryInfo->exprsInfo); int32_t num = taosArrayGetSize(pQueryInfo->exprsInfo);
if (index == num) { if (index == num) {
return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, interSize); return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
} }
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize); SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
taosArrayInsert(pQueryInfo->exprsInfo, index, &pExpr); taosArrayInsert(pQueryInfo->exprsInfo, index, &pExpr);
return pExpr; return pExpr;
} }
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize) { int16_t size, int16_t interSize, bool isTagCol) {
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize); SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
taosArrayPush(pQueryInfo->exprsInfo, &pExpr); taosArrayPush(pQueryInfo->exprsInfo, &pExpr);
return pExpr; return pExpr;
} }
...@@ -1431,9 +1428,9 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) { ...@@ -1431,9 +1428,9 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) {
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
int16_t actualTagIndex = pTableMetaInfo->tagColumnIndex[pExpr->colInfo.colIndex];
pColInfo[i].type = (actualTagIndex != -1) ? pTagSchema[actualTagIndex].type : TSDB_DATA_TYPE_BINARY; int16_t index = pExpr->colInfo.colIndex;
pColInfo[i].type = (index != -1) ? pTagSchema[index].type : TSDB_DATA_TYPE_BINARY;
} else { } else {
pColInfo[i].type = pSchema[pExpr->colInfo.colIndex].type; pColInfo[i].type = pSchema[pExpr->colInfo.colIndex].type;
} }
...@@ -1645,7 +1642,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) { ...@@ -1645,7 +1642,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) {
} }
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, int16_t numOfTags, int16_t* tags) { SVgroupsInfo* vgroupList, SArray* pTagCols) {
void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
if (pAlloc == NULL) { if (pAlloc == NULL) {
return NULL; return NULL;
...@@ -1663,7 +1660,6 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1663,7 +1660,6 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
} }
pTableMetaInfo->pTableMeta = pTableMeta; pTableMetaInfo->pTableMeta = pTableMeta;
pTableMetaInfo->numOfTags = numOfTags;
if (vgroupList != NULL) { if (vgroupList != NULL) {
assert(vgroupList->numOfVgroups == 1); // todo fix me assert(vgroupList->numOfVgroups == 1); // todo fix me
...@@ -1674,8 +1670,10 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1674,8 +1670,10 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
memcpy(pTableMetaInfo->vgroupList, vgroupList, size); memcpy(pTableMetaInfo->vgroupList, vgroupList, size);
} }
if (tags != NULL) { if (pTagCols == NULL) {
memcpy(pTableMetaInfo->tagColumnIndex, tags, sizeof(pTableMetaInfo->tagColumnIndex[0]) * numOfTags); pTableMetaInfo->tagColList = taosArrayInit(4, sizeof(SColumnIndex));
} else {
pTableMetaInfo->tagColList = taosArrayClone(pTagCols);
} }
pQueryInfo->numOfTables += 1; pQueryInfo->numOfTables += 1;
...@@ -1683,7 +1681,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1683,7 +1681,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
} }
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) { STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) {
return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, 0, NULL); return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL);
} }
void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) { void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) {
...@@ -1857,17 +1855,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1857,17 +1855,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
if (pPrevSql == NULL) { if (pPrevSql == NULL) {
STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name); STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->numOfTags, pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList);
pTableMetaInfo->tagColumnIndex);
} else { // transfer the ownership of pTableMeta to the newly create sql object. } else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta); STableMeta* pPrevTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList; SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
pPrevInfo->vgroupList = NULL; pPrevInfo->vgroupList = NULL;
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->numOfTags,
pTableMetaInfo->tagColumnIndex);
} }
assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1); assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1);
......
...@@ -479,6 +479,7 @@ typedef struct { ...@@ -479,6 +479,7 @@ typedef struct {
int32_t tsLen; // total length of ts comp block int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers int32_t tsNumOfBlocks; // ts comp block numbers
int32_t tsOrder; // ts comp block order int32_t tsOrder; // ts comp block order
int32_t numOfTags; // number of tags columns involved
SColumnInfo colList[]; SColumnInfo colList[];
} SQueryTableMsg; } SQueryTableMsg;
...@@ -514,8 +515,8 @@ typedef struct { ...@@ -514,8 +515,8 @@ typedef struct {
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
char acct[TSDB_USER_LEN + 1]; char acct[TSDB_USER_LEN];
char db[TSDB_DB_NAME_LEN + 1]; char db[TSDB_DB_NAME_LEN];
uint32_t vgId; uint32_t vgId;
int32_t maxSessions; int32_t maxSessions;
int32_t cacheBlockSize; int32_t cacheBlockSize;
...@@ -536,8 +537,8 @@ typedef struct { ...@@ -536,8 +537,8 @@ typedef struct {
int8_t repStrategy; int8_t repStrategy;
int8_t loadLatest; // load into mem or not int8_t loadLatest; // load into mem or not
uint8_t precision; // time resolution uint8_t precision; // time resolution
int8_t reserved[16]; int8_t ignoreExist;
} SDbCfg, SCMCreateDbMsg, SCMAlterDbMsg; } SCMCreateDbMsg, SCMAlterDbMsg;
typedef struct { typedef struct {
char db[TSDB_TABLE_ID_LEN + 1]; char db[TSDB_TABLE_ID_LEN + 1];
......
...@@ -98,6 +98,7 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); ...@@ -98,6 +98,7 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
void tsdbClearTableCfg(STableCfg *config); void tsdbClearTableCfg(STableCfg *config);
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId id, int32_t col, int16_t *type, int16_t *bytes, char **val); int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId id, int32_t col, int16_t *type, int16_t *bytes, char **val);
int32_t tsdbTableGetName(TsdbRepoT *repo, STableId id, char** name);
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
......
...@@ -29,6 +29,32 @@ struct SAcctObj; ...@@ -29,6 +29,32 @@ struct SAcctObj;
struct SUserObj; struct SUserObj;
struct SMnodeObj; struct SMnodeObj;
typedef struct {
char acct[TSDB_USER_LEN];
char db[TSDB_DB_NAME_LEN];
uint32_t vgId;
int32_t maxSessions;
int32_t cacheBlockSize;
union {
int32_t totalBlocks;
float fraction;
} cacheNumOfBlocks;
int32_t daysPerFile;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t daysToKeep;
int32_t commitTime;
int32_t rowsInFileBlock;
int16_t blocksPerTable;
int8_t compression;
int8_t commitLog;
int8_t replications;
int8_t repStrategy;
int8_t loadLatest; // load into mem or not
uint8_t precision; // time resolution
int8_t reserved[16];
} SDbCfg;
typedef struct SDnodeObj { typedef struct SDnodeObj {
int32_t dnodeId; int32_t dnodeId;
uint32_t privateIp; uint32_t privateIp;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tutil.h" #include "tutil.h"
#include "tgrant.h" #include "tgrant.h"
...@@ -300,8 +301,13 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { ...@@ -300,8 +301,13 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
SDbObj *pDb = mgmtGetDb(pCreate->db); SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb != NULL) { if (pDb != NULL) {
mgmtDecDbRef(pDb); mgmtDecDbRef(pDb);
if (pCreate->ignoreExist) {
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_DB_ALREADY_EXIST; return TSDB_CODE_DB_ALREADY_EXIST;
} }
}
code = mgmtCheckDbParams(pCreate); code = mgmtCheckDbParams(pCreate);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) return code;
...@@ -313,18 +319,41 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { ...@@ -313,18 +319,41 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
return code; return code;
} }
pDb = malloc(sizeof(SDbObj)); pDb = calloc(1, sizeof(SDbObj));
memset(pDb, 0, sizeof(SDbObj));
strcpy(pDb->name, pCreate->db); strncpy(pDb->name, pCreate->db, TSDB_DB_NAME_LEN);
strcpy(pCreate->acct, pAcct->user); strncpy(pCreate->acct, pAcct->user, TSDB_USER_LEN);
pDb->createdTime = taosGetTimestampMs(); pDb->createdTime = taosGetTimestampMs();
pDb->cfg = *pCreate;
pDb->cfg = (SDbCfg) {
.vgId = pCreate->vgId,
.precision = pCreate->precision,
.maxSessions = pCreate->maxSessions,
.cacheNumOfBlocks.totalBlocks = pCreate->cacheNumOfBlocks.totalBlocks,
.rowsInFileBlock = pCreate->rowsInFileBlock,
.commitLog = pCreate->commitLog,
.replications = pCreate->replications,
.daysPerFile = pCreate->daysPerFile,
.cacheBlockSize = pCreate->cacheBlockSize,
.commitTime = pCreate->commitTime,
.blocksPerTable = pCreate->blocksPerTable,
.compression = pCreate->compression,
.daysToKeep = pCreate->daysToKeep,
.daysToKeep1 = pCreate->daysToKeep1,
.daysToKeep2 = pCreate->daysToKeep2,
.loadLatest = pCreate->loadLatest,
.repStrategy = pCreate->repStrategy,
};
strncpy(pDb->cfg.db, pCreate->db, TSDB_DB_NAME_LEN);
strncpy(pDb->cfg.acct, pCreate->acct, TSDB_USER_LEN);
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDbSdb, .table = tsDbSdb,
.pObj = pDb, .pObj = pDb,
.rowSize = sizeof(SDbObj) .rowSize = sizeof(SDbObj),
}; };
code = sdbInsertRow(&oper); code = sdbInsertRow(&oper);
......
...@@ -114,14 +114,13 @@ typedef struct SCreateDBInfo { ...@@ -114,14 +114,13 @@ typedef struct SCreateDBInfo {
int32_t tablesPerVnode; int32_t tablesPerVnode;
int32_t daysPerFile; int32_t daysPerFile;
int32_t rowPerFileBlock; int32_t rowPerFileBlock;
float numOfAvgCacheBlocks; float numOfAvgCacheBlocks;
int32_t numOfBlocksPerTable; int32_t numOfBlocksPerTable;
int64_t commitTime; int64_t commitTime;
int32_t commitLog; int32_t commitLog;
int32_t compressionLevel; int32_t compressionLevel;
SSQLToken precision; SSQLToken precision;
bool ignoreExists;
tVariantList *keep; tVariantList *keep;
} SCreateDBInfo; } SCreateDBInfo;
......
...@@ -39,8 +39,8 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int ...@@ -39,8 +39,8 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
typedef struct SSqlGroupbyExpr { typedef struct SSqlGroupbyExpr {
int16_t tableIndex; int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t numOfGroupCols; int16_t numOfGroupCols;
SColIndex* columnInfo; // group by columns information
int16_t orderIndex; // order by column index int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr; } SSqlGroupbyExpr;
...@@ -108,7 +108,7 @@ typedef struct STableQueryInfo { ...@@ -108,7 +108,7 @@ typedef struct STableQueryInfo {
SWindowResInfo windowResInfo; SWindowResInfo windowResInfo;
} STableQueryInfo; } STableQueryInfo;
typedef struct STableDataInfo { typedef struct STableDataInfo { // todo merge with the STableQueryInfo struct
int32_t tableIndex; int32_t tableIndex;
int32_t groupIdx; // group id in table list int32_t groupIdx; // group id in table list
STableQueryInfo* pTableQInfo; STableQueryInfo* pTableQInfo;
...@@ -116,6 +116,8 @@ typedef struct STableDataInfo { ...@@ -116,6 +116,8 @@ typedef struct STableDataInfo {
typedef struct SQuery { typedef struct SQuery {
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags;
SOrderVal order; SOrderVal order;
STimeWindow window; STimeWindow window;
int64_t intervalTime; int64_t intervalTime;
...@@ -130,6 +132,7 @@ typedef struct SQuery { ...@@ -130,6 +132,7 @@ typedef struct SQuery {
SSqlGroupbyExpr* pGroupbyExpr; SSqlGroupbyExpr* pGroupbyExpr;
SArithExprInfo* pSelectExpr; SArithExprInfo* pSelectExpr;
SColumnInfo* colList; SColumnInfo* colList;
SColumnInfo* tagColList;
int32_t numOfFilterCols; int32_t numOfFilterCols;
int64_t* defaultVal; int64_t* defaultVal;
TSKEY lastKey; TSKEY lastKey;
......
...@@ -815,8 +815,7 @@ void setCreateDBSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pToken, SCreateDBI ...@@ -815,8 +815,7 @@ void setCreateDBSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pToken, SCreateDBI
pInfo->pDCLInfo->dbOpt = *pDB; pInfo->pDCLInfo->dbOpt = *pDB;
pInfo->pDCLInfo->dbOpt.dbname = *pToken; pInfo->pDCLInfo->dbOpt.dbname = *pToken;
pInfo->pDCLInfo->dbOpt.ignoreExists = (pIgExists != NULL);
tTokenListAppend(pInfo->pDCLInfo, pIgExists);
} }
void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken *pPwd, SCreateAcctSQL *pAcctInfo) { void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken *pPwd, SCreateAcctSQL *pAcctInfo) {
......
...@@ -111,7 +111,7 @@ static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow); ...@@ -111,7 +111,7 @@ static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY *tsCol, int32_t size, static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY *tsCol, int32_t size,
int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag); int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery); static bool hasMainOutput(SQuery *pQuery);
static void createTableDataInfo(SQInfo *pQInfo); static void createTableDataInfo(SQInfo *pQInfo);
...@@ -289,7 +289,7 @@ bool isGroupbyNormalCol(SSqlGroupbyExpr *pGroupbyExpr) { ...@@ -289,7 +289,7 @@ bool isGroupbyNormalCol(SSqlGroupbyExpr *pGroupbyExpr) {
} }
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndex *pColIndex = &pGroupbyExpr->columnInfo[i]; SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
if (pColIndex->flag == TSDB_COL_NORMAL) { if (pColIndex->flag == TSDB_COL_NORMAL) {
/* /*
* make sure the normal column locates at the second position if tbname exists in group by clause * make sure the normal column locates at the second position if tbname exists in group by clause
...@@ -312,7 +312,7 @@ int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) { ...@@ -312,7 +312,7 @@ int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) {
int16_t type = TSDB_DATA_TYPE_NULL; int16_t type = TSDB_DATA_TYPE_NULL;
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndex *pColIndex = &pGroupbyExpr->columnInfo[i]; SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
if (pColIndex->flag == TSDB_COL_NORMAL) { if (pColIndex->flag == TSDB_COL_NORMAL) {
colId = pColIndex->colId; colId = pColIndex->colId;
break; break;
...@@ -996,12 +996,13 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1 ...@@ -996,12 +996,13 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1
SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr; SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr;
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
if (pGroupbyExpr->columnInfo[k].flag == TSDB_COL_TAG) { SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (pColIndex->flag == TSDB_COL_TAG) {
continue; continue;
} }
int16_t colIndex = -1; int16_t colIndex = -1;
int32_t colId = pGroupbyExpr->columnInfo[k].colId; int32_t colId = pColIndex->colId;
for (int32_t i = 0; i < pQuery->numOfCols; ++i) { for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].colId == colId) { if (pQuery->colList[i].colId == colId) {
...@@ -1378,8 +1379,21 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1378,8 +1379,21 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase; SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); SColIndex* pIndex = &pSqlFuncMsg->colInfo;
pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
int32_t index = pSqlFuncMsg->colInfo.colIndex;
if (TSDB_COL_IS_TAG(pIndex->flag)) {
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) {
pCtx->inputBytes = TSDB_TABLE_NAME_LEN;
pCtx->inputType = TSDB_DATA_TYPE_BINARY;
} else {
pCtx->inputBytes = pQuery->tagColList[index].bytes;
pCtx->inputType = pQuery->tagColList[index].type;
}
} else {
pCtx->inputBytes = pQuery->colList[index].bytes;
pCtx->inputType = pQuery->colList[index].type;
}
pCtx->ptsOutputBuf = NULL; pCtx->ptsOutputBuf = NULL;
...@@ -2494,8 +2508,19 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar ...@@ -2494,8 +2508,19 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar
int16_t bytes = 0; int16_t bytes = 0;
int16_t type = 0; int16_t type = 0;
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
tsdbTableGetName(tsdb, id, &val);
bytes = TSDB_TABLE_NAME_LEN;
type = TSDB_DATA_TYPE_BINARY;
} else {
tsdbGetTableTagVal(tsdb, id, tagColId, &type, &bytes, &val); tsdbGetTableTagVal(tsdb, id, tagColId, &type, &bytes, &val);
}
tVariantCreateFromBinary(param, val, bytes, type); tVariantCreateFromBinary(param, val, bytes, type);
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
tfree(val);
}
} }
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) { void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) {
...@@ -3453,7 +3478,7 @@ STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid ...@@ -3453,7 +3478,7 @@ STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid
return pTableQueryInfo; return pTableQueryInfo;
} }
UNUSED_FUNC void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) { void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) {
if (pTableQueryInfo == NULL) { if (pTableQueryInfo == NULL) {
return; return;
} }
...@@ -3555,7 +3580,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * ...@@ -3555,7 +3580,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo) { int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
assert(pTableQueryInfo->lastKey > 0); assert(pTableQueryInfo->lastKey >= 0);
setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb); setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
...@@ -4162,7 +4187,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery) ...@@ -4162,7 +4187,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery)
.numOfCols = pQuery->numOfCols, .numOfCols = pQuery->numOfCols,
}; };
if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo); pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo);
}
pQInfo->tsdb = tsdb; pQInfo->tsdb = tsdb;
pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pQuery = pQuery;
...@@ -4263,9 +4291,8 @@ static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STable ...@@ -4263,9 +4291,8 @@ static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STable
} }
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndex *pColIndex = &pGroupbyExpr->columnInfo[i]; SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
if (pColIndex->flag == TSDB_COL_TAG) { if (pColIndex->flag == TSDB_COL_TAG) {
// assert(pSidset->numOfTables == pSidset->numOfSubSet);
return true; return true;
} }
} }
...@@ -4386,7 +4413,14 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -4386,7 +4413,14 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
// include only current table // include only current table
if (pRuntimeEnv->pQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
}
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp); pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp);
taosArrayDestroy(tx);
taosArrayDestroy(g1);
if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->pTSBuf != NULL) {
if (pRuntimeEnv->cur.vnodeIndex == -1) { if (pRuntimeEnv->cur.vnodeIndex == -1) {
...@@ -5142,23 +5176,34 @@ static void stableQueryImpl(SQInfo *pQInfo) { ...@@ -5142,23 +5176,34 @@ static void stableQueryImpl(SQInfo *pQInfo) {
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
} }
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg) { static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
int32_t j = 0; int32_t j = 0;
if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) {
while(j < pQueryMsg->numOfTags) {
if (pExprMsg->colInfo.colId == pTagCols[j].colId) {
return j;
}
j += 1;
}
} else {
while (j < pQueryMsg->numOfCols) { while (j < pQueryMsg->numOfCols) {
if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) { if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) {
break; return j;
} }
j += 1; j += 1;
} }
}
return j; assert(0);
} }
bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg) { bool validateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
int32_t j = getColumnIndexInSource(pQueryMsg, pExprMsg); int32_t j = getColumnIndexInSource(pQueryMsg, pExprMsg, pTagCols);
return j < pQueryMsg->numOfCols; return j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags;
} }
static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) { static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) {
...@@ -5228,7 +5273,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p ...@@ -5228,7 +5273,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
* @return * @return
*/ */
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr,
char **tagCond, SColIndex **groupbyCols) { char **tagCond, SColIndex **groupbyCols, SColumnInfo** tagCols) {
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
...@@ -5250,6 +5295,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5250,6 +5295,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->tsLen = htonl(pQueryMsg->tsLen); pQueryMsg->tsLen = htonl(pQueryMsg->tsLen);
pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks); pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder); pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder);
pQueryMsg->numOfTags = htonl(pQueryMsg->numOfTags);
// query msg safety check // query msg safety check
if (validateQueryMsg(pQueryMsg) != 0) { if (validateQueryMsg(pQueryMsg) != 0) {
...@@ -5333,9 +5379,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5333,9 +5379,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
return TSDB_CODE_INVALID_QUERY_MSG; return TSDB_CODE_INVALID_QUERY_MSG;
} }
} else { } else {
if (!vnodeValidateExprColumnInfo(pQueryMsg, pExprMsg)) { // if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) {
return TSDB_CODE_INVALID_QUERY_MSG; // return TSDB_CODE_INVALID_QUERY_MSG;
} // }
} }
pExprMsg = (SSqlFuncMsg *)pMsg; pExprMsg = (SSqlFuncMsg *)pMsg;
...@@ -5387,6 +5433,21 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5387,6 +5433,21 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (pQueryMsg->tagCondLen > 0) { if (pQueryMsg->tagCondLen > 0) {
*tagCond = calloc(1, pQueryMsg->tagCondLen); *tagCond = calloc(1, pQueryMsg->tagCondLen);
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen); memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
pMsg += pQueryMsg->tagCondLen;
}
if (pQueryMsg->numOfTags > 0) {
(*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags);
for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) {
SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
pTagCol->colId = htons(pTagCol->colId);
pTagCol->bytes = htons(pTagCol->bytes);
pTagCol->type = htons(pTagCol->type);
pTagCol->numOfFilters = 0;
(*tagCols)[i] = *pTagCol;
}
} }
qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64
...@@ -5452,7 +5513,7 @@ static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pExpr, SQueryTableMsg ...@@ -5452,7 +5513,7 @@ static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pExpr, SQueryTableMsg
} }
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExprInfo **pSqlFuncExpr, static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExprInfo **pSqlFuncExpr,
SSqlFuncMsg **pExprMsg) { SSqlFuncMsg **pExprMsg, SColumnInfo* pTagCols) {
*pSqlFuncExpr = NULL; *pSqlFuncExpr = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -5482,11 +5543,14 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp ...@@ -5482,11 +5543,14 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
type = TSDB_DATA_TYPE_DOUBLE; type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypeDesc[type].nSize; bytes = tDataTypeDesc[type].nSize;
} else { // parse the normal column } else if (pExprs[i].pBase.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { // parse the normal column
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase); type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_TABLE_NAME_LEN;
} else{
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase, pTagCols);
assert(j < pQueryMsg->numOfCols); assert(j < pQueryMsg->numOfCols);
SColumnInfo *pCol = &pQueryMsg->colList[j]; SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].pBase.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
type = pCol->type; type = pCol->type;
bytes = pCol->bytes; bytes = pCol->bytes;
} }
...@@ -5510,16 +5574,15 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp ...@@ -5510,16 +5574,15 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].pBase = *pExprMsg[i]; pExprs[i].pBase = *pExprMsg[i];
int16_t functId = pExprs[i].pBase.functionId; int16_t functId = pExprs[i].pBase.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase); int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase, pTagCols);
assert(j < pQueryMsg->numOfCols); assert(j < pQueryMsg->numOfCols);
SColumnInfo *pCol = &pQueryMsg->colList[j]; SColumnInfo *pCol = &pQueryMsg->colList[j];
int16_t type = pCol->type;
int16_t bytes = pCol->bytes;
int32_t ret = int32_t ret =
getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, pExprs[i].pBase.arg[0].argValue.i64, getResultDataInfo(pCol->type, pCol->bytes, functId, pExprs[i].pBase.arg[0].argValue.i64,
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interResBytes, tagLen, isSuperTable); &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interResBytes, tagLen, isSuperTable);
assert(ret == TSDB_CODE_SUCCESS); assert(ret == TSDB_CODE_SUCCESS);
} }
...@@ -5547,7 +5610,11 @@ static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SCol ...@@ -5547,7 +5610,11 @@ static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SCol
pGroupbyExpr->orderType = pQueryMsg->orderType; pGroupbyExpr->orderType = pQueryMsg->orderType;
pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx; pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx;
pGroupbyExpr->columnInfo = pColIndex; pGroupbyExpr->columnInfo = taosArrayInit(pQueryMsg->numOfGroupCols, sizeof(SColIndex));
for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
taosArrayPush(pGroupbyExpr->columnInfo, &pColIndex[i]);
}
return pGroupbyExpr; return pGroupbyExpr;
} }
...@@ -5646,17 +5713,26 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { ...@@ -5646,17 +5713,26 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
} }
SColIndex *pColIndexEx = &pSqlExprMsg->colInfo; SColIndex *pColIndexEx = &pSqlExprMsg->colInfo;
if (!TSDB_COL_IS_TAG(pColIndexEx->flag)) {
for (int32_t f = 0; f < pQuery->numOfCols; ++f) { for (int32_t f = 0; f < pQuery->numOfCols; ++f) {
if (pColIndexEx->colId == pQuery->colList[f].colId) { if (pColIndexEx->colId == pQuery->colList[f].colId) {
pColIndexEx->colIndex = f; pColIndexEx->colIndex = f;
break; break;
} }
} }
} else {
for (int32_t f = 0; f < pQuery->numOfTags; ++f) {
if (pColIndexEx->colId == pQuery->tagColList[f].colId) {
pColIndexEx->colIndex = f;
break;
}
}
}
} }
} }
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SArithExprInfo *pExprs, static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SArithExprInfo *pExprs,
STableGroupInfo *groupInfo) { STableGroupInfo *groupInfo, SColumnInfo* pTagCols) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) { if (pQInfo == NULL) {
return NULL; return NULL;
...@@ -5680,8 +5756,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5680,8 +5756,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->slidingTime = pQueryMsg->slidingTime; pQuery->slidingTime = pQueryMsg->slidingTime;
pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
pQuery->interpoType = pQueryMsg->interpoType; pQuery->interpoType = pQueryMsg->interpoType;
pQuery->numOfTags = pQueryMsg->numOfTags;
pQuery->colList = calloc(1, sizeof(SSingleColumnFilterInfo) * numOfCols); pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) { if (pQuery->colList == NULL) {
goto _cleanup; goto _cleanup;
} }
...@@ -5693,6 +5770,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5693,6 +5770,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters); pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters);
} }
pQuery->tagColList = pTagCols;
// calculate the result row size // calculate the result row size
for (int16_t col = 0; col < numOfOutput; ++col) { for (int16_t col = 0; col < numOfOutput; ++col) {
assert(pExprs[col].bytes > 0); assert(pExprs[col].bytes > 0);
...@@ -5743,8 +5822,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5743,8 +5822,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->pos = -1; pQuery->pos = -1;
pQuery->window.skey = pQueryMsg->window.skey; pQuery->window = pQueryMsg->window;
pQuery->window.ekey = pQueryMsg->window.ekey;
pQuery->lastKey = pQuery->window.skey; pQuery->lastKey = pQuery->window.skey;
if (sem_init(&pQInfo->dataReady, 0, 0) != 0) { if (sem_init(&pQInfo->dataReady, 0, 0) != 0) {
...@@ -5855,10 +5933,6 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5855,10 +5933,6 @@ static void freeQInfo(SQInfo *pQInfo) {
} }
} }
tfree(pQuery->pFilterInfo);
tfree(pQuery->colList);
tfree(pQuery->sdata);
if (pQuery->pSelectExpr != NULL) { if (pQuery->pSelectExpr != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo; SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
...@@ -5876,16 +5950,34 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5876,16 +5950,34 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->defaultVal); tfree(pQuery->defaultVal);
} }
tfree(pQuery->pGroupbyExpr); // todo refactor, extract method to destroytableDataInfo
tfree(pQuery);
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
for (int32_t i = 0; i < numOfGroups; ++i) { for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
size_t num = taosArrayGetSize(p);
for(int32_t j = 0; j < num; ++j) {
SPair* pair = taosArrayGet(p, j);
if (pair->sec != NULL) {
destroyTableQueryInfo(((STableDataInfo*)pair->sec)->pTableQInfo, pQuery->numOfOutput);
tfree(pair->sec);
}
}
taosArrayDestroy(p); taosArrayDestroy(p);
} }
if (pQuery->pGroupbyExpr != NULL) {
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
tfree(pQuery->pGroupbyExpr);
}
tfree(pQuery->tagColList);
tfree(pQuery->pFilterInfo);
tfree(pQuery->colList);
tfree(pQuery->sdata);
taosArrayDestroy(pQInfo->groupInfo.pGroupList); taosArrayDestroy(pQInfo->groupInfo.pGroupList);
tfree(pQuery);
qTrace("QInfo:%p QInfo is freed", pQInfo); qTrace("QInfo:%p QInfo is freed", pQInfo);
...@@ -5959,8 +6051,10 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) ...@@ -5959,8 +6051,10 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
SArray * pTableIdList = NULL; SArray * pTableIdList = NULL;
SSqlFuncMsg **pExprMsg = NULL; SSqlFuncMsg **pExprMsg = NULL;
SColIndex * pGroupColIndex = NULL; SColIndex * pGroupColIndex = NULL;
SColumnInfo* pTagColumnInfo = NULL;
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex)) != TSDB_CODE_SUCCESS) { if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex, &pTagColumnInfo)) !=
TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -5977,7 +6071,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) ...@@ -5977,7 +6071,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
} }
SArithExprInfo *pExprs = NULL; SArithExprInfo *pExprs = NULL;
if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg)) != TSDB_CODE_SUCCESS) { if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
goto _query_over; goto _query_over;
} }
...@@ -6010,7 +6104,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) ...@@ -6010,7 +6104,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
} }
} }
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo); (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo);
if ((*pQInfo) == NULL) { if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY; code = TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
......
...@@ -251,6 +251,18 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId id, int32_t colId, int16_t* ...@@ -251,6 +251,18 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId id, int32_t colId, int16_t*
return 0; return 0;
} }
int32_t tsdbTableGetName(TsdbRepoT *repo, STableId id, char** name) {
STsdbMeta* pMeta = tsdbGetMeta(repo);
STable* pTable = tsdbGetTableByUid(pMeta, id.uid);
*name = strndup(pTable->name, TSDB_TABLE_NAME_LEN);
if (*name == NULL) {
return -1;
} else {
return 0;
}
}
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
if (tsdbCheckTableCfg(pCfg) < 0) return -1; if (tsdbCheckTableCfg(pCfg) < 0) return -1;
......
...@@ -1133,6 +1133,7 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) { ...@@ -1133,6 +1133,7 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) {
taosArrayPush(list, &t); taosArrayPush(list, &t);
} }
tSkipListDestroyIter(iter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1445,6 +1446,7 @@ int32_t tsdbQueryByTagsCond(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond, ...@@ -1445,6 +1446,7 @@ int32_t tsdbQueryByTagsCond(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond,
pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
taosArrayDestroy(res);
return ret; return ret;
} }
...@@ -1463,6 +1465,7 @@ int32_t tsdbQueryByTagsCond(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond, ...@@ -1463,6 +1465,7 @@ int32_t tsdbQueryByTagsCond(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond,
pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
taosArrayDestroy(res);
return ret; return ret;
} }
...@@ -1483,6 +1486,7 @@ int32_t tsdbGetOneTableGroup(TsdbRepoT* tsdb, int64_t uid, STableGroupInfo* pGro ...@@ -1483,6 +1486,7 @@ int32_t tsdbGetOneTableGroup(TsdbRepoT* tsdb, int64_t uid, STableGroupInfo* pGro
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle; STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
if (pQueryHandle == NULL) { if (pQueryHandle == NULL) {
...@@ -1516,5 +1520,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -1516,5 +1520,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tfree(pQueryHandle->pDataBlockInfo); tfree(pQueryHandle->pDataBlockInfo);
tsdbDestroyHelper(&pQueryHandle->rhelper); tsdbDestroyHelper(&pQueryHandle->rhelper);
tfree(pQueryHandle); tfree(pQueryHandle);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册