From cbe666b28faca427ccc07baf3e24bd1ebbab0c99 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 20 Mar 2020 00:03:55 +0800 Subject: [PATCH] [TD-32] add query io interface --- src/client/inc/tscUtil.h | 6 +- src/client/src/tscAsync.c | 2 +- src/client/src/tscJoinProcess.c | 2 +- src/client/src/tscLocal.c | 4 +- src/client/src/tscParseInsert.c | 4 +- src/client/src/tscSQLParser.c | 36 ++++----- src/client/src/tscSchemaUtil.c | 12 +-- src/client/src/tscServer.c | 52 ++++++------ src/client/src/tscStream.c | 2 +- src/client/src/tscSub.c | 4 +- src/client/src/tscUtil.c | 12 +-- src/inc/taosmsg.h | 32 ++++---- src/vnode/tsdb/inc/tsdb.h | 126 ++++++++++++++++++------------ src/vnode/tsdb/src/tsdbMetaFile.c | 3 +- src/vnode/tsdb/src/tsdbRead.c | 16 ++++ 15 files changed, 174 insertions(+), 139 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 6d51d89f55..71e5c8ffe2 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -29,10 +29,10 @@ extern "C" { #include "tscSecondaryMerge.h" #include "tsclient.h" -#define UTIL_METER_IS_SUPERTABLE(metaInfo) \ +#define UTIL_TABLE_IS_SUPERTABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE)) -#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_SUPERTABLE(metaInfo))) -#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \ +#define UTIL_TABLE_IS_NOMRAL_TABLE(metaInfo) (!(UTIL_TABLE_IS_SUPERTABLE(metaInfo))) +#define UTIL_TABLE_CREATE_FROM_STABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) #define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index c9c1ea825c..d24cff5618 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -535,7 +535,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; - if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { code = tscGetMetricMeta(pSql, pCmd->clauseIndex); pRes->code = code; diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 2c66482b28..ee849bc23f 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -325,7 +325,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { pNewQueryInfo->limit = pSupporter->limit; // fetch the join tag column - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); assert(pQueryInfo->tagCond.joinInfo.hasJoin); diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 90a86788cf..e864ffa786 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -122,7 +122,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { int32_t numOfRows = tscGetNumOfColumns(pMeta); int32_t totalNumOfRows = numOfRows + tscGetNumOfTags(pMeta); - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { numOfRows = numOfRows + tscGetNumOfTags(pMeta); } @@ -154,7 +154,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { } } - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { return 0; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 75ead0007b..f2e40dd9fb 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -785,7 +785,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { return code; } - if (!UTIL_METER_IS_SUPERTABLE(pSTableMeterMetaInfo)) { + if (!UTIL_TABLE_IS_SUPERTABLE(pSTableMeterMetaInfo)) { return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z); } @@ -1081,7 +1081,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; // TODO: should _clean or _error_clean to async flow ???? } - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL); goto _error_clean; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 02267e0796..36b9aab7c3 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1382,7 +1382,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum STableComInfo tinfo = tscGetTableInfo(pTableMeta); - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { numOfTotalColumns = tinfo.numOfColumns + tinfo.numOfTags; } else { numOfTotalColumns = tinfo.numOfColumns; @@ -1444,7 +1444,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { + if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -2251,7 +2251,7 @@ bool validateIpAddress(const char* ip, size_t size) { int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (pTableMetaInfo->pTableMeta == NULL || !UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { return TSDB_CODE_INVALID_SQL; } @@ -2289,7 +2289,7 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { /* transfer the field-info back to original input format */ void tscRestoreSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (!UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { return; } @@ -2509,7 +2509,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* } if (groupTag) { - if (!UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg9); } @@ -3218,7 +3218,7 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum } // table to table/ super table to super table are allowed - if (UTIL_METER_IS_SUPERTABLE(pLeftMeterMeta) != UTIL_METER_IS_SUPERTABLE(pRightMeterMeta)) { + if (UTIL_TABLE_IS_SUPERTABLE(pLeftMeterMeta) != UTIL_TABLE_IS_SUPERTABLE(pRightMeterMeta)) { invalidSqlErrMsg(pQueryInfo->msg, msg5); return false; } @@ -3301,7 +3301,7 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S } else if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) || index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { // query on tags // check for tag query condition - if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -3659,7 +3659,7 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) { } STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { // for stable join, tag columns + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // for stable join, tag columns // must be present for join if (pCondExpr->pJoinExpr == NULL) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); @@ -3697,7 +3697,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) { static void doAddJoinTagsColumnsIntoTagList(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { SColumnIndex index = {0}; getColumnIndexByName(&pCondExpr->pJoinExpr->pLeft->colInfo, pQueryInfo, &index); @@ -4045,7 +4045,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { } /* for metric query, set default ascending order for group output */ - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { pQueryInfo->groupbyExpr.orderType = TSQL_SO_ASC; } } @@ -4071,7 +4071,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema * * for super table query, the order option must be less than 3. */ - if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (pSortorder->nExpr > 1) { return invalidSqlErrMsg(pQueryInfo->msg, msg0); } @@ -4092,7 +4092,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema SSQLToken columnName = {pVar->nLen, pVar->nType, pVar->pz}; SColumnIndex index = {0}; - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { // metric query + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // metric query if (getColumnIndexByName(&columnName, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -4228,13 +4228,13 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { - if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg3); } - } else if ((pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) && (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo))) { + } else if ((pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) && (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo))) { return invalidSqlErrMsg(pQueryInfo->msg, msg4); } else if ((pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_COLUMN) && - UTIL_METER_IS_CREATE_FROM_METRIC(pTableMetaInfo)) { + UTIL_TABLE_CREATE_FROM_STABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg6); } @@ -4627,7 +4627,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* return TSDB_CODE_SUCCESS; } - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { bool queryOnTags = false; if (tscQueryOnlyMetricTags(pQueryInfo, &queryOnTags) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; @@ -5539,7 +5539,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return code; } - bool isSTable = UTIL_METER_IS_SUPERTABLE(pTableMetaInfo); + bool isSTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } @@ -5687,7 +5687,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return TSDB_CODE_INVALID_SQL; } - bool isSTable = UTIL_METER_IS_SUPERTABLE(pTableMetaInfo); + bool isSTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 76e116a2ce..f4fb761ed9 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -156,17 +156,19 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema); STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize); pTableMeta->tableType = pTableMetaMsg->tableType; - pTableMeta->tableInfo = (STableComInfo){.numOfTags = pTableMetaMsg->numOfTags, .numOfColumns = pTableMetaMsg->numOfColumns, - .precision = pTableMetaMsg->precision}; + + pTableMeta->tableInfo = (STableComInfo) { + .numOfTags = pTableMetaMsg->numOfTags, + .numOfColumns = pTableMetaMsg->numOfColumns, + .precision = pTableMetaMsg->precision + }; + pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->vgid = pTableMetaMsg->vgid; pTableMeta->numOfVpeers = pTableMetaMsg->numOfVpeers; memcpy(pTableMeta->vpeerDesc, pTableMetaMsg->vpeerDesc, sizeof(SVnodeDesc) * pTableMeta->numOfVpeers); - -// pTableMeta->tableId = pTableMetaMsg->tableId; - memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize); int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 550a887622..38278d4bc0 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -22,7 +22,6 @@ #include "tscUtil.h" #include "tschemautil.h" #include "tsclient.h" -#include "tscompression.h" #include "tsocket.h" #include "ttime.h" #include "ttimer.h" @@ -542,7 +541,7 @@ int tscProcessSql(SSqlObj *pSql) { // temp pSql->ipList = &tscMgmtIpList; -// if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { +// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // pSql->index = pTableMetaInfo->pTableMeta->index; // } else { // it must be the parent SSqlObj for super table query // if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) { @@ -1277,7 +1276,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) { // char * pStart = buf + tsRpcHeadSize; // SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart; // -// if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { // pColumnModel == NULL, query on meter +// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // pColumnModel == NULL, query on meter // STableMeta *pTableMeta = pTableMetaInfo->pTableMeta; // pQueryMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode); // } else { // query on metric @@ -1301,7 +1300,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); // meter query without tags values - if (!UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize; } @@ -1326,7 +1325,7 @@ static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vn SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfTables); - if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { #ifdef _DEBUG_VIEW tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid); #endif @@ -1373,7 +1372,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - char * pStart = pCmd->payload + tsRpcHeadSize; + char *pStart = pCmd->payload + tsRpcHeadSize; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; @@ -1383,15 +1382,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t msgLen = 0; int32_t numOfTables = 0; - if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { numOfTables = 1; -// tscTrace("%p query on vnode: %d, number of sid:%d, meter id: %s", pSql, -// pTableMeta->vpeerDesc[pTableMeta->index].vnode, 1, pTableMetaInfo->name); - -// pQueryMsg->vnode = htons(pTableMeta->vpeerDesc[pTableMeta->index].vnode); pQueryMsg->uid = pTableMeta->uid; pQueryMsg->numOfTagsCols = 0; + + tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); } else { // query on super table if (pTableMetaInfo->vnodeIndex < 0) { tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex); @@ -1407,19 +1404,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return -1; // error } - tscTrace("%p query on vid:%d, number of sid:%d", pSql, vnodeId, numOfTables); + tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables); pQueryMsg->vnode = htons(vnodeId); } - pQueryMsg->numOfSids = htonl(numOfTables); + pQueryMsg->numOfTables = htonl(numOfTables); pQueryMsg->numOfTagsCols = htons(pTableMetaInfo->numOfTags); if (pQueryInfo->order.order == TSQL_SO_ASC) { - pQueryMsg->skey = htobe64(pQueryInfo->stime); - pQueryMsg->ekey = htobe64(pQueryInfo->etime); + pQueryMsg->window.skey = htobe64(pQueryInfo->stime); + pQueryMsg->window.ekey = htobe64(pQueryInfo->etime); } else { - pQueryMsg->skey = htobe64(pQueryInfo->etime); - pQueryMsg->ekey = htobe64(pQueryInfo->stime); + pQueryMsg->window.skey = htobe64(pQueryInfo->etime); + pQueryMsg->window.ekey = htobe64(pQueryInfo->stime); } pQueryMsg->order = htons(pQueryInfo->order.order); @@ -1453,9 +1450,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); - if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { // query on meter + if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // query on meter pQueryMsg->tagLength = 0; - } else { // query on metric + } else { // query on super table pQueryMsg->tagLength = htons(pMetricMeta->tagLen); } @@ -2586,18 +2583,12 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode); } - int32_t rowSize = 0; SSchema* pSchema = pMetaMsg->schema; int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags; for (int i = 0; i < numOfTotalCols; ++i) { pSchema->bytes = htons(pSchema->bytes); pSchema->colId = htons(pSchema->colId); - - // ignore the tags length - if (i < pMetaMsg->numOfColumns) { - rowSize += pSchema->bytes; - } pSchema++; } @@ -2622,15 +2613,16 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); assert(pTableMetaInfo->pTableMeta == NULL); - pTableMetaInfo->pTableMeta = (STableMeta *)taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, - size, tsMeterMetaKeepTimer); + pTableMetaInfo->pTableMeta = + (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsMeterMetaKeepTimer); + // todo handle out of memory case if (pTableMetaInfo->pTableMeta == NULL) { - return 0; + return TSDB_CODE_CLI_OUT_OF_MEMORY; } free(pTableMeta); - return TSDB_CODE_OTHERS; + return TSDB_CODE_SUCCESS; } /** @@ -3006,7 +2998,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true); if (pTableMetaInfo->pTableMeta) { - bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pTableMetaInfo); + bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index ac8fb95a13..46e3ac2e60 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -79,7 +79,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; - if (code == 0 && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (code == 0 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { code = tscGetMetricMeta(pSql, 0); pSql->res.code = code; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index fda4522c68..ceb3b180b9 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -177,7 +177,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0); int numOfTables = 0; - if (!UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { + if (!UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta; for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); @@ -191,7 +191,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { return 0; } - if (UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { numOfTables = 1; int64_t uid = pTableMetaInfo->pTableMeta->uid; progress[0].uid = uid; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 16a39a4db4..014de272ed 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -220,7 +220,7 @@ bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { } // for select query super table, the metricmeta can not be null in any cases. - if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { assert(pTableMetaInfo->pMetricMeta != NULL); } @@ -239,7 +239,7 @@ bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { if (((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) != TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->command == TSDB_SQL_SELECT) { - return UTIL_METER_IS_SUPERTABLE(pTableMetaInfo); + return UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); } return false; @@ -253,7 +253,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { * 1. failed to get metermeta from server; 2. not a super table; 3. limitation is 0; * 4. show queries, instead of a select query */ - if (pTableMetaInfo == NULL || !UTIL_METER_IS_SUPERTABLE(pTableMetaInfo) || + if (pTableMetaInfo == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pQueryInfo->exprsInfo.numOfExprs == 0) { return false; } @@ -1578,7 +1578,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) { return false; } - if (colId == -1 && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (colId == -1 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { return true; } @@ -2089,7 +2089,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1); - if (UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { assert(pFinalInfo->pMetricMeta != NULL); } @@ -2190,7 +2190,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (!UTIL_METER_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { + if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { return false; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a2902e28b7..8624fc722c 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -445,6 +445,11 @@ typedef struct STableSidExtInfo { char tags[]; } STableSidExtInfo; +typedef struct STimeWindow { + TSKEY skey; + TSKEY ekey; +} STimeWindow; + /* * the outputCols is equalled to or larger than numOfCols * e.g., select min(colName), max(colName), avg(colName) from table @@ -452,46 +457,45 @@ typedef struct STableSidExtInfo { */ typedef struct { int16_t vnode; - int32_t numOfSids; + int32_t numOfTables; uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may uint64_t uid; - TSKEY skey; - TSKEY ekey; + STimeWindow window; int16_t order; int16_t orderColId; int16_t numOfCols; // the number of columns will be load from vnode - char slidingTimeUnit; // time interval type, for revisement of interval(1d) + char slidingTimeUnit; // time interval type, for revisement of interval(1d) - int64_t intervalTime; // time interval for aggregation, in million second + int64_t intervalTime; // time interval for aggregation, in million second int64_t slidingTime; // value for sliding window // tag schema, used to parse tag information in pSidExtInfo uint64_t pTagSchema; - int16_t numOfTagsCols; // required number of tags - int16_t tagLength; // tag length in current query + int16_t numOfTagsCols; // required number of tags + int16_t tagLength; // tag length in current query int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; int16_t orderType; // used in group by xx order by xxx uint64_t groupbyTagIds; - int64_t limit; - int64_t offset; + int64_t limit; + int64_t offset; - int16_t queryType; // denote another query process - int16_t numOfOutputCols; // final output columns numbers + int16_t queryType; // denote another query process + int16_t numOfOutputCols; // final output columns numbers int16_t interpoType; // interpolate type uint64_t defaultVal; // default value array list - int32_t colNameLen; - int64_t colNameList; + int32_t colNameLen; + int64_t colNameList; - int64_t pSqlFuncExprs; + int64_t pSqlFuncExprs; int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsLen; // total length of ts comp block diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 9b7215d7da..2a8a7e3850 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -19,9 +19,10 @@ #include #include +#include "dataformat.h" #include "taosdef.h" #include "taosmsg.h" -#include "dataformat.h" +#include "tarray.h" #ifdef __cplusplus extern "C" { @@ -182,23 +183,17 @@ int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg); // -- FOR QUERY TIME SERIES DATA -typedef void tsdb_query_handle_t; // Use void to hide implementation details - -// time window -typedef struct STimeWindow { - int64_t skey; - int64_t ekey; -} STimeWindow; +typedef void* tsdb_query_handle_t; // Use void to hide implementation details // typedef struct { // } SColumnFilterInfo; // query condition to build vnode iterator -typedef struct STSDBQueryCond { +typedef struct STsdbQueryCond { STimeWindow twindow; int32_t order; // desc/asc order to iterate the data block SColumnFilterInfo colFilterInfo; -} STSDBQueryCond; +} STsdbQueryCond; typedef struct SBlockInfo { STimeWindow window; @@ -215,10 +210,13 @@ typedef struct SData { char * data; } SData; -typedef struct SDataBlock { - int32_t numOfCols; - SData **pData; -} SDataBlock; +typedef struct SDataBlockInfo { + STimeWindow window; + int32_t size; + int32_t numOfCols; + int64_t uid; + int32_t sid; +} SDataBlockInfo; typedef struct STableIDList { STableId *tableIds; @@ -228,83 +226,107 @@ typedef struct STableIDList { typedef struct { } SFields; +#define TSDB_TS_GREATER_EQUAL 1 +#define TSDB_TS_LESS_EQUAL 2 + +typedef struct SQueryRowCond { + int32_t rel; + TSKEY ts; +} SQueryRowCond; + +typedef void *tsdbpos_t; + /** * Get the data block iterator, starting from position according to the query condition - * @param pRepo the TSDB repository to query on * @param pCond query condition, only includes the filter on primary time stamp * @param pTableList table sid list * @return */ -tsdb_query_handle_t *tsdbQueryFromTableID(tsdb_repo_t *pRepo, STSDBQueryCond *pCond, const STableIDList *pTableList); +tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo); /** - * Get iterator for super tables, of which tags values satisfy the tag filter info - * - * NOTE: the tagFilterStr is an bin-expression for tag filter, such as ((tag_col = 5) and (tag_col2 > 7)) - * The filter string is sent from client directly. - * The build of the tags filter expression from string is done in the iterator generating function. - * - * @param pRepo the repository to query on - * @param pCond query condition - * @param pTagFilterStr tag filter info + * move to next block + * @param pQueryHandle * @return */ -tsdb_query_handle_t *tsdbQueryFromTagConds(tsdb_repo_t *pRepo, STSDBQueryCond *pCond, int16_t stableId, - const char *pTagFilterStr); +bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle); /** - * Reset to the start(end) position of current query, from which the iterator starts. + * Get current data block information * * @param pQueryHandle - * @param position set the iterator traverses position. (TSDB_POS_START|TSDB_POS_END) * @return */ -int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, int16_t position); +SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle); /** - * move to next block - * @param pQueryHandle - * @param pCond + * + * Get the pre-calculated information w.r.t. current data block. + * + * In case of data block in cache, the pBlockStatis will always be NULL. + * If a block is not completed loaded from disk, the pBlockStatis will be NULL. + + * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 * @return */ -bool tsdbIterNext(tsdb_query_handle_t *pQueryHandle); +//int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis); /** - * 当前数据块的信息,调用next函数后,只会获得block的信息,包括:行数、列数、skey/ekey信息。注意该信息并不是现在的SCompBlockInfo信息。 - * 因为SCompBlockInfo是完整的数据块信息,但是迭代器返回并不是。 - * 查询处理引擎会自己决定需要blockInfo, 还是预计算数据,亦或是完整的数据。 - * Get current data block information + * The query condition with primary timestamp is passed to iterator during its constructor function, + * the returned data block must be satisfied with the time window condition in any cases, + * which means the SData data block is not actually the completed disk data blocks. * * @param pQueryHandle * @return */ -SBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle); +SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList); /** - * 获取当前数据块的预计算信息,如果块不完整,无预计算信息,如果是cache块,无预计算信息。 + * todo remove the parameter of position, and order type * - * Get the pre-calculated information w.r.t. current data block. + * Reset to the start(end) position of current query, from which the iterator starts. * - * In case of data block in cache, the pBlockStatis will always be NULL. - * If a block is not completed loaded from disk, the pBlockStatis will be NULL. + * @param pQueryHandle + * @param position set the iterator traverses position + * @param order ascending order or descending order + * @return + */ +int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order); - * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 +/** + * return the access position of current query handle + * @param pQueryHandle + * @return + */ +int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos); + +/** + * todo remove this function later + * @param pQueryHandle * @return */ -int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SFields *pBlockStatis); +tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle); /** - * 返回加载到缓存中的数据,可能是磁盘数据也可能是内存数据,对客户透明。即使是磁盘数据,返回的结果也是磁盘块中,满足查询时间范围要求的数据行,并不是一个完整的磁盘数 - * 据块。 + * todo remove this function later + * @param pQueryHandle + * @param pIdList + * @return + */ +SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond); + +/** + * Get iterator for super tables, of which tags values satisfy the tag filter info * - * The query condition with primary timestamp is passed to iterator during its constructor function, - * the returned data block must be satisfied with the time window condition in any cases, - * which means the SData data block is not actually the completed disk data blocks. + * NOTE: the tagFilterStr is an bin-expression for tag filter, such as ((tag_col = 5) and (tag_col2 > 7)) + * The filter string is sent from client directly. + * The build of the tags filter expression from string is done in the iterator generating function. * - * @param pQueryHandle + * @param pCond query condition + * @param pTagFilterStr tag filter info * @return */ -SDataBlock *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle); +tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr); /** * Get the qualified tables for (super) table query. diff --git a/src/vnode/tsdb/src/tsdbMetaFile.c b/src/vnode/tsdb/src/tsdbMetaFile.c index 689f8033db..2a32283c06 100644 --- a/src/vnode/tsdb/src/tsdbMetaFile.c +++ b/src/vnode/tsdb/src/tsdbMetaFile.c @@ -12,8 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include -#include +#include "os.h" #include "taosdef.h" #include "hash.h" diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index e69de29bb2..eb4a05d9e3 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" -- GitLab