未验证 提交 a863cd86 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1411 from taosdata/liaohj_2

Liaohj 2
...@@ -417,6 +417,10 @@ static void count_function(SQLFunctionCtx *pCtx) { ...@@ -417,6 +417,10 @@ static void count_function(SQLFunctionCtx *pCtx) {
numOfElem += 1; numOfElem += 1;
} }
} else { } else {
/*
* when counting on the primary time stamp column and no statistics data is provided,
* simple use the size value
*/
numOfElem = pCtx->size; numOfElem = pCtx->size;
} }
} }
......
...@@ -273,8 +273,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -273,8 +273,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
} }
pSql->retry = 0; pSql->retry = 0;
pRes->rspLen = 0; pRes->rspLen = 0;
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL; pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
} else { } else {
...@@ -283,7 +283,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -283,7 +283,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
assert(rpcMsg->msgType == pCmd->msgType + 1); assert(rpcMsg->msgType == pCmd->msgType + 1);
pRes->code = (int32_t)rpcMsg->code; pRes->code = rpcMsg->code;
pRes->rspType = rpcMsg->msgType; pRes->rspType = rpcMsg->msgType;
pRes->rspLen = rpcMsg->contLen; pRes->rspLen = rpcMsg->contLen;
...@@ -662,12 +662,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -662,12 +662,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
char *pStart = pCmd->payload + tsRpcHeadSize;
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
if (pQueryInfo->colList.numOfCols <= 0) {
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
return -1;
}
char *pStart = pCmd->payload + tsRpcHeadSize;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart; SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
int32_t msgLen = 0; int32_t msgLen = 0;
...@@ -675,11 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -675,11 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
numOfTables = 1; numOfTables = 1;
pQueryMsg->head.vgId = htonl(pTableMeta->vgId);
pQueryMsg->uid = pTableMeta->uid;
pQueryMsg->numOfTagsCols = 0;
pQueryMsg->vgId = htonl(pTableMeta->vgId);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query on super table } else { // query on super table
if (pTableMetaInfo->vnodeIndex < 0) { if (pTableMetaInfo->vnodeIndex < 0) {
...@@ -697,11 +697,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -697,11 +697,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables); tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
pQueryMsg->vgId = htons(vnodeId); pQueryMsg->head.vgId = htons(vnodeId);
} }
pQueryMsg->numOfTables = htonl(numOfTables); pQueryMsg->numOfTables = htonl(numOfTables);
pQueryMsg->numOfTagsCols = htons(pTableMetaInfo->numOfTags);
if (pQueryInfo->order.order == TSQL_SO_ASC) { if (pQueryInfo->order.order == TSQL_SO_ASC) {
pQueryMsg->window.skey = htobe64(pQueryInfo->stime); pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
...@@ -713,22 +712,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -713,22 +712,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->order = htons(pQueryInfo->order.order); pQueryMsg->order = htons(pQueryInfo->order.order);
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
pQueryMsg->interpoType = htons(pQueryInfo->interpoType); pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols); pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);
if (pQueryInfo->colList.numOfCols <= 0) {
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
return -1;
}
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
if (pQueryInfo->intervalTime < 0) { if (pQueryInfo->intervalTime < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime); tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
...@@ -866,25 +856,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -866,25 +856,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->colNameLen = htonl(len); pQueryMsg->colNameLen = htonl(len);
// serialize the table info (sid, uid, tags) // serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vgId), pMsg); pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->head.vgId), pMsg);
// only include the required tag column schema. If a tag is not required, it won't be sent to vnode
if (pTableMetaInfo->numOfTags > 0) {
// always transfer tag schema to vnode if exists
SSchema *pTagSchema = tscGetTableTagSchema(pTableMeta);
for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
if (pTableMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) {
SSchema tbSchema = {
.bytes = TSDB_TABLE_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY};
memcpy(pMsg, &tbSchema, sizeof(SSchema));
} else {
memcpy(pMsg, &pTagSchema[pTableMetaInfo->tagColumnIndex[j]], sizeof(SSchema));
}
pMsg += sizeof(SSchema);
}
}
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
if (pGroupbyExpr->numOfGroupCols != 0) { if (pGroupbyExpr->numOfGroupCols != 0) {
...@@ -948,8 +920,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -948,8 +920,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->contLen = htonl(msgLen); pQueryMsg->head.contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2358,7 +2329,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { ...@@ -2358,7 +2329,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
} }
pRes->row = 0; pRes->row = 0;
tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset); tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
return 0; return 0;
......
...@@ -254,7 +254,7 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { ...@@ -254,7 +254,7 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont; SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
...@@ -263,37 +263,18 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -263,37 +263,18 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
int32_t contLen = 0; int32_t contLen = 0;
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
SRetrieveTableRsp *pRsp = NULL;
int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp); contLen = sizeof(SRetrieveTableRsp);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0; memset(pRsp, 0, sizeof(SRetrieveTableRsp));
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
//todo free qinfo
} else { } else {
contLen = 100; // todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); }
pRsp->numOfRows = htonl(1);
pRsp->precision = htons(0);
pRsp->offset = htobe64(0);
pRsp->useconds = htobe64(0);
// todo set the data
*(int64_t*) pRsp->data = 1000;
rpcRsp = (SRpcMsg) { rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle, .handle = pMsg->rpcMsg.handle,
...@@ -302,7 +283,6 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -302,7 +283,6 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
.code = code, .code = code,
.msgType = 0 .msgType = 0
}; };
}
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h>
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
...@@ -188,7 +188,7 @@ extern char *taosMsg[]; ...@@ -188,7 +188,7 @@ extern char *taosMsg[];
#pragma pack(push, 1) #pragma pack(push, 1)
//typedef struct { // typedef struct {
// int32_t vnode; // int32_t vnode;
// int32_t sid; // int32_t sid;
// int32_t sversion; // int32_t sversion;
...@@ -206,7 +206,7 @@ typedef struct SMsgHead { ...@@ -206,7 +206,7 @@ typedef struct SMsgHead {
int32_t vgId; int32_t vgId;
} SMsgHead; } SMsgHead;
//typedef struct { // typedef struct {
// SMsgDesc desc; // SMsgDesc desc;
// SMsgHead header; // SMsgHead header;
// int16_t import; // int16_t import;
...@@ -229,8 +229,8 @@ typedef struct SSubmitBlk { ...@@ -229,8 +229,8 @@ typedef struct SSubmitBlk {
typedef struct SSubmitMsg { typedef struct SSubmitMsg {
SMsgHead header; SMsgHead header;
int32_t length; int32_t length;
int32_t compressed:2; int32_t compressed : 2;
int32_t numOfBlocks:30; int32_t numOfBlocks : 30;
SSubmitBlk blocks[]; SSubmitBlk blocks[];
} SSubmitMsg; } SSubmitMsg;
...@@ -258,7 +258,7 @@ typedef struct SSchema { ...@@ -258,7 +258,7 @@ typedef struct SSchema {
} SSchema; } SSchema;
typedef struct { typedef struct {
int32_t vnode; //the index of vnode int32_t vnode; // the index of vnode
uint32_t ip; uint32_t ip;
} SVnodeDesc; } SVnodeDesc;
...@@ -472,47 +472,30 @@ typedef struct STimeWindow { ...@@ -472,47 +472,30 @@ typedef struct STimeWindow {
* the outputCols will be 3 while the numOfCols is 1. * the outputCols will be 3 while the numOfCols is 1.
*/ */
typedef struct { typedef struct {
int32_t contLen; // msg header SMsgHead head;
int16_t vgId;
int32_t numOfTables;
uint64_t uid;
STimeWindow window; STimeWindow window;
int32_t numOfTables;
int16_t order; int16_t order;
int16_t orderColId; int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode int16_t numOfCols; // the number of columns will be load from vnode
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
int64_t intervalTime; // time interval for aggregation, in million second int64_t intervalTime; // time interval for aggregation, in million second
int64_t intervalOffset; // start offset for interval query
int64_t slidingTime; // value for sliding window int64_t slidingTime; // value for sliding window
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
// tag schema, used to parse tag information in pSidExtInfo
uint64_t pTagSchema;
int16_t numOfTagsCols; // required number of tags
int16_t tagLength; // tag length in current query int16_t tagLength; // tag length in current query
int16_t numOfGroupCols; // num of group by columns int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx; int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx int16_t orderType; // used in group by xx order by xxx
uint64_t groupbyTagIds; uint64_t groupbyTagIds;
int64_t limit; int64_t limit;
int64_t offset; int64_t offset;
int16_t queryType; // denote another query process int16_t queryType; // denote another query process
int16_t numOfOutputCols; // final output columns numbers int16_t numOfOutputCols; // final output columns numbers
int16_t interpoType; // interpolate type int16_t interpoType; // interpolate type
uint64_t defaultVal; // default value array list uint64_t defaultVal; // default value array list
int32_t colNameLen; int32_t colNameLen;
int64_t colNameList; int64_t colNameList;
int64_t pSqlFuncExprs;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers int32_t tsNumOfBlocks; // ts comp block numbers
......
...@@ -175,7 +175,6 @@ typedef struct SQInfo { ...@@ -175,7 +175,6 @@ typedef struct SQInfo {
TSKEY startTime; TSKEY startTime;
int64_t elapsedTime; int64_t elapsedTime;
SResultRec rec; SResultRec rec;
int32_t pointsReturned;
int32_t pointsInterpo; int32_t pointsInterpo;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
int32_t killed; // denotes if current query is killed int32_t killed; // denotes if current query is killed
...@@ -184,7 +183,6 @@ typedef struct SQInfo { ...@@ -184,7 +183,6 @@ typedef struct SQInfo {
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx; int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */ int32_t offset; /* offset in group result set of subgroup */
// tSidSet* pSidSet;
T_REF_DECLARE() T_REF_DECLARE()
/* /*
...@@ -226,7 +224,12 @@ void qSuperTableQuery(void* pReadMsg); ...@@ -226,7 +224,12 @@ void qSuperTableQuery(void* pReadMsg);
*/ */
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize); int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);
/**
//int32_t qBuildQueryResult(SQInfo* pQInfo, void* pBuf); *
* @param pQInfo
* @param pRsp
* @return
*/
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -219,7 +219,7 @@ typedef struct SQLAggFuncElem { ...@@ -219,7 +219,7 @@ typedef struct SQLAggFuncElem {
void (*distSecondaryMergeFunc)(SQLFunctionCtx *pCtx); void (*distSecondaryMergeFunc)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus); int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
} SQLAggFuncElem; } SQLAggFuncElem;
typedef struct SPatternCompareInfo { typedef struct SPatternCompareInfo {
......
此差异已折叠。
...@@ -162,6 +162,8 @@ void taosDeleteStrHash(void *handle, char *string) { ...@@ -162,6 +162,8 @@ void taosDeleteStrHash(void *handle, char *string) {
if (pObj == NULL || pObj->maxSessions == 0) return; if (pObj == NULL || pObj->maxSessions == 0) return;
if (string == NULL || string[0] == 0) return; if (string == NULL || string[0] == 0) return;
return;
hash = (*(pObj->hashFp))(pObj, string); hash = (*(pObj->hashFp))(pObj, string);
pthread_mutex_lock(&pObj->mutex); pthread_mutex_lock(&pObj->mutex);
......
...@@ -109,7 +109,6 @@ typedef struct STsdbQueryHandle { ...@@ -109,7 +109,6 @@ typedef struct STsdbQueryHandle {
uint16_t flag; // denotes reversed scan of data or not uint16_t flag; // denotes reversed scan of data or not
int16_t order; int16_t order;
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
TSKEY lastKey;
int32_t blockBufferSize; int32_t blockBufferSize;
SCompBlock *pBlock; SCompBlock *pBlock;
int32_t numOfBlocks; int32_t numOfBlocks;
...@@ -265,7 +264,19 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond ...@@ -265,7 +264,19 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
pQueryHandle->loadDataAfterSeek = false; pQueryHandle->loadDataAfterSeek = false;
pQueryHandle->isFirstSlot = true; pQueryHandle->isFirstSlot = true;
pQueryHandle->lastKey = pQueryHandle->window.skey; // ascending query // only support table query
assert(taosArrayGetSize(idList) == 1);
pQueryHandle->pTableQueryInfo = calloc(1, sizeof(STableQueryRec));
STableQueryRec* pTableQRec = pQueryHandle->pTableQueryInfo;
pTableQRec->lastKey = pQueryHandle->window.skey;
STableIdInfo* idInfo = taosArrayGet(pQueryHandle->pTableIdList, 0);
STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid};
STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pQueryHandle->pTsdb), tableId);
pTableQRec->pTableObj = pTable;
// malloc buffer in order to load data from file // malloc buffer in order to load data from file
int32_t numOfCols = taosArrayGetSize(pColumnInfo); int32_t numOfCols = taosArrayGetSize(pColumnInfo);
...@@ -295,14 +306,21 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond ...@@ -295,14 +306,21 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
return (tsdb_query_handle_t)pQueryHandle; return (tsdb_query_handle_t)pQueryHandle;
} }
static int32_t next = 1;
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
if (next == 0) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STable *pTable = pHandle->pTableQueryInfo->pTableObj;
// no data in cache, abort
if (pTable->mem == NULL && pTable->imem == NULL) {
return false;
}
// all data in mem are checked already.
if (pHandle->pTableQueryInfo->lastKey > pTable->mem->keyLast) {
return false; return false;
} else {
next = 0;
return true;
} }
return true;
} }
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
...@@ -344,9 +362,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { ...@@ -344,9 +362,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0); STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0);
STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid}; STable *pTable = pHandle->pTableQueryInfo->pTableObj;
STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pHandle->pTsdb), tableId);
assert(pTable != NULL);
TSKEY skey = 0, ekey = 0; TSKEY skey = 0, ekey = 0;
int32_t rows = 0; int32_t rows = 0;
...@@ -357,12 +373,15 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { ...@@ -357,12 +373,15 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
} }
SDataBlockInfo blockInfo = { SDataBlockInfo blockInfo = {
.uid = tableId.uid, .uid = idInfo->uid,
.sid = tableId.tid, .sid = idInfo->sid,
.size = rows, .size = rows,
.window = {.skey = skey, .ekey = ekey} .window = {.skey = skey, .ekey = ekey}
}; };
// update the last key value
pHandle->pTableQueryInfo->lastKey = ekey + 1;
return blockInfo; return blockInfo;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册