提交 cbe666b2 编写于 作者: H hjxilinx

[TD-32] add query io interface

上级 f9241ef1
......@@ -29,10 +29,10 @@ extern "C" {
#include "tscSecondaryMerge.h"
#include "tsclient.h"
#define UTIL_METER_IS_SUPERTABLE(metaInfo) \
#define UTIL_TABLE_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
......@@ -535,7 +535,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
code = tscGetMetricMeta(pSql, pCmd->clauseIndex);
pRes->code = code;
......@@ -325,7 +325,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pNewQueryInfo->limit = pSupporter->limit;
// fetch the join tag column
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
......@@ -122,7 +122,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
int32_t numOfRows = tscGetNumOfColumns(pMeta);
int32_t totalNumOfRows = numOfRows + tscGetNumOfTags(pMeta);
numOfRows = numOfRows + tscGetNumOfTags(pMeta);
......@@ -154,7 +154,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
return 0;
......@@ -785,7 +785,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return code;
if (!UTIL_METER_IS_SUPERTABLE(pSTableMeterMetaInfo)) {
if (!UTIL_TABLE_IS_SUPERTABLE(pSTableMeterMetaInfo)) {
return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
......@@ -1081,7 +1081,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
goto _error_clean;
......@@ -1382,7 +1382,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
numOfTotalColumns = tinfo.numOfColumns + tinfo.numOfTags;
} else {
numOfTotalColumns = tinfo.numOfColumns;
......@@ -1444,7 +1444,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) {
if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
......@@ -2251,7 +2251,7 @@ bool validateIpAddress(const char* ip, size_t size) {
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo->pTableMeta == NULL || !UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
......@@ -2289,7 +2289,7 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) {
/* transfer the field-info back to original input format */
void tscRestoreSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (!UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
......@@ -2509,7 +2509,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
if (groupTag) {
if (!UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg9);
......@@ -3218,7 +3218,7 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum
// table to table/ super table to super table are allowed
invalidSqlErrMsg(pQueryInfo->msg, msg5);
return false;
......@@ -3301,7 +3301,7 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S
} else if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) ||
index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { // query on tags
// check for tag query condition
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
......@@ -3659,7 +3659,7 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { // for stable join, tag columns
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // for stable join, tag columns
// must be present for join
if (pCondExpr->pJoinExpr == NULL) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
......@@ -3697,7 +3697,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
static void doAddJoinTagsColumnsIntoTagList(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
SColumnIndex index = {0};
getColumnIndexByName(&pCondExpr->pJoinExpr->pLeft->colInfo, pQueryInfo, &index);
......@@ -4045,7 +4045,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
/* for metric query, set default ascending order for group output */
pQueryInfo->groupbyExpr.orderType = TSQL_SO_ASC;
......@@ -4071,7 +4071,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema
* for super table query, the order option must be less than 3.
if (pSortorder->nExpr > 1) {
return invalidSqlErrMsg(pQueryInfo->msg, msg0);
......@@ -4092,7 +4092,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema
SSQLToken columnName = {pVar->nLen, pVar->nType, pVar->pz};
SColumnIndex index = {0};
if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { // metric query
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // metric query
if (getColumnIndexByName(&columnName, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
......@@ -4228,13 +4228,13 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
} else if ((pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) && (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo))) {
} else if ((pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) && (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo))) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4);
} else if ((pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_COLUMN) &&
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
......@@ -4627,7 +4627,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
bool queryOnTags = false;
if (tscQueryOnlyMetricTags(pQueryInfo, &queryOnTags) != TSDB_CODE_SUCCESS) {
......@@ -5539,7 +5539,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return code;
bool isSTable = UTIL_METER_IS_SUPERTABLE(pTableMetaInfo);
bool isSTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo);
if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) {
......@@ -5687,7 +5687,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
bool isSTable = UTIL_METER_IS_SUPERTABLE(pTableMetaInfo);
bool isSTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo);
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) {
......@@ -156,17 +156,19 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema);
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize);
pTableMeta->tableType = pTableMetaMsg->tableType;
pTableMeta->tableInfo = (STableComInfo){.numOfTags = pTableMetaMsg->numOfTags, .numOfColumns = pTableMetaMsg->numOfColumns,
.precision = pTableMetaMsg->precision};
pTableMeta->tableInfo = (STableComInfo) {
.numOfTags = pTableMetaMsg->numOfTags,
.numOfColumns = pTableMetaMsg->numOfColumns,
.precision = pTableMetaMsg->precision
pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->vgid = pTableMetaMsg->vgid;
pTableMeta->numOfVpeers = pTableMetaMsg->numOfVpeers;
memcpy(pTableMeta->vpeerDesc, pTableMetaMsg->vpeerDesc, sizeof(SVnodeDesc) * pTableMeta->numOfVpeers);
// pTableMeta->tableId = pTableMetaMsg->tableId;
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
......@@ -22,7 +22,6 @@
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tscompression.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
......@@ -542,7 +541,7 @@ int tscProcessSql(SSqlObj *pSql) {
// temp
pSql->ipList = &tscMgmtIpList;
// if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) {
// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
// pSql->index = pTableMetaInfo->pTableMeta->index;
// } else { // it must be the parent SSqlObj for super table query
// if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
......@@ -1277,7 +1276,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
// char * pStart = buf + tsRpcHeadSize;
// SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
// if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { // pColumnModel == NULL, query on meter
// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // pColumnModel == NULL, query on meter
// STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
// pQueryMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode);
// } else { // query on metric
......@@ -1301,7 +1300,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// meter query without tags values
if (!UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
......@@ -1326,7 +1325,7 @@ static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vn
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfTables);
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid);
......@@ -1373,7 +1372,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
char * pStart = pCmd->payload + tsRpcHeadSize;
char *pStart = pCmd->payload + tsRpcHeadSize;
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
......@@ -1383,15 +1382,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t msgLen = 0;
int32_t numOfTables = 0;
numOfTables = 1;
// tscTrace("%p query on vnode: %d, number of sid:%d, meter id: %s", pSql,
// pTableMeta->vpeerDesc[pTableMeta->index].vnode, 1, pTableMetaInfo->name);
// pQueryMsg->vnode = htons(pTableMeta->vpeerDesc[pTableMeta->index].vnode);
pQueryMsg->uid = pTableMeta->uid;
pQueryMsg->numOfTagsCols = 0;
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query on super table
if (pTableMetaInfo->vnodeIndex < 0) {
tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex);
......@@ -1407,19 +1404,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return -1; // error
tscTrace("%p query on vid:%d, number of sid:%d", pSql, vnodeId, numOfTables);
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
pQueryMsg->vnode = htons(vnodeId);
pQueryMsg->numOfSids = htonl(numOfTables);
pQueryMsg->numOfTables = htonl(numOfTables);
pQueryMsg->numOfTagsCols = htons(pTableMetaInfo->numOfTags);
if (pQueryInfo->order.order == TSQL_SO_ASC) {
pQueryMsg->skey = htobe64(pQueryInfo->stime);
pQueryMsg->ekey = htobe64(pQueryInfo->etime);
pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
pQueryMsg->window.ekey = htobe64(pQueryInfo->etime);
} else {
pQueryMsg->skey = htobe64(pQueryInfo->etime);
pQueryMsg->ekey = htobe64(pQueryInfo->stime);
pQueryMsg->window.skey = htobe64(pQueryInfo->etime);
pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
pQueryMsg->order = htons(pQueryInfo->order.order);
......@@ -1453,9 +1450,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { // query on meter
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // query on meter
pQueryMsg->tagLength = 0;
} else { // query on metric
} else { // query on super table
pQueryMsg->tagLength = htons(pMetricMeta->tagLen);
......@@ -2586,18 +2583,12 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode);
int32_t rowSize = 0;
SSchema* pSchema = pMetaMsg->schema;
int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
for (int i = 0; i < numOfTotalCols; ++i) {
pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = htons(pSchema->colId);
// ignore the tags length
if (i < pMetaMsg->numOfColumns) {
rowSize += pSchema->bytes;
......@@ -2622,15 +2613,16 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pTableMetaInfo->pTableMeta == NULL);
pTableMetaInfo->pTableMeta = (STableMeta *)taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta,
size, tsMeterMetaKeepTimer);
pTableMetaInfo->pTableMeta =
(STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsMeterMetaKeepTimer);
// todo handle out of memory case
if (pTableMetaInfo->pTableMeta == NULL) {
return 0;
......@@ -3006,7 +2998,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
if (pTableMetaInfo->pTableMeta) {
bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pTableMetaInfo);
bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
......@@ -79,7 +79,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == 0 && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
if (code == 0 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
code = tscGetMetricMeta(pSql, 0);
pSql->res.code = code;
......@@ -177,7 +177,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
int numOfTables = 0;
if (!UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) {
if (!UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
......@@ -191,7 +191,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
return 0;
numOfTables = 1;
int64_t uid = pTableMetaInfo->pTableMeta->uid;
progress[0].uid = uid;
......@@ -220,7 +220,7 @@ bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
// for select query super table, the metricmeta can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->pMetricMeta != NULL);
......@@ -239,7 +239,7 @@ bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
pQueryInfo->command == TSDB_SQL_SELECT) {
return UTIL_METER_IS_SUPERTABLE(pTableMetaInfo);
return UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo);
return false;
......@@ -253,7 +253,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
* 1. failed to get metermeta from server; 2. not a super table; 3. limitation is 0;
* 4. show queries, instead of a select query
if (pTableMetaInfo == NULL || !UTIL_METER_IS_SUPERTABLE(pTableMetaInfo) ||
if (pTableMetaInfo == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) ||
pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pQueryInfo->exprsInfo.numOfExprs == 0) {
return false;
......@@ -1578,7 +1578,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
return false;
if (colId == -1 && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
if (colId == -1 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
return true;
......@@ -2089,7 +2089,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1);
assert(pFinalInfo->pMetricMeta != NULL);
......@@ -2190,7 +2190,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (!UTIL_METER_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
return false;
......@@ -445,6 +445,11 @@ typedef struct STableSidExtInfo {
char tags[];
} STableSidExtInfo;
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
* the outputCols is equalled to or larger than numOfCols
* e.g., select min(colName), max(colName), avg(colName) from table
......@@ -452,46 +457,45 @@ typedef struct STableSidExtInfo {
typedef struct {
int16_t vnode;
int32_t numOfSids;
int32_t numOfTables;
uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may
uint64_t uid;
TSKEY skey;
TSKEY ekey;
STimeWindow window;
int16_t order;
int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
int64_t intervalTime; // time interval for aggregation, in million second
int64_t intervalTime; // time interval for aggregation, in million second
int64_t slidingTime; // value for sliding window
// tag schema, used to parse tag information in pSidExtInfo
uint64_t pTagSchema;
int16_t numOfTagsCols; // required number of tags
int16_t tagLength; // tag length in current query
int16_t numOfTagsCols; // required number of tags
int16_t tagLength; // tag length in current query
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
uint64_t groupbyTagIds;
int64_t limit;
int64_t offset;
int64_t limit;
int64_t offset;
int16_t queryType; // denote another query process
int16_t numOfOutputCols; // final output columns numbers
int16_t queryType; // denote another query process
int16_t numOfOutputCols; // final output columns numbers
int16_t interpoType; // interpolate type
uint64_t defaultVal; // default value array list
int32_t colNameLen;
int64_t colNameList;
int32_t colNameLen;
int64_t colNameList;
int64_t pSqlFuncExprs;
int64_t pSqlFuncExprs;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
......@@ -19,9 +19,10 @@
#include <stdbool.h>
#include <stdint.h>
#include "dataformat.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "dataformat.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
......@@ -182,23 +183,17 @@ int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg);
typedef void tsdb_query_handle_t; // Use void to hide implementation details
// time window
typedef struct STimeWindow {
int64_t skey;
int64_t ekey;
} STimeWindow;
typedef void* tsdb_query_handle_t; // Use void to hide implementation details
// typedef struct {
// } SColumnFilterInfo;
// query condition to build vnode iterator
typedef struct STSDBQueryCond {
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc/asc order to iterate the data block
SColumnFilterInfo colFilterInfo;
} STSDBQueryCond;
} STsdbQueryCond;
typedef struct SBlockInfo {
STimeWindow window;
......@@ -215,10 +210,13 @@ typedef struct SData {
char * data;
} SData;
typedef struct SDataBlock {
int32_t numOfCols;
SData **pData;
} SDataBlock;
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t size;
int32_t numOfCols;
int64_t uid;
int32_t sid;
} SDataBlockInfo;
typedef struct STableIDList {
STableId *tableIds;
......@@ -228,83 +226,107 @@ typedef struct STableIDList {
typedef struct {
} SFields;
typedef struct SQueryRowCond {
int32_t rel;
} SQueryRowCond;
typedef void *tsdbpos_t;
* Get the data block iterator, starting from position according to the query condition
* @param pRepo the TSDB repository to query on
* @param pCond query condition, only includes the filter on primary time stamp
* @param pTableList table sid list
* @return
tsdb_query_handle_t *tsdbQueryFromTableID(tsdb_repo_t *pRepo, STSDBQueryCond *pCond, const STableIDList *pTableList);
tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo);
* Get iterator for super tables, of which tags values satisfy the tag filter info
* NOTE: the tagFilterStr is an bin-expression for tag filter, such as ((tag_col = 5) and (tag_col2 > 7))
* The filter string is sent from client directly.
* The build of the tags filter expression from string is done in the iterator generating function.
* @param pRepo the repository to query on
* @param pCond query condition
* @param pTagFilterStr tag filter info
* move to next block
* @param pQueryHandle
* @return
tsdb_query_handle_t *tsdbQueryFromTagConds(tsdb_repo_t *pRepo, STSDBQueryCond *pCond, int16_t stableId,
const char *pTagFilterStr);
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle);
* Reset to the start(end) position of current query, from which the iterator starts.
* Get current data block information
* @param pQueryHandle
* @param position set the iterator traverses position. (TSDB_POS_START|TSDB_POS_END)
* @return
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, int16_t position);
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle);
* move to next block
* @param pQueryHandle
* @param pCond
* Get the pre-calculated information w.r.t. current data block.
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
bool tsdbIterNext(tsdb_query_handle_t *pQueryHandle);
//int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis);
* 当前数据块的信息,调用next函数后,只会获得block的信息,包括:行数、列数、skey/ekey信息。注意该信息并不是现在的SCompBlockInfo信息。
* 因为SCompBlockInfo是完整的数据块信息,但是迭代器返回并不是。
* 查询处理引擎会自己决定需要blockInfo, 还是预计算数据,亦或是完整的数据。
* Get current data block information
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
* @param pQueryHandle
* @return
SBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle);
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList);
* 获取当前数据块的预计算信息,如果块不完整,无预计算信息,如果是cache块,无预计算信息。
* todo remove the parameter of position, and order type
* Get the pre-calculated information w.r.t. current data block.
* Reset to the start(end) position of current query, from which the iterator starts.
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @param pQueryHandle
* @param position set the iterator traverses position
* @param order ascending order or descending order
* @return
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order);
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* return the access position of current query handle
* @param pQueryHandle
* @return
int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos);
* todo remove this function later
* @param pQueryHandle
* @return
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SFields *pBlockStatis);
tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle);
* 返回加载到缓存中的数据,可能是磁盘数据也可能是内存数据,对客户透明。即使是磁盘数据,返回的结果也是磁盘块中,满足查询时间范围要求的数据行,并不是一个完整的磁盘数
* 据块。
* todo remove this function later
* @param pQueryHandle
* @param pIdList
* @return
SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond);
* Get iterator for super tables, of which tags values satisfy the tag filter info
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
* NOTE: the tagFilterStr is an bin-expression for tag filter, such as ((tag_col = 5) and (tag_col2 > 7))
* The filter string is sent from client directly.
* The build of the tags filter expression from string is done in the iterator generating function.
* @param pQueryHandle
* @param pCond query condition
* @param pTagFilterStr tag filter info
* @return
SDataBlock *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle);
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr);
* Get the qualified tables for (super) table query.
......@@ -12,8 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <stdlib.h>
#include <unistd.h>
#include "os.h"
#include "taosdef.h"
#include "hash.h"
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "os.h"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册