diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 689f9715d08b4569817d16daca224c2482c7643c..c7be1b84cefa4271f8dae73c9dd3235b6597f0e0 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -417,6 +417,10 @@ static void count_function(SQLFunctionCtx *pCtx) { numOfElem += 1; } } else { + /* + * when counting on the primary time stamp column and no statistics data is provided, + * simple use the size value + */ numOfElem = pCtx->size; } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 58fc32c824c287d5e426236c41fee34f38526b00..3da374f152cea46e834340b851a4ce68d253022c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -659,14 +659,18 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscError("%p failed to malloc for query msg", pSql); return -1; } - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - char *pStart = pCmd->payload + tsRpcHeadSize; - STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; + + if (pQueryInfo->colList.numOfCols <= 0) { + tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta)); + return -1; + } + + char *pStart = pCmd->payload + tsRpcHeadSize; SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart; @@ -675,11 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { numOfTables = 1; - - pQueryMsg->uid = pTableMeta->uid; - pQueryMsg->numOfTagsCols = 0; - - pQueryMsg->vgId = htonl(pTableMeta->vgId); + pQueryMsg->head.vgId = htonl(pTableMeta->vgId); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); } else { // query on super table if (pTableMetaInfo->vnodeIndex < 0) { @@ -697,11 +697,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables); - pQueryMsg->vgId = htons(vnodeId); + pQueryMsg->head.vgId = htons(vnodeId); } pQueryMsg->numOfTables = htonl(numOfTables); - pQueryMsg->numOfTagsCols = htons(pTableMetaInfo->numOfTags); if (pQueryInfo->order.order == TSQL_SO_ASC) { pQueryMsg->window.skey = htobe64(pQueryInfo->stime); @@ -711,24 +710,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->window.ekey = htobe64(pQueryInfo->stime); } - pQueryMsg->order = htons(pQueryInfo->order.order); - pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); - - pQueryMsg->interpoType = htons(pQueryInfo->interpoType); - - pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); - pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); - - pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols); - - if (pQueryInfo->colList.numOfCols <= 0) { - tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta)); - return -1; - } - - pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); + pQueryMsg->order = htons(pQueryInfo->order.order); + pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); + pQueryMsg->interpoType = htons(pQueryInfo->interpoType); + pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); + pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); + pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols); + pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); + pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit; - pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); if (pQueryInfo->intervalTime < 0) { tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime); @@ -776,7 +766,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->colList[i].colId = htons(pColSchema->colId); pQueryMsg->colList[i].bytes = htons(pColSchema->bytes); - pQueryMsg->colList[i].type = htons(pColSchema->type); + pQueryMsg->colList[i].type = htons(pColSchema->type); pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters); // append the filter information after the basic column information @@ -824,11 +814,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return -1; } - pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId); + pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId); pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx); - pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag); + pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag); - pSqlFuncExpr->functionId = htons(pExpr->functionId); + pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); pMsg += sizeof(SSqlFuncExprMsg); @@ -866,25 +856,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->colNameLen = htonl(len); // serialize the table info (sid, uid, tags) - pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vgId), pMsg); - - // only include the required tag column schema. If a tag is not required, it won't be sent to vnode - if (pTableMetaInfo->numOfTags > 0) { - // always transfer tag schema to vnode if exists - SSchema *pTagSchema = tscGetTableTagSchema(pTableMeta); - - for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) { - if (pTableMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) { - SSchema tbSchema = { - .bytes = TSDB_TABLE_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY}; - memcpy(pMsg, &tbSchema, sizeof(SSchema)); - } else { - memcpy(pMsg, &pTagSchema[pTableMetaInfo->tagColumnIndex[j]], sizeof(SSchema)); - } - - pMsg += sizeof(SSchema); - } - } + pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->head.vgId), pMsg); SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; if (pGroupbyExpr->numOfGroupCols != 0) { @@ -948,8 +920,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pCmd->payloadLen = msgLen; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; - pQueryMsg->contLen = htonl(msgLen); - + pQueryMsg->head.contLen = htonl(msgLen); assert(msgLen + minMsgSize() <= size); return TSDB_CODE_SUCCESS; @@ -2358,7 +2329,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { } pRes->row = 0; - tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset); return 0; diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 887845b00ed01820ec11ad641a732904145559ab..bfe717c3677d64ed33974d1f259cad6020ba0372 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -284,16 +284,10 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { //todo free qinfo } else { - contLen = 100; + SRetrieveTableRsp* pRsp = NULL; - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); - pRsp->numOfRows = htonl(1); - pRsp->precision = htons(0); - pRsp->offset = htobe64(0); - pRsp->useconds = htobe64(0); - - // todo set the data - *(int64_t*) pRsp->data = 1000; + int32_t code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); + //todo check code rpcRsp = (SRpcMsg) { .handle = pMsg->rpcMsg.handle, diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index cc17df9bec97d30b4efe3b6bd5ae9a0cb1d6afff..2fcc91a7bb84d98ff9ae4e898dc77472b332b008 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -20,122 +20,122 @@ extern "C" { #endif -#include #include +#include #include "taosdef.h" #include "taoserror.h" #include "trpc.h" // message type -#define TSDB_MSG_TYPE_REG 1 -#define TSDB_MSG_TYPE_REG_RSP 2 -#define TSDB_MSG_TYPE_SUBMIT 3 -#define TSDB_MSG_TYPE_SUBMIT_RSP 4 -#define TSDB_MSG_TYPE_QUERY 5 -#define TSDB_MSG_TYPE_QUERY_RSP 6 -#define TSDB_MSG_TYPE_RETRIEVE 7 -#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 +#define TSDB_MSG_TYPE_REG 1 +#define TSDB_MSG_TYPE_REG_RSP 2 +#define TSDB_MSG_TYPE_SUBMIT 3 +#define TSDB_MSG_TYPE_SUBMIT_RSP 4 +#define TSDB_MSG_TYPE_QUERY 5 +#define TSDB_MSG_TYPE_QUERY_RSP 6 +#define TSDB_MSG_TYPE_RETRIEVE 7 +#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 // message from mnode to dnode -#define TSDB_MSG_TYPE_MD_CREATE_TABLE 9 -#define TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP 10 -#define TSDB_MSG_TYPE_MD_DROP_TABLE 11 -#define TSDB_MSG_TYPE_MD_DROP_TABLE_RSP 12 -#define TSDB_MSG_TYPE_MD_ALTER_TABLE 13 -#define TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP 14 -#define TSDB_MSG_TYPE_MD_CREATE_VNODE 15 -#define TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP 16 -#define TSDB_MSG_TYPE_MD_DROP_VNODE 17 -#define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18 -#define TSDB_MSG_TYPE_MD_ALTER_VNODE 19 -#define TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP 20 -#define TSDB_MSG_TYPE_MD_DROP_STABLE 21 -#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 22 -#define TSDB_MSG_TYPE_MD_ALTER_STREAM 23 -#define TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP 24 -#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25 -#define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26 +#define TSDB_MSG_TYPE_MD_CREATE_TABLE 9 +#define TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP 10 +#define TSDB_MSG_TYPE_MD_DROP_TABLE 11 +#define TSDB_MSG_TYPE_MD_DROP_TABLE_RSP 12 +#define TSDB_MSG_TYPE_MD_ALTER_TABLE 13 +#define TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP 14 +#define TSDB_MSG_TYPE_MD_CREATE_VNODE 15 +#define TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP 16 +#define TSDB_MSG_TYPE_MD_DROP_VNODE 17 +#define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18 +#define TSDB_MSG_TYPE_MD_ALTER_VNODE 19 +#define TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP 20 +#define TSDB_MSG_TYPE_MD_DROP_STABLE 21 +#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 22 +#define TSDB_MSG_TYPE_MD_ALTER_STREAM 23 +#define TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP 24 +#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25 +#define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26 // message from client to mnode -#define TSDB_MSG_TYPE_CM_CONNECT 31 -#define TSDB_MSG_TYPE_CM_CONNECT_RSP 32 -#define TSDB_MSG_TYPE_CM_CREATE_ACCT 33 -#define TSDB_MSG_TYPE_CM_CREATE_ACCT_RSP 34 -#define TSDB_MSG_TYPE_CM_ALTER_ACCT 35 -#define TSDB_MSG_TYPE_CM_ALTER_ACCT_RSP 36 -#define TSDB_MSG_TYPE_CM_DROP_ACCT 37 -#define TSDB_MSG_TYPE_CM_DROP_ACCT_RSP 38 -#define TSDB_MSG_TYPE_CM_CREATE_USER 39 -#define TSDB_MSG_TYPE_CM_CREATE_USER_RSP 40 -#define TSDB_MSG_TYPE_CM_ALTER_USER 41 -#define TSDB_MSG_TYPE_CM_ALTER_USER_RSP 42 -#define TSDB_MSG_TYPE_CM_DROP_USER 43 -#define TSDB_MSG_TYPE_CM_DROP_USER_RSP 44 -#define TSDB_MSG_TYPE_CM_CREATE_DNODE 45 -#define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46 -#define TSDB_MSG_TYPE_CM_DROP_DNODE 47 -#define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48 -#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE -#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP -#define TSDB_MSG_TYPE_CM_CREATE_DB 49 -#define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50 -#define TSDB_MSG_TYPE_CM_DROP_DB 51 -#define TSDB_MSG_TYPE_CM_DROP_DB_RSP 52 -#define TSDB_MSG_TYPE_CM_USE_DB 53 -#define TSDB_MSG_TYPE_CM_USE_DB_RSP 54 -#define TSDB_MSG_TYPE_CM_ALTER_DB 55 -#define TSDB_MSG_TYPE_CM_ALTER_DB_RSP 56 -#define TSDB_MSG_TYPE_CM_CREATE_TABLE 57 -#define TSDB_MSG_TYPE_CM_CREATE_TABLE_RSP 58 -#define TSDB_MSG_TYPE_CM_DROP_TABLE 59 -#define TSDB_MSG_TYPE_CM_DROP_TABLE_RSP 60 -#define TSDB_MSG_TYPE_CM_ALTER_TABLE 61 -#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62 -#define TSDB_MSG_TYPE_CM_TABLE_META 63 -#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64 -#define TSDB_MSG_TYPE_CM_STABLE_META 65 -#define TSDB_MSG_TYPE_CM_STABLE_META_RSP 66 -#define TSDB_MSG_TYPE_CM_TABLES_META 67 -#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68 -#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69 -#define TSDB_MSG_TYPE_CM_ALTER_STREAM_RSP 70 -#define TSDB_MSG_TYPE_CM_SHOW 71 -#define TSDB_MSG_TYPE_CM_SHOW_RSP 72 -#define TSDB_MSG_TYPE_CM_KILL_QUERY 73 -#define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74 -#define TSDB_MSG_TYPE_CM_KILL_STREAM 75 -#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76 -#define TSDB_MSG_TYPE_CM_KILL_CONN 77 -#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78 -#define TSDB_MSG_TYPE_CM_HEARTBEAT 79 -#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 80 +#define TSDB_MSG_TYPE_CM_CONNECT 31 +#define TSDB_MSG_TYPE_CM_CONNECT_RSP 32 +#define TSDB_MSG_TYPE_CM_CREATE_ACCT 33 +#define TSDB_MSG_TYPE_CM_CREATE_ACCT_RSP 34 +#define TSDB_MSG_TYPE_CM_ALTER_ACCT 35 +#define TSDB_MSG_TYPE_CM_ALTER_ACCT_RSP 36 +#define TSDB_MSG_TYPE_CM_DROP_ACCT 37 +#define TSDB_MSG_TYPE_CM_DROP_ACCT_RSP 38 +#define TSDB_MSG_TYPE_CM_CREATE_USER 39 +#define TSDB_MSG_TYPE_CM_CREATE_USER_RSP 40 +#define TSDB_MSG_TYPE_CM_ALTER_USER 41 +#define TSDB_MSG_TYPE_CM_ALTER_USER_RSP 42 +#define TSDB_MSG_TYPE_CM_DROP_USER 43 +#define TSDB_MSG_TYPE_CM_DROP_USER_RSP 44 +#define TSDB_MSG_TYPE_CM_CREATE_DNODE 45 +#define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46 +#define TSDB_MSG_TYPE_CM_DROP_DNODE 47 +#define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48 +#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE +#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP +#define TSDB_MSG_TYPE_CM_CREATE_DB 49 +#define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50 +#define TSDB_MSG_TYPE_CM_DROP_DB 51 +#define TSDB_MSG_TYPE_CM_DROP_DB_RSP 52 +#define TSDB_MSG_TYPE_CM_USE_DB 53 +#define TSDB_MSG_TYPE_CM_USE_DB_RSP 54 +#define TSDB_MSG_TYPE_CM_ALTER_DB 55 +#define TSDB_MSG_TYPE_CM_ALTER_DB_RSP 56 +#define TSDB_MSG_TYPE_CM_CREATE_TABLE 57 +#define TSDB_MSG_TYPE_CM_CREATE_TABLE_RSP 58 +#define TSDB_MSG_TYPE_CM_DROP_TABLE 59 +#define TSDB_MSG_TYPE_CM_DROP_TABLE_RSP 60 +#define TSDB_MSG_TYPE_CM_ALTER_TABLE 61 +#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62 +#define TSDB_MSG_TYPE_CM_TABLE_META 63 +#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64 +#define TSDB_MSG_TYPE_CM_STABLE_META 65 +#define TSDB_MSG_TYPE_CM_STABLE_META_RSP 66 +#define TSDB_MSG_TYPE_CM_TABLES_META 67 +#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68 +#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69 +#define TSDB_MSG_TYPE_CM_ALTER_STREAM_RSP 70 +#define TSDB_MSG_TYPE_CM_SHOW 71 +#define TSDB_MSG_TYPE_CM_SHOW_RSP 72 +#define TSDB_MSG_TYPE_CM_KILL_QUERY 73 +#define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74 +#define TSDB_MSG_TYPE_CM_KILL_STREAM 75 +#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76 +#define TSDB_MSG_TYPE_CM_KILL_CONN 77 +#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78 +#define TSDB_MSG_TYPE_CM_HEARTBEAT 79 +#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 80 // message from dnode to mnode -#define TSDB_MSG_TYPE_DM_CONFIG_TABLE 91 -#define TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP 92 -#define TSDB_MSG_TYPE_DM_CONFIG_VNODE 93 -#define TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP 94 -#define TSDB_MSG_TYPE_DM_STATUS 95 -#define TSDB_MSG_TYPE_DM_STATUS_RSP 96 -#define TSDB_MSG_TYPE_DM_GRANT 97 -#define TSDB_MSG_TYPE_DM_GRANT_RSP 98 - -#define TSDB_MSG_TYPE_SDB_SYNC 101 -#define TSDB_MSG_TYPE_SDB_SYNC_RSP 102 -#define TSDB_MSG_TYPE_SDB_FORWARD 103 -#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 104 - -#define TSDB_MSG_TYPE_MAX 105 +#define TSDB_MSG_TYPE_DM_CONFIG_TABLE 91 +#define TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP 92 +#define TSDB_MSG_TYPE_DM_CONFIG_VNODE 93 +#define TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP 94 +#define TSDB_MSG_TYPE_DM_STATUS 95 +#define TSDB_MSG_TYPE_DM_STATUS_RSP 96 +#define TSDB_MSG_TYPE_DM_GRANT 97 +#define TSDB_MSG_TYPE_DM_GRANT_RSP 98 + +#define TSDB_MSG_TYPE_SDB_SYNC 101 +#define TSDB_MSG_TYPE_SDB_SYNC_RSP 102 +#define TSDB_MSG_TYPE_SDB_FORWARD 103 +#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 104 + +#define TSDB_MSG_TYPE_MAX 105 // IE type -#define TSDB_IE_TYPE_SEC 1 -#define TSDB_IE_TYPE_META 2 -#define TSDB_IE_TYPE_MGMT_IP 3 -#define TSDB_IE_TYPE_DNODE_CFG 4 -#define TSDB_IE_TYPE_NEW_VERSION 5 -#define TSDB_IE_TYPE_DNODE_EXT 6 -#define TSDB_IE_TYPE_DNODE_STATE 7 +#define TSDB_IE_TYPE_SEC 1 +#define TSDB_IE_TYPE_META 2 +#define TSDB_IE_TYPE_MGMT_IP 3 +#define TSDB_IE_TYPE_DNODE_CFG 4 +#define TSDB_IE_TYPE_NEW_VERSION 5 +#define TSDB_IE_TYPE_DNODE_EXT 6 +#define TSDB_IE_TYPE_DNODE_STATE 7 enum _mgmt_table { TSDB_MGMT_TABLE_ACCT, @@ -157,38 +157,38 @@ enum _mgmt_table { TSDB_MGMT_TABLE_MAX, }; -#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 -#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 -#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3 -#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 +#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 +#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 +#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3 +#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 -#define TSDB_ALTER_TABLE_ADD_COLUMN 5 -#define TSDB_ALTER_TABLE_DROP_COLUMN 6 +#define TSDB_ALTER_TABLE_ADD_COLUMN 5 +#define TSDB_ALTER_TABLE_DROP_COLUMN 6 -#define TSDB_INTERPO_NONE 0 -#define TSDB_INTERPO_NULL 1 -#define TSDB_INTERPO_SET_VALUE 2 -#define TSDB_INTERPO_LINEAR 3 -#define TSDB_INTERPO_PREV 4 +#define TSDB_INTERPO_NONE 0 +#define TSDB_INTERPO_NULL 1 +#define TSDB_INTERPO_SET_VALUE 2 +#define TSDB_INTERPO_LINEAR 3 +#define TSDB_INTERPO_PREV 4 -#define TSDB_ALTER_USER_PASSWD 0x1 -#define TSDB_ALTER_USER_PRIVILEGES 0x2 +#define TSDB_ALTER_USER_PASSWD 0x1 +#define TSDB_ALTER_USER_PRIVILEGES 0x2 -#define TSDB_KILL_MSG_LEN 30 +#define TSDB_KILL_MSG_LEN 30 -#define TSDB_VN_READ_ACCCESS ((char)0x1) +#define TSDB_VN_READ_ACCCESS ((char)0x1) #define TSDB_VN_WRITE_ACCCESS ((char)0x2) #define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) -#define TSDB_COL_NORMAL 0x0u -#define TSDB_COL_TAG 0x1u -#define TSDB_COL_JOIN 0x2u +#define TSDB_COL_NORMAL 0x0u +#define TSDB_COL_TAG 0x1u +#define TSDB_COL_JOIN 0x2u extern char *taosMsg[]; #pragma pack(push, 1) -//typedef struct { +// typedef struct { // int32_t vnode; // int32_t sid; // int32_t sversion; @@ -206,7 +206,7 @@ typedef struct SMsgHead { int32_t vgId; } SMsgHead; -//typedef struct { +// typedef struct { // SMsgDesc desc; // SMsgHead header; // int16_t import; @@ -216,37 +216,37 @@ typedef struct SMsgHead { // Submit message for one table typedef struct SSubmitBlk { - int64_t uid; // table unique id - int32_t tid; // table id - int32_t padding; // TODO just for padding here - int32_t sversion; // data schema version - int32_t len; // data part length, not including the SSubmitBlk head - int16_t numOfRows; // total number of rows in current submit block - char data[]; + int64_t uid; // table unique id + int32_t tid; // table id + int32_t padding; // TODO just for padding here + int32_t sversion; // data schema version + int32_t len; // data part length, not including the SSubmitBlk head + int16_t numOfRows; // total number of rows in current submit block + char data[]; } SSubmitBlk; // Submit message for this TSDB typedef struct SSubmitMsg { SMsgHead header; int32_t length; - int32_t compressed:2; - int32_t numOfBlocks:30; + int32_t compressed : 2; + int32_t numOfBlocks : 30; SSubmitBlk blocks[]; } SSubmitMsg; typedef struct { - int32_t index; // index of failed block in submit blocks - int32_t vnode; // vnode index of failed block - int32_t sid; // table index of failed block - int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table + int32_t index; // index of failed block in submit blocks + int32_t vnode; // vnode index of failed block + int32_t sid; // table index of failed block + int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table } SShellSubmitRspBlock; typedef struct { - int32_t code; // 0-success, > 0 error code - int32_t numOfRows; // number of records the client is trying to write - int32_t affectedRows; // number of records actually written - int32_t failedRows; // number of failed records (exclude duplicate records) - int32_t numOfFailedBlocks; + int32_t code; // 0-success, > 0 error code + int32_t numOfRows; // number of records the client is trying to write + int32_t affectedRows; // number of records actually written + int32_t failedRows; // number of failed records (exclude duplicate records) + int32_t numOfFailedBlocks; SShellSubmitRspBlock failedBlocks[]; } SShellSubmitRspMsg; @@ -258,38 +258,38 @@ typedef struct SSchema { } SSchema; typedef struct { - int32_t vnode; //the index of vnode + int32_t vnode; // the index of vnode uint32_t ip; } SVnodeDesc; typedef struct { - int32_t contLen; - int32_t vgId; - int8_t tableType; - int16_t numOfColumns; - int16_t numOfTags; - int32_t sid; - int32_t sversion; - int32_t tagDataLen; - int32_t sqlDataLen; - uint64_t uid; - uint64_t superTableUid; - uint64_t createdTime; - char tableId[TSDB_TABLE_ID_LEN]; - char superTableId[TSDB_TABLE_ID_LEN]; - char data[]; + int32_t contLen; + int32_t vgId; + int8_t tableType; + int16_t numOfColumns; + int16_t numOfTags; + int32_t sid; + int32_t sversion; + int32_t tagDataLen; + int32_t sqlDataLen; + uint64_t uid; + uint64_t superTableUid; + uint64_t createdTime; + char tableId[TSDB_TABLE_ID_LEN]; + char superTableId[TSDB_TABLE_ID_LEN]; + char data[]; } SMDCreateTableMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; - char db[TSDB_DB_NAME_LEN]; - int8_t igExists; - int16_t numOfTags; - int16_t numOfColumns; - int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string - int32_t contLen; - int8_t reserved[16]; - char schema[]; + char tableId[TSDB_TABLE_ID_LEN]; + char db[TSDB_DB_NAME_LEN]; + int8_t igExists; + int16_t numOfTags; + int16_t numOfColumns; + int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string + int32_t contLen; + int8_t reserved[16]; + char schema[]; } SCMCreateTableMsg; typedef struct { @@ -331,7 +331,7 @@ typedef struct { int64_t maxQueryTime; // In unit of hour int64_t maxInbound; int64_t maxOutbound; - int8_t accessState; // Configured only by command + int8_t accessState; // Configured only by command } SAcctCfg; typedef struct { @@ -356,11 +356,11 @@ typedef struct { } SMgmtHead; typedef struct { - int32_t contLen; - int32_t vgId; - int32_t sid; - uint64_t uid; - char tableId[TSDB_TABLE_ID_LEN + 1]; + int32_t contLen; + int32_t vgId; + int32_t sid; + uint64_t uid; + char tableId[TSDB_TABLE_ID_LEN + 1]; } SMDDropTableMsg; typedef struct { @@ -371,7 +371,7 @@ typedef struct { } SMDDropSTableMsg; typedef struct { - int32_t vgId; + int32_t vgId; } SMDDropVnodeMsg; typedef struct SColIndexEx { @@ -386,7 +386,7 @@ typedef struct SColIndexEx { */ int16_t colIdx; int16_t colIdxInBuf; - uint16_t flag; // denote if it is a tag or not + uint16_t flag; // denote if it is a tag or not char name[TSDB_COL_NAME_LEN]; } SColIndexEx; @@ -458,7 +458,7 @@ typedef struct SColumnInfo { typedef struct STableIdInfo { int32_t sid; int64_t uid; - TSKEY key; // last accessed ts, for subscription + TSKEY key; // last accessed ts, for subscription } STableIdInfo; typedef struct STimeWindow { @@ -472,47 +472,30 @@ typedef struct STimeWindow { * the outputCols will be 3 while the numOfCols is 1. */ typedef struct { - int32_t contLen; // msg header - int16_t vgId; - - int32_t numOfTables; - uint64_t uid; + SMsgHead head; STimeWindow window; - - int16_t order; - int16_t orderColId; - - int16_t numOfCols; // the number of columns will be load from vnode - char slidingTimeUnit; // time interval type, for revisement of interval(1d) - - int64_t intervalTime; // time interval for aggregation, in million second - int64_t slidingTime; // value for sliding window - - // tag schema, used to parse tag information in pSidExtInfo - uint64_t pTagSchema; - - int16_t numOfTagsCols; // required number of tags - int16_t tagLength; // tag length in current query - - int16_t numOfGroupCols; // num of group by columns - int16_t orderByIdx; - int16_t orderType; // used in group by xx order by xxx - uint64_t groupbyTagIds; - - int64_t limit; - int64_t offset; - - int16_t queryType; // denote another query process - int16_t numOfOutputCols; // final output columns numbers - - int16_t interpoType; // interpolate type - uint64_t defaultVal; // default value array list - - int32_t colNameLen; - int64_t colNameList; - - int64_t pSqlFuncExprs; - + int32_t numOfTables; + int16_t order; + int16_t orderColId; + int16_t numOfCols; // the number of columns will be load from vnode + int64_t intervalTime; // time interval for aggregation, in million second + int64_t intervalOffset; // start offset for interval query + int64_t slidingTime; // value for sliding window + char slidingTimeUnit; // time interval type, for revisement of interval(1d) + int16_t tagLength; // tag length in current query + int16_t numOfGroupCols; // num of group by columns + int16_t orderByIdx; + int16_t orderType; // used in group by xx order by xxx + uint64_t groupbyTagIds; + int64_t limit; + int64_t offset; + int16_t queryType; // denote another query process + int16_t numOfOutputCols; // final output columns numbers + int16_t interpoType; // interpolate type + uint64_t defaultVal; // default value array list + + int32_t colNameLen; + int64_t colNameList; int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsLen; // total length of ts comp block int32_t tsNumOfBlocks; // ts comp block numbers @@ -533,9 +516,9 @@ typedef struct { typedef struct SRetrieveTableRsp { int32_t numOfRows; - int8_t completed; // all results are returned to client + int8_t completed; // all results are returned to client int16_t precision; - int64_t offset; // updated offset value for multi-vnode projection query + int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; char data[]; } SRetrieveTableRsp; @@ -615,11 +598,11 @@ typedef struct { char dnodeName[TSDB_DNODE_NAME_LEN]; uint32_t privateIp; uint32_t publicIp; - uint32_t lastReboot; // time stamp for last reboot - uint16_t numOfTotalVnodes; // from config file + uint32_t lastReboot; // time stamp for last reboot + uint16_t numOfTotalVnodes; // from config file uint16_t openVnodes; uint16_t numOfCores; - float diskAvailable; // GB + float diskAvailable; // GB uint8_t alternativeRole; uint8_t reserve[15]; SVnodeLoad load[]; @@ -648,7 +631,7 @@ typedef struct { } SCMMultiTableInfoMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; + char tableId[TSDB_TABLE_ID_LEN + 1]; } SCMSuperTableInfoMsg; typedef struct { @@ -707,18 +690,18 @@ typedef struct STableMetaMsg { uint8_t tableType; int16_t numOfColumns; int16_t sversion; - - int8_t numOfVpeers; + + int8_t numOfVpeers; SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; - int32_t sid; - int32_t vgId; - uint64_t uid; - SSchema schema[]; + int32_t sid; + int32_t vgId; + uint64_t uid; + SSchema schema[]; } STableMetaMsg; typedef struct SMultiTableMeta { - int32_t numOfTables; - int32_t contLen; + int32_t numOfTables; + int32_t contLen; STableMetaMsg metas[]; } SMultiTableMeta; @@ -756,7 +739,7 @@ typedef struct { typedef struct { uint32_t dnode; - int32_t vnode; + int32_t vnode; } SDMConfigVnodeMsg; typedef struct { @@ -783,13 +766,13 @@ typedef struct { } SStreamDesc; typedef struct { - int32_t numOfQueries; - SQueryDesc qdesc[]; + int32_t numOfQueries; + SQueryDesc qdesc[]; } SQqueryList; typedef struct { - int32_t numOfStreams; - SStreamDesc sdesc[]; + int32_t numOfStreams; + SStreamDesc sdesc[]; } SStreamList; typedef struct { diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 346170cde8a00049bf05b0cbb9d8fc9ad62d03ce..b10d869780e9d26b7bf5af6699b961279562ccd9 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -142,7 +142,7 @@ typedef struct SQuery { SResultRec rec; int32_t pos; int64_t pointsOffset; // the number of points offset to save read data - SData** sdata; + SData** sdata; int32_t capacity; SSingleColumnFilterInfo* pFilterInfo; } SQuery; @@ -171,11 +171,10 @@ typedef struct SQueryRuntimeEnv { typedef struct SQInfo { uint64_t signature; - void* pVnode; + void* pVnode; TSKEY startTime; int64_t elapsedTime; SResultRec rec; - int32_t pointsReturned; int32_t pointsInterpo; int32_t code; // error code to returned to client int32_t killed; // denotes if current query is killed @@ -184,7 +183,6 @@ typedef struct SQInfo { SQueryRuntimeEnv runtimeEnv; int32_t subgroupIdx; int32_t offset; /* offset in group result set of subgroup */ -// tSidSet* pSidSet; T_REF_DECLARE() /* @@ -226,7 +224,12 @@ void qSuperTableQuery(void* pReadMsg); */ int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize); - -//int32_t qBuildQueryResult(SQInfo* pQInfo, void* pBuf); +/** + * + * @param pQInfo + * @param pRsp + * @return + */ +int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen); #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 04ff9e56f8238ca393ed50a4e975191d158517dd..c7aa57200cff7e7fb6091e55dd4366875f6e57b7 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -219,7 +219,7 @@ typedef struct SQLAggFuncElem { void (*distSecondaryMergeFunc)(SQLFunctionCtx *pCtx); - int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus); + int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId); } SQLAggFuncElem; typedef struct SPatternCompareInfo { diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 04626227dd1b9a731d96f2eedd82516b8b119673..af7d6f9ab07aae86f495bc36c58d536cc4e6b109 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -119,7 +119,7 @@ static int32_t flushFromResultBuf(SQInfo *pQInfo); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow); -static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, char *primaryColumnData, int32_t size, +static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY *tsCol, int32_t size, int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag); static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); @@ -427,15 +427,9 @@ static bool isTopBottomQuery(SQuery *pQuery) { return false; } -static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, - int32_t columnIndex) { - // no SField info exist, or column index larger than the output column, no result. - if (pStatis == NULL) { - return NULL; - } - +static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, int32_t index) { // for a tag column, no corresponding field info - SColIndexEx *pColIndexEx = &pQuery->pSelectExpr[columnIndex].pBase.colInfo; + SColIndexEx *pColIndexEx = &pQuery->pSelectExpr[index].pBase.colInfo; if (TSDB_COL_IS_TAG(pColIndexEx->flag)) { return NULL; } @@ -453,13 +447,31 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo return NULL; } +/** + * @param pQuery + * @param col + * @param pDataBlockInfo + * @param pStatis + * @param pColStatis + * @return + */ static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SDataStatis **pColStatis) { - if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[col].pBase.colInfo.flag) || pStatis == NULL) { + SColIndexEx* pColIndex = &pQuery->pSelectExpr[col].pBase.colInfo; + if (TSDB_COL_IS_TAG(pColIndex->flag)) { return false; } - - *pColStatis = getStatisInfo(pQuery, pStatis, pDataBlockInfo, col); + + // query on primary timestamp column, not null value at all + if (pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return false; + } + + *pColStatis = NULL; + if (pStatis != NULL) { + *pColStatis = getStatisInfo(pQuery, pStatis, pDataBlockInfo, col); + } + if ((*pColStatis) != NULL && (*pColStatis)->numOfNull == 0) { return false; } @@ -887,7 +899,6 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 * the remain meter may not have the required column in cache actually. * So, the validation of required column in cache with the corresponding meter schema is reinforced. */ - if (pDataBlock == NULL) { return NULL; } @@ -939,10 +950,11 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataSt int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; SDataStatis *tpField = NULL; - bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &tpField); - char * dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->size, pDataBlock); + + bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &tpField); + char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->size, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, (char *)primaryKeyCol, pDataBlockInfo->size, functionId, tpField, + setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->size, functionId, tpField, hasNull, &sasArray[k], pRuntimeEnv->scanFlag); } @@ -1381,7 +1393,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl return 0; } -void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, char *primaryColumnData, int32_t size, +void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY *tsCol, int32_t size, int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag) { pCtx->scanFlag = scanFlag; @@ -1396,15 +1408,15 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, char * pCtx->preAggVals.isSet = false; } - if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0 && (primaryColumnData != NULL)) { - pCtx->ptsList = (int64_t *)(primaryColumnData); + if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0 && (tsCol != NULL)) { + pCtx->ptsList = tsCol; } if (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) { // last_dist or first_dist function // store the first&last timestamp into the intermediate buffer [1], the true // value may be null but timestamp will never be null - pCtx->ptsList = (int64_t *)(primaryColumnData); + pCtx->ptsList = tsCol; } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { /* @@ -1420,7 +1432,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, char * pTWAInfo->EKey = pQuery->window.ekey; } - pCtx->ptsList = (int64_t *)(primaryColumnData); + pCtx->ptsList = tsCol; } else if (functionId == TSDB_FUNC_ARITHM) { pCtx->param[1].pz = param; @@ -2524,22 +2536,18 @@ static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow) { } SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo, SDataStatis **pStatis) { - SQuery * pQuery = pRuntimeEnv->pQuery; - tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle; + SQuery *pQuery = pRuntimeEnv->pQuery; uint32_t r = 0; SArray * pDataBlock = NULL; - // STimeWindow *w = &pQueryHandle->window; - if (pQuery->numOfFilterCols > 0) { r = BLK_DATA_ALL_NEEDED; } else { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t colId = pQuery->pSelectExpr[i].pBase.colInfo.colId; - - // r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], w->skey, w->ekey, colId); + r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId); } if (pRuntimeEnv->pTSBuf > 0 || isIntervalQuery(pQuery)) { @@ -2553,7 +2561,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl // pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints); } else if (r == BLK_DATA_FILEDS_NEEDED) { if (tsdbRetrieveDataBlockStatisInfo(pRuntimeEnv->pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { - // return DISK_DATA_LOAD_FAILED; +// return DISK_DATA_LOAD_FAILED; } if (pStatis == NULL) { @@ -3036,7 +3044,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { total += pData->numOfElems; } - pQuery->sdata[0]->num = total; + int32_t rows = total; int32_t offset = 0; for (int32_t num = 0; num < list.size; ++num) { @@ -3044,7 +3052,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; - char * pDest = pQuery->sdata[i]->data; + char * pDest = pQuery->sdata[i]; memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->numOfElems, bytes * pData->numOfElems); @@ -3055,7 +3063,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { assert(pQuery->rec.pointsRead == 0); - pQuery->rec.pointsRead += pQuery->sdata[0]->num; + pQuery->rec.pointsRead += rows; pQInfo->offset += 1; } @@ -3222,7 +3230,8 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { // the base value for group result, since the maximum number of table for each vnode will not exceed 100,000. int32_t pageId = -1; - int32_t remain = pQuery->sdata[0]->num; + assert(0); + int32_t remain = 0;//pQuery->sdata[0]->num; int32_t offset = 0; while (remain > 0) { @@ -3253,12 +3262,12 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo) { for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - pCtx[k].aOutputBuf = pQuery->sdata[k]->data - pCtx[k].outputBytes; + pCtx[k].aOutputBuf = pQuery->sdata[k] - pCtx[k].outputBytes; pCtx[k].size = 1; pCtx[k].startOffset = 0; pCtx[k].resultInfo = &pResultInfo[k]; - pQuery->sdata[k]->num = 0; + pQuery->sdata[k] = 0; } } @@ -4080,11 +4089,6 @@ int32_t vnodeGetResultSize(void *thandle, int32_t *numOfRows) { } } -int64_t vnodeGetOffsetVal(void *thandle) { - SQInfo *pQInfo = (SQInfo *)thandle; - return pQInfo->runtimeEnv.pQuery->limit.offset; -} - bool vnodeHasRemainResults(void *handle) { SQInfo *pQInfo = (SQInfo *)handle; @@ -4162,62 +4166,13 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p } static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { -#if 0 - SMeterObj *pObj = pQInfo->pObj; - SQuery * pQuery = &pQInfo->query; - - int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock; - - // for metric query, bufIndex always be 0. - for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0 + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { int32_t bytes = pQuery->pSelectExpr[col].resBytes; memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); data += bytes * numOfRows; } -#endif -} - -/** - * Copy the result data/file to output message buffer. - * If the result is in file format, read file from disk and copy to output buffer, compression is not involved since - * data in file is already compressed. - * In case of other result in buffer, compress the result before copy once the tsComressMsg is set. - * - * @param handle - * @param data - * @param numOfRows the number of rows that are not returned in current retrieve - * @return - */ -int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows) { - SQInfo *pQInfo = (SQInfo *)handle; - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - - assert(pQuery->pSelectExpr != NULL && pQuery->numOfOutputCols > 0); - - // load data from file to msg buffer - if (isTSCompQuery(pQuery)) { - int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666); - - // make sure file exist - if (FD_VALID(fd)) { - size_t s = lseek(fd, 0, SEEK_END); - dTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s); - - lseek(fd, 0, SEEK_SET); - read(fd, data, s); - close(fd); - - unlink(pQuery->sdata[0]->data); - } else { - dError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, - pQuery->sdata[0]->data, strerror(errno)); - } - } else { - doCopyQueryResultToMsg(pQInfo, numOfRows, data); - } - - return numOfRows; } int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage **pDataSrc, int32_t numOfRows, @@ -4308,7 +4263,7 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) { #endif } -int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param, void* tsdb) { +int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; int32_t code = TSDB_CODE_SUCCESS; @@ -4342,7 +4297,6 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param, void* tsdb) { taosArrayPush(cols, &pQuery->colList[i]); } - pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; @@ -4993,8 +4947,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { // vnodePrintQueryStatistics(pSupporter); } - dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->rec.pointsRead, - pQInfo->rec.pointsRead, pQInfo->pointsReturned); + dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.pointsRead, pQInfo->rec.pointsTotal); return; } #if 0 @@ -5294,7 +5247,7 @@ void qTableQuery(SQInfo *pQInfo) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { // continue to get push data from the group result if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || - (pQuery->intervalTime > 0 && pQInfo->pointsReturned < pQuery->limit.limit)) { + (pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) { // todo limit the output for interval query? pQuery->rec.pointsRead = 0; pQInfo->subgroupIdx = 0; // always start from 0 @@ -5436,11 +5389,6 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) { return -1; } - if (pQueryTableMsg->numOfTagsCols < 0 || pQueryTableMsg->numOfTagsCols > TSDB_MAX_TAGS + 1) { - dError("qmsg:%p illegal value of numOfTagsCols %d", pQueryTableMsg, pQueryTableMsg->numOfTagsCols); - return -1; - } - if (pQueryTableMsg->numOfCols <= 0 || pQueryTableMsg->numOfCols > TSDB_MAX_COLUMNS) { dError("qmsg:%p illegal value of numOfCols %d", pQueryTableMsg, pQueryTableMsg->numOfCols); return -1; @@ -5496,8 +5444,15 @@ static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArra return pMsg; } -static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList) { - pQueryTableMsg->vgId = htons(pQueryTableMsg->vgId); +/** + * pQueryTableMsg->head has been converted before this function is called. + * + * @param pQueryTableMsg + * @param pTableIdList + * @param pExpr + * @return + */ +static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr) { pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables); pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey); @@ -5513,7 +5468,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId pQueryTableMsg->queryType = htons(pQueryTableMsg->queryType); - pQueryTableMsg->numOfTagsCols = htons(pQueryTableMsg->numOfTagsCols); pQueryTableMsg->numOfCols = htons(pQueryTableMsg->numOfCols); pQueryTableMsg->numOfOutputCols = htons(pQueryTableMsg->numOfOutputCols); pQueryTableMsg->numOfGroupCols = htons(pQueryTableMsg->numOfGroupCols); @@ -5572,27 +5526,23 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId bool hasArithmeticFunction = false; - /* - * 1. simple projection query on meters, we only record the pSqlFuncExprs[i].colIdx value - * 2. for complex queries, whole SqlExprs object is required. - */ - pQueryTableMsg->pSqlFuncExprs = (int64_t)malloc(POINTER_BYTES * pQueryTableMsg->numOfOutputCols); + *pExpr = calloc(pQueryTableMsg->numOfOutputCols, POINTER_BYTES); + SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg; for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) { - ((SSqlFuncExprMsg **)pQueryTableMsg->pSqlFuncExprs)[i] = pExprMsg; + (*pExpr)[i] = pExprMsg; pExprMsg->colInfo.colIdx = htons(pExprMsg->colInfo.colIdx); - pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); - pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); - - pExprMsg->functionId = htons(pExprMsg->functionId); - pExprMsg->numOfParams = htons(pExprMsg->numOfParams); + pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); + pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); + pExprMsg->functionId = htons(pExprMsg->functionId); + pExprMsg->numOfParams = htons(pExprMsg->numOfParams); pMsg += sizeof(SSqlFuncExprMsg); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { - pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); + pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes); if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) { @@ -5628,22 +5578,17 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId pMsg = createTableIdList(pQueryTableMsg, pMsg, pTableIdList); - if (pQueryTableMsg->numOfGroupCols > 0 || pQueryTableMsg->numOfTagsCols > 0) { // group by tag columns - pQueryTableMsg->pTagSchema = (uint64_t)pMsg; - SSchema *pTagSchema = (SSchema *)pQueryTableMsg->pTagSchema; - pMsg += sizeof(SSchema) * pQueryTableMsg->numOfTagsCols; - - if (pQueryTableMsg->numOfGroupCols > 0) { - pQueryTableMsg->groupbyTagIds = (uint64_t) & (pTagSchema[pQueryTableMsg->numOfTagsCols]); - } else { - pQueryTableMsg->groupbyTagIds = 0; - } + if (pQueryTableMsg->numOfGroupCols > 0) { // group by tag columns +// if (pQueryTableMsg->numOfGroupCols > 0) { +// pQueryTableMsg->groupbyTagIds = (uint64_t) & (pTagSchema[pQueryTableMsg->numOfTagsCols]); +// } else { +// pQueryTableMsg->groupbyTagIds = 0; +// } pQueryTableMsg->orderByIdx = htons(pQueryTableMsg->orderByIdx); pQueryTableMsg->orderType = htons(pQueryTableMsg->orderType); pMsg += sizeof(SColIndexEx) * pQueryTableMsg->numOfGroupCols; } else { - pQueryTableMsg->pTagSchema = 0; pQueryTableMsg->groupbyTagIds = 0; } @@ -5657,13 +5602,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId } } - dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 - ", numOfGroupbyTagCols:%d, numOfTagCols:%d, timestamp order:%d, " - "tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64 + dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, " + "timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptslen:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryTableMsg, pQueryTableMsg->numOfTables, pQueryTableMsg->window.skey, pQueryTableMsg->window.ekey, - pQueryTableMsg->numOfGroupCols, pQueryTableMsg->numOfTagsCols, pQueryTableMsg->order, - pQueryTableMsg->orderType, pQueryTableMsg->orderByIdx, pQueryTableMsg->numOfOutputCols, + pQueryTableMsg->numOfGroupCols, pQueryTableMsg->order, pQueryTableMsg->orderType, + pQueryTableMsg->orderByIdx, pQueryTableMsg->numOfOutputCols, pQueryTableMsg->numOfCols, pQueryTableMsg->intervalTime, pQueryTableMsg->interpoType, pQueryTableMsg->tsLen, pQueryTableMsg->limit, pQueryTableMsg->offset); @@ -5721,22 +5665,20 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs return TSDB_CODE_SUCCESS; } -static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) { +static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr, SSqlFuncExprMsg** pExprMsg) { *pSqlFuncExpr = NULL; int32_t code = TSDB_CODE_SUCCESS; SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols); if (pExprs == NULL) { - tfree(pQueryMsg->pSqlFuncExprs); return TSDB_CODE_SERV_OUT_OF_MEMORY; } bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); int16_t tagLen = 0; - SSchema *pTagSchema = (SSchema *)pQueryMsg->pTagSchema; for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) { - pExprs[i].pBase = *((SSqlFuncExprMsg **)pQueryMsg->pSqlFuncExprs)[i]; + pExprs[i].pBase = *pExprMsg[i]; pExprs[i].resBytes = 0; int16_t type = 0; @@ -5744,36 +5686,24 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct SColIndexEx *pColumnIndexExInfo = &pExprs[i].pBase.colInfo; - // tag column schema is kept in pQueryMsg->pColumnModel - if (TSDB_COL_IS_TAG(pColumnIndexExInfo->flag)) { - if (pColumnIndexExInfo->colIdx >= pQueryMsg->numOfTagsCols) { - tfree(pExprs); + // parse the arithmetic expression + if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) { + code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg); - return TSDB_CODE_INVALID_QUERY_MSG; + if (code != TSDB_CODE_SUCCESS) { + tfree(pExprs); + return code; } - type = pTagSchema[pColumnIndexExInfo->colIdx].type; - bytes = pTagSchema[pColumnIndexExInfo->colIdx].bytes; - - } else { // parse the arithmetic expression - if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) { - code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg); - - if (code != TSDB_CODE_SUCCESS) { - tfree(pExprs); - return code; - } - - type = TSDB_DATA_TYPE_DOUBLE; - bytes = tDataTypeDesc[type].nSize; - } else { // parse the normal column - int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase); - assert(j < pQueryMsg->numOfCols); + type = TSDB_DATA_TYPE_DOUBLE; + bytes = tDataTypeDesc[type].nSize; + } else { // parse the normal column + int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase); + assert(j < pQueryMsg->numOfCols); - SColumnInfo *pCol = &pQueryMsg->colList[j]; - type = pCol->type; - bytes = pCol->bytes; - } + SColumnInfo *pCol = &pQueryMsg->colList[j]; + type = pCol->type; + bytes = pCol->bytes; } int32_t param = pExprs[i].pBase.arg[0].argValue.i64; @@ -5793,7 +5723,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct // TODO refactor for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) { - pExprs[i].pBase = *((SSqlFuncExprMsg **)pQueryMsg->pSqlFuncExprs)[i]; + pExprs[i].pBase = *pExprMsg[i]; int16_t functId = pExprs[i].pBase.functionId; if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase); @@ -5810,7 +5740,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct } } - tfree(pQueryMsg->pSqlFuncExprs); + tfree(pExprMsg); *pSqlFuncExpr = pExprs; return TSDB_CODE_SUCCESS; @@ -5962,8 +5892,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou for (int16_t i = 0; i < numOfCols; ++i) { pQuery->colList[i].info = pQueryMsg->colList[i]; - // SColumnInfo *pColInfo = &pQuery->colList[i].data; - // pColInfo->filters = NULL; + + SColumnInfo *pColInfo = &pQuery->colList[i].info; + pColInfo->filters = NULL; // if (colList[i].numOfFilters > 0) { // pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo)); // @@ -5987,7 +5918,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // prepare the result buffer - pQuery->sdata = (SData **)calloc(pQuery->numOfOutputCols, sizeof(SData *)); + pQuery->sdata = (SData **)calloc(pQuery->numOfOutputCols, POINTER_BYTES); if (pQuery->sdata == NULL) { goto _clean_memory; } @@ -6142,12 +6073,10 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE pQuery->window.skey = pQueryMsg->window.skey; pQuery->window.ekey = pQueryMsg->window.ekey; - - pQuery->lastKey = pQuery->window.skey; + pQuery->lastKey = pQuery->window.skey; if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) { - // dError("QInfo:%p vid:%d sid:%d meterId:%s, init dataReady sem failed, reason:%s", pQInfo, pMeterObj->vnode, - // pMeterObj->sid, pMeterObj->meterId, strerror(errno)); + dError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); code = TSDB_CODE_APP_ERROR; goto _error; } @@ -6163,7 +6092,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE tsBufNextPos(pTSBuf); } - if ((code = vnodeQueryTablePrepare(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) { + if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) { goto _error; } @@ -6188,7 +6117,8 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ int32_t code = TSDB_CODE_SUCCESS; SArray *pTableIdList = NULL; - if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList)) != TSDB_CODE_SUCCESS) { + SSqlFuncExprMsg** pExprMsg = NULL; + if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList, &pExprMsg)) != TSDB_CODE_SUCCESS) { return code; } @@ -6206,7 +6136,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ } SSqlFunctionExpr *pExprs = NULL; - if ((code = createSqlFunctionExprFromMsg(pQueryTableMsg, &pExprs)) != TSDB_CODE_SUCCESS) { + if ((code = createSqlFunctionExprFromMsg(pQueryTableMsg, &pExprs, pExprMsg)) != TSDB_CODE_SUCCESS) { goto _query_over; } @@ -6261,15 +6191,119 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro sem_wait(&pQInfo->dataReady); SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + *numOfRows = pQInfo->rec.pointsRead; + *rowsize = pQuery->rowSize; - // *numOfRows = pQInfo->rec.pointsRead; - // *rowsize = pQuery->rowSize; - *numOfRows = 1; - - // dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec, - // *rowsize, *numOfRows, pQInfo->code); + dTrace("QInfo:%p, retrieve res info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); if (pQInfo->code < 0) { // less than 0 means there are error existed. return -pQInfo->code; } } + +static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + + /* + * get the file size and set the numOfRows to be the file size, since for tsComp query, + * the returned row size is equalled to 1 + * TODO handle the case that the file is too large to send back one time + */ + if (isTSCompQuery(pQuery) && (*numOfRows) > 0) { + struct stat fstat; + if (stat(pQuery->sdata[0]->data, &fstat) == 0) { + *numOfRows = fstat.st_size; + return fstat.st_size; + } else { + dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); + return 0; + } + } else { + return pQuery->rowSize * (*numOfRows); + } +} + +static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { + // the remained number of retrieved rows, not the interpolated result + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + + // load data from file to msg buffer + if (isTSCompQuery(pQuery)) { + int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666); + + // make sure file exist + if (FD_VALID(fd)) { + size_t s = lseek(fd, 0, SEEK_END); + dTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s); + + lseek(fd, 0, SEEK_SET); + read(fd, data, s); + close(fd); + + unlink(pQuery->sdata[0]->data); + } else { + dError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, + pQuery->sdata[0]->data, strerror(errno)); + } + } else { + doCopyQueryResultToMsg(pQInfo, pQInfo->rec.pointsRead, data); + } + + pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead; + dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal); +} + +static void addToTaskQueue(SQInfo* pQInfo) { + // no error occurred, continue retrieving data + if (pQInfo->code == TSDB_CODE_SUCCESS) { +#ifdef _TD_ARM_ + dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:doDumpQueryResult", pQInfo, pQInfo->signature); +#else + dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__); +#endif + + if (pQInfo->killed == 1) { + dTrace("%p freed or killed, abort query", pQInfo); + } else { + // todo add to task queue + } + } +} + +int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen) { + if (pQInfo == NULL || !isQInfoValid(pQInfo)) { + return TSDB_CODE_INVALID_QHANDLE; + } + + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + size_t size = getResultSize(pQInfo, &pQInfo->rec.pointsRead); + *contLen = size + sizeof(SRetrieveTableRsp); + + // todo handle failed to allocate memory + *pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen); + + (*pRsp)->numOfRows = htonl(pQInfo->rec.pointsRead); + + int32_t code = pQInfo->code; + if (code == TSDB_CODE_SUCCESS) { + (*pRsp)->offset = htobe64(pQuery->limit.offset); + (*pRsp)->useconds = htobe64(pQInfo->elapsedTime); + } else { + (*pRsp)->offset = 0; + (*pRsp)->useconds = 0; + } + + if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) { + doDumpQueryResult(pQInfo, (*pRsp)->data, NULL); + addToTaskQueue(pQInfo); + return TSDB_CODE_SUCCESS; + } + + assert(code != TSDB_CODE_ACTION_IN_PROGRESS); + +// if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { +// dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); +// vnodeDecRefCount(pObj->qhandle); +// pObj->qhandle = NULL; +// } +} \ No newline at end of file diff --git a/src/util/src/shash.c b/src/util/src/shash.c index da97af84bbc957ba102add1b34bff23d71c91d0e..525d00e81e047867701f4dff377013a93ab05f40 100644 --- a/src/util/src/shash.c +++ b/src/util/src/shash.c @@ -162,6 +162,8 @@ void taosDeleteStrHash(void *handle, char *string) { if (pObj == NULL || pObj->maxSessions == 0) return; if (string == NULL || string[0] == 0) return; + return; + hash = (*(pObj->hashFp))(pObj, string); pthread_mutex_lock(&pObj->mutex); diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index d654308f9956dc34d92ba902ac9d0c087f608b6d..1c4f09f4c93987fbed723b5f02a6e85c9c5760be 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -109,7 +109,6 @@ typedef struct STsdbQueryHandle { uint16_t flag; // denotes reversed scan of data or not int16_t order; STimeWindow window; // the primary query time window that applies to all queries - TSKEY lastKey; int32_t blockBufferSize; SCompBlock *pBlock; int32_t numOfBlocks; @@ -264,8 +263,20 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond pQueryHandle->pColumns = pColumnInfo; pQueryHandle->loadDataAfterSeek = false; pQueryHandle->isFirstSlot = true; - - pQueryHandle->lastKey = pQueryHandle->window.skey; // ascending query + + // only support table query + assert(taosArrayGetSize(idList) == 1); + + pQueryHandle->pTableQueryInfo = calloc(1, sizeof(STableQueryRec)); + STableQueryRec* pTableQRec = pQueryHandle->pTableQueryInfo; + + pTableQRec->lastKey = pQueryHandle->window.skey; + + STableIdInfo* idInfo = taosArrayGet(pQueryHandle->pTableIdList, 0); + + STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid}; + STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pQueryHandle->pTsdb), tableId); + pTableQRec->pTableObj = pTable; // malloc buffer in order to load data from file int32_t numOfCols = taosArrayGetSize(pColumnInfo); @@ -295,14 +306,21 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond return (tsdb_query_handle_t)pQueryHandle; } -static int32_t next = 1; bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { - if (next == 0) { + STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; + STable *pTable = pHandle->pTableQueryInfo->pTableObj; + + // no data in cache, abort + if (pTable->mem == NULL && pTable->imem == NULL) { return false; - } else { - next = 0; - return true; } + + // all data in mem are checked already. + if (pHandle->pTableQueryInfo->lastKey > pTable->mem->keyLast) { + return false; + } + + return true; } static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, @@ -344,9 +362,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0); - STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid}; - STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pHandle->pTsdb), tableId); - assert(pTable != NULL); + STable *pTable = pHandle->pTableQueryInfo->pTableObj; TSKEY skey = 0, ekey = 0; int32_t rows = 0; @@ -357,8 +373,8 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { } SDataBlockInfo blockInfo = { - .uid = tableId.uid, - .sid = tableId.tid, + .uid = idInfo->uid, + .sid = idInfo->sid, .size = rows, .window = {.skey = skey, .ekey = ekey} };