未验证 提交 c2d1c480 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #11403 from taosdata/feature/filter_tag

enh: add tag filter
......@@ -260,6 +260,7 @@ typedef struct {
typedef struct SSchema {
int8_t type;
int8_t index; // default is 0, not index created
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
......
......@@ -19,8 +19,8 @@
extern "C" {
#endif
#include "tcommon.h"
#include "nodes.h"
#include "tcommon.h"
typedef struct SFilterInfo SFilterInfo;
typedef int32_t (*filer_get_col_from_id)(void *, int32_t, void **);
......@@ -31,18 +31,18 @@ enum {
FLT_OPTION_NEED_UNIQE = 4,
};
typedef struct SFilterColumnParam{
typedef struct SFilterColumnParam {
int32_t numOfCols;
SArray* pDataBlock;
SArray *pDataBlock;
} SFilterColumnParam;
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t **p, SColumnDataAgg *statis, int16_t numOfCols);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
......
......@@ -308,6 +308,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols);
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].index);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
......@@ -378,6 +379,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].index));
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
......@@ -1989,7 +1991,7 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) {
taosArrayDestroy(pRsp->pArray);
}
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -2002,7 +2004,7 @@ int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
return tlen;
}
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
int32_t tDeserializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
......@@ -2014,7 +2016,7 @@ int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
return 0;
}
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp) {
int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -2045,7 +2047,7 @@ int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp) {
return tlen;
}
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) {
int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
......@@ -2076,7 +2078,7 @@ int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) {
return 0;
}
int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) {
int32_t tSerializeSUserIndexReq(void *buf, int32_t bufLen, SUserIndexReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -2089,7 +2091,7 @@ int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq)
return tlen;
}
int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) {
int32_t tDeserializeSUserIndexReq(void *buf, int32_t bufLen, SUserIndexReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
......@@ -2101,7 +2103,7 @@ int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq
return 0;
}
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp) {
int32_t tSerializeSUserIndexRsp(void *buf, int32_t bufLen, const SUserIndexRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -2118,7 +2120,7 @@ int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp*
return tlen;
}
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp) {
int32_t tDeserializeSUserIndexRsp(void *buf, int32_t bufLen, SUserIndexRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
......@@ -2134,7 +2136,6 @@ int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp
return 0;
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......
......@@ -55,6 +55,8 @@ target_include_directories(
vnode
PUBLIC "inc"
PRIVATE "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
)
target_link_libraries(
vnode
......@@ -69,6 +71,7 @@ target_link_libraries(
PUBLIC scheduler
PUBLIC tdb
#PUBLIC bdb
#PUBLIC scalar
PUBLIC transport
PUBLIC stream
)
......
......@@ -13,18 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
#include "tdatablock.h"
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdataformat.h"
#include "texception.h"
#include "vnodeInt.h"
#include "filter.h"
#include "taosdef.h"
#include "tlosertree.h"
#include "vnodeInt.h"
#include "tmsg.h"
#include "vnodeInt.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
......@@ -73,7 +74,7 @@ typedef struct SLoadCompBlockInfo {
enum {
CHECKINFO_CHOSEN_MEM = 0,
CHECKINFO_CHOSEN_IMEM = 1,
CHECKINFO_CHOSEN_BOTH = 2 //for update=2(merge case)
CHECKINFO_CHOSEN_BOTH = 2 // for update=2(merge case)
};
typedef struct STableCheckInfo {
......@@ -81,16 +82,16 @@ typedef struct STableCheckInfo {
TSKEY lastKey;
SBlockInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
uint8_t chosen:2; // indicate which iterator should move forward
bool initBuf:1; // whether to initialize the in-memory skip list iterator or not
int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks
uint8_t chosen : 2; // indicate which iterator should move forward
bool initBuf : 1; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator
} STableCheckInfo;
typedef struct STableBlockInfo {
SBlock *compBlock;
STableCheckInfo *pTableCheckInfo;
SBlock* compBlock;
STableCheckInfo* pTableCheckInfo;
} STableBlockInfo;
typedef struct SBlockOrderSupporter {
......@@ -126,20 +127,20 @@ typedef struct STsdbReadHandle {
bool loadExternalRow; // load time window external data rows
bool currentLoadExternalRows; // current load external rows
int32_t loadType; // block load type
char *idStr; // query info handle, for debug purpose
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SDFileSet* pFileGroup;
SFSIter fileIter;
SReadH rhelper;
STableBlockInfo* pDataBlockInfo;
SDataCols *pDataCols; // in order to hold current file data block
SDataCols* pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
SArray *defaultLoadColumn;// default load column
SArray* defaultLoadColumn; // default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
SArray *prev; // previous row which is before than time window
SArray *next; // next row which is after the query time window
SArray* prev; // previous row which is before than time window
SArray* next; // next row which is after the query time window
SIOCostSummary cost;
} STsdbReadHandle;
......@@ -149,19 +150,22 @@ typedef struct STableGroupSupporter {
SSchema* pTagSchema;
} STableGroupSupporter;
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList);
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo);
static STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList);
static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList);
static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle);
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbReadHandle* pTsdbReadHandle);
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
//static void* doFreeColumnInfoData(SArray* pColumnInfoData);
//static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
// static void* doFreeColumnInfoData(SArray* pColumnInfoData);
// static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(tsdbReaderT pHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
......@@ -204,30 +208,32 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
}
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
int64_t rows = 0;
STsdbMemTable* pMemTable = NULL;//pTsdbReadHandle->pMemTable;
if (pMemTable == NULL) { return rows; }
STsdbMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable;
if (pMemTable == NULL) {
return rows;
}
// STableData* pMem = NULL;
// STableData* pIMem = NULL;
// STableData* pMem = NULL;
// STableData* pIMem = NULL;
// SMemTable* pMemT = pMemRef->snapshot.mem;
// SMemTable* pIMemT = pMemRef->snapshot.imem;
// SMemTable* pMemT = pMemRef->snapshot.mem;
// SMemTable* pIMemT = pMemRef->snapshot.imem;
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
// pMem = pMemT->tData[pCheckInfo->tableId];
// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
// }
// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
// pIMem = pIMemT->tData[pCheckInfo->tableId];
// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
// }
// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
// pMem = pMemT->tData[pCheckInfo->tableId];
// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
// }
// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
// pIMem = pIMemT->tData[pCheckInfo->tableId];
// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
// }
}
return rows;
}
......@@ -244,15 +250,15 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
// todo apply the lastkey of table check to avoid to load header file
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i);
SArray* group = *(SArray**)taosArrayGet(pGroupList->pGroupList, i);
size_t gsize = taosArrayGetSize(group);
assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(group, j);
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
info.lastKey = pTsdbReadHandle->window.skey;
......@@ -264,7 +270,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
}
taosArrayPush(pTableCheckInfo, &info);
tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" %s", pTsdbReadHandle, info.tableId, info.lastKey, pTsdbReadHandle->idStr);
tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId,
info.lastKey, pTsdbReadHandle->idStr);
}
}
......@@ -279,7 +286,7 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
// todo apply the lastkey of table check to avoid to load header file
for (int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
STableCheckInfo* pCheckInfo = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
pCheckInfo->lastKey = pTsdbReadHandle->window.skey;
pCheckInfo->iter = tSkipListDestroyIter(pCheckInfo->iter);
pCheckInfo->iiter = tSkipListDestroyIter(pCheckInfo->iiter);
......@@ -297,7 +304,7 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
STableCheckInfo info = { .lastKey = skey};
STableCheckInfo info = {.lastKey = skey};
info.tableId = pCheckInfo->tableId;
taosArrayPush(pNew, &info);
......@@ -365,12 +372,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
pReadHandle->locateStart = false;
pReadHandle->loadType = pCond->type;
pReadHandle->outputCapacity = 4096;//((STsdb*)tsdb)->config.maxRowsPerFileBlock;
pReadHandle->outputCapacity = 4096; //((STsdb*)tsdb)->config.maxRowsPerFileBlock;
pReadHandle->loadExternalRow = pCond->loadExternalRows;
pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;
char buf[128] = {0};
snprintf(buf, tListLen(buf), "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, qId);
snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
pReadHandle->idStr = strdup(buf);
if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) {
......@@ -421,37 +428,39 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
return (tsdbReaderT)pReadHandle;
_end:
_end:
tsdbCleanupReadHandle(pReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) {
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) {
return NULL;
}
if (emptyQueryTimewindow(pTsdbReadHandle)) {
return (tsdbReaderT*) pTsdbReadHandle;
return (tsdbReaderT*)pTsdbReadHandle;
}
// todo apply the lastkey of table check to avoid to load header file
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList);
if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// tsdbCleanupReadHandle(pTsdbReadHandle);
// tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %"PRIzu" %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo),
taosArrayGetSize(groupList->pGroupList), pTsdbReadHandle->idStr);
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle,
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList),
pTsdbReadHandle->idStr);
return (tsdbReaderT) pTsdbReadHandle;
return (tsdbReaderT)pTsdbReadHandle;
}
void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond *pCond) {
void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) {
STsdbReadHandle* pTsdbReadHandle = queryHandle;
if (emptyQueryTimewindow(pTsdbReadHandle)) {
......@@ -488,7 +497,7 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond *pCond) {
resetCheckInfo(pTsdbReadHandle);
}
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList) {
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pCond, STableGroupInfo* groupList) {
STsdbReadHandle* pTsdbReadHandle = queryHandle;
pTsdbReadHandle->order = pCond->order;
......@@ -514,21 +523,23 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond *pC
tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
SArray* pTable = NULL;
// STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);
// STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);
// pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
// pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable);
pTsdbReadHandle->pTableCheckInfo = NULL; // createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta,
// &pTable);
if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// tsdbCleanupReadHandle(pTsdbReadHandle);
// tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
}
// pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
}
tsdbReaderT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
pCond->twindow = updateLastrowForEachGroup(groupList);
// no qualified table
......@@ -536,7 +547,7 @@ tsdbReaderT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo
return NULL;
}
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, taskId);
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(tsdb, pCond, groupList, qId, taskId);
if (pTsdbReadHandle == NULL) {
return NULL;
}
......@@ -576,10 +587,10 @@ tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupIn
}
#endif
SArray* tsdbGetQueriedTableList(tsdbReaderT *pHandle) {
SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) {
assert(pHandle != NULL);
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) pHandle;
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
SArray* res = taosArrayInit(size, POINTER_BYTES);
......@@ -594,18 +605,18 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo));
pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES);
for(int32_t i = 0; i < numOfGroup; ++i) {
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
size_t numOfTables = taosArrayGetSize(oneGroup);
SArray* px = taosArrayInit(4, sizeof(STableKeyInfo));
for (int32_t j = 0; j < numOfTables; ++j) {
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j);
// if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
// taosArrayPush(px, pInfo);
// pNew->numOfTables += 1;
// break;
// }
// if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
// taosArrayPush(px, pInfo);
// pNew->numOfTables += 1;
// break;
// }
}
// there are no data in this group
......@@ -619,7 +630,8 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
return pNew;
}
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
if (pNew->numOfTables == 0) {
......@@ -634,7 +646,7 @@ tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, ST
}
}
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, taskId);
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(tsdb, pCond, pNew, qId, taskId);
pTsdbReadHandle->loadExternalRow = true;
pTsdbReadHandle->currentLoadExternalRows = true;
......@@ -687,8 +699,9 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
STSRow* row = (STSRow*)SL_GET_NODE_DATA(node);
TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->idStr);
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey,
(*pMem)->nrows, pHandle->idStr);
if (ASCENDING_TRAVERSE(order)) {
assert(pCheckInfo->lastKey <= key);
......@@ -697,7 +710,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
}
} else {
tsdbDebug("%p uid:%"PRId64", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
}
if (!imemEmpty) {
......@@ -707,8 +720,9 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
STSRow* row = (STSRow*)SL_GET_NODE_DATA(node);
TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->idStr);
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey,
(*pIMem)->nrows, pHandle->idStr);
if (ASCENDING_TRAVERSE(order)) {
assert(pCheckInfo->lastKey <= key);
......@@ -716,7 +730,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
assert(pCheckInfo->lastKey >= key);
}
} else {
tsdbDebug("%p uid:%"PRId64", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
}
return true;
......@@ -761,30 +775,20 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
TSKEY r2 = TD_ROW_KEY(rimem);
if (r1 == r2) {
#if 0
if(update == TD_ROW_DISCARD_UPDATE){
if (update == TD_ROW_DISCARD_UPDATE) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
tSkipListIterNext(pCheckInfo->iter);
}
else if(update == TD_ROW_OVERWRITE_UPDATE) {
} else if (update == TD_ROW_OVERWRITE_UPDATE) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
tSkipListIterNext(pCheckInfo->iiter);
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
}
#endif
if (TD_SUPPORT_UPDATE(update)) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
tSkipListIterNext(pCheckInfo->iter);
}
return r1;
} else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
return r1;
}
else {
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
return r2;
}
......@@ -828,7 +832,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
tSkipListIterNext(pCheckInfo->iter);
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
return rimem;
} else if(update == TD_ROW_OVERWRITE_UPDATE){
} else if (update == TD_ROW_OVERWRITE_UPDATE) {
tSkipListIterNext(pCheckInfo->iiter);
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
return rmem;
......@@ -872,7 +876,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
if (pCheckInfo->iiter != NULL) {
return tSkipListIterGet(pCheckInfo->iiter) != NULL;
}
} else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM){
} else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
if (pCheckInfo->iiter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iiter);
}
......@@ -897,7 +901,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
}
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
STsdbCfg *pCfg = &pHandle->pTsdb->config;
STsdbCfg* pCfg = &pHandle->pTsdb->config;
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
pHandle->cur.fid = INT32_MIN;
......@@ -913,7 +917,7 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
}
pCheckInfo->lastKey = TD_ROW_KEY(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
tsdbDebug("%p uid:%" PRId64 ", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr);
// all data in mem are checked already.
......@@ -922,7 +926,7 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
return false;
}
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
int32_t step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1;
STimeWindow* win = &pHandle->cur.win;
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
......@@ -987,7 +991,7 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
return midSlot;
}
static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
int32_t code = 0;
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
......@@ -1030,9 +1034,11 @@ static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, i
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey &&
pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
} else {
assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey &&
pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
}
s = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
......@@ -1093,10 +1099,11 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB
return code;
}
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
int32_t slotIndex) {
int64_t st = taosGetTimestampUs();
STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
STSchema* pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
......@@ -1120,7 +1127,8 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
int16_t* colIds = pTsdbReadHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true);
int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
(int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true);
if (ret != TSDB_CODE_SUCCESS) {
int32_t c = terrno;
assert(c != TSDB_CODE_SUCCESS);
......@@ -1139,9 +1147,9 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
pBlock->numOfRows = pCols->numOfRows;
// Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
if(pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
if (pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
int64_t* src = pCols->cols[0].pData;
for(int32_t i = 0; i < pBlock->numOfRows; ++i) {
for (int32_t i = 0; i < pBlock->numOfRows; ++i) {
src[i] = tdGetKey(src[i]);
}
}
......@@ -1149,25 +1157,29 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
int64_t elapsedTime = (taosGetTimestampUs() - st);
pTsdbReadHandle->cost.blockLoadTime += elapsedTime;
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %s",
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->idStr);
tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64
" us, %s",
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime,
pTsdbReadHandle->idStr);
return TSDB_CODE_SUCCESS;
_error:
pBlock->numOfRows = 0;
tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %s",
tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s",
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr);
return terrno;
}
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
int32_t start, int32_t end);
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos);
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
SDataBlockInfo* pBlockInfo, int32_t endPos);
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo){
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
SQueryFilePos* cur = &pTsdbReadHandle->cur;
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
......@@ -1180,22 +1192,22 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update);
if (key != TSKEY_INITIAL_VAL) {
tsdbDebug("%p key in mem:%"PRId64", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
tsdbDebug("%p key in mem:%" PRId64 ", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
} else {
tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
}
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
// do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
TSKEY maxKey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? (binfo.window.skey - step):(binfo.window.ekey - step);
cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
TSKEY maxKey =
ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
cur->rows =
tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
pTsdbReadHandle->realNumOfRows = cur->rows;
// update the last key value
......@@ -1209,7 +1221,6 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
return code;
}
// return error, add test cases
if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
return code;
......@@ -1226,7 +1237,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
if ((cur->pos == 0 && endPos == binfo.rows -1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
if ((cur->pos == 0 && endPos == binfo.rows - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
(cur->pos == (binfo.rows - 1) && endPos == 0 && (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)))) {
pTsdbReadHandle->realNumOfRows = binfo.rows;
......@@ -1249,19 +1260,21 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
assert(cur->blockCompleted);
if (cur->rows == binfo.rows) {
tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %s",
tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
} else {
tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %s",
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->idStr);
tsdbDebug("%p create data block from remain file block, brange:%" PRId64 "-%" PRId64
", rows:%d, total:%d, lastKey:%" PRId64 ", %s",
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey,
pTsdbReadHandle->idStr);
}
}
return code;
}
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
bool* exists) {
SQueryFilePos* cur = &pTsdbReadHandle->cur;
int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
......@@ -1287,10 +1300,10 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc
assert(pCheckInfo->lastKey <= pBlock->keyLast);
doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer
cur->pos = asc? 0:(pBlock->numOfRows - 1);
cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
}
} else { //desc order, query ended in current block
} else { // desc order, query ended in current block
if (pTsdbReadHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
*exists = false;
......@@ -1299,7 +1312,8 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc
SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
cur->pos =
binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
} else {
cur->pos = pBlock->numOfRows - 1;
}
......@@ -1307,7 +1321,7 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
} else {
cur->pos = asc? 0:(pBlock->numOfRows-1);
cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
}
}
......@@ -1378,8 +1392,9 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
return midPos;
}
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1;
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
int32_t start, int32_t end) {
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
......@@ -1393,9 +1408,9 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
//data in buffer has greater timestamp, copy data in file block
// data in buffer has greater timestamp, copy data in file block
int32_t i = 0, j = 0;
while(i < requiredNumOfCols && j < pCols->numOfCols) {
while (i < requiredNumOfCols && j < pCols->numOfCols) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
SDataCol* src = &pCols->cols[j];
......@@ -1406,8 +1421,8 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
for(int32_t k = start; k < num + start; ++k) {
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
for (int32_t k = start; k < num + start; ++k) {
SCellVal sVal = {0};
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
TASSERT(0);
......@@ -1423,7 +1438,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
// todo refactor, only copy one-by-one
for (int32_t k = start; k < num + start; ++k) {
SCellVal sVal = {0};
if(tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0){
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
TASSERT(0);
}
......@@ -1438,7 +1453,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
j++;
i++;
} else { // pColInfo->info.colId < src->colId, it is a NULL data
for(int32_t k = start; k < num + start; ++k) { // TODO opt performance
for (int32_t k = start; k < num + start; ++k) { // TODO opt performance
colDataAppend(pColInfo, k, NULL, true);
}
i++;
......@@ -1447,7 +1462,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
while (i < requiredNumOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
for(int32_t k = start; k < num + start; ++k) {
for (int32_t k = start; k < num + start; ++k) {
colDataAppend(pColInfo, k, NULL, true); // TODO add a fast version to set a number of consecutive NULL value.
}
i++;
......@@ -1483,19 +1498,19 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
pSchema1 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row1));
}
if(isRow1DataRow) {
if (isRow1DataRow) {
numOfColsOfRow1 = schemaNCols(pSchema1);
} else {
numOfColsOfRow1 = tdRowGetNCols(row1);
}
int32_t numOfColsOfRow2 = 0;
if(row2) {
if (row2) {
isRow2DataRow = TD_IS_TP_ROW(row2);
if (pSchema2 == NULL) {
pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row2));
}
if(isRow2DataRow) {
if (isRow2DataRow) {
numOfColsOfRow2 = schemaNCols(pSchema2);
} else {
numOfColsOfRow2 = tdRowGetNCols(row2);
......@@ -1503,31 +1518,31 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
}
int32_t i = 0, j = 0, k = 0;
while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
while (i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
int32_t colIdOfRow1;
if(j >= numOfColsOfRow1) {
if (j >= numOfColsOfRow1) {
colIdOfRow1 = INT32_MAX;
} else if(isRow1DataRow) {
} else if (isRow1DataRow) {
colIdOfRow1 = pSchema1->columns[j].colId;
} else {
SKvRowIdx *pColIdx = tdKvRowColIdxAt(row1, j);
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row1, j);
colIdOfRow1 = pColIdx->colId;
}
int32_t colIdOfRow2;
if(k >= numOfColsOfRow2) {
if (k >= numOfColsOfRow2) {
colIdOfRow2 = INT32_MAX;
} else if(isRow2DataRow) {
} else if (isRow2DataRow) {
colIdOfRow2 = pSchema2->columns[k].colId;
} else {
SKvRowIdx *pColIdx = tdKvRowColIdxAt(row2, k);
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row2, k);
colIdOfRow2 = pColIdx->colId;
}
if(colIdOfRow1 == colIdOfRow2) {
if(colIdOfRow1 < pColInfo->info.colId) {
if (colIdOfRow1 == colIdOfRow2) {
if (colIdOfRow1 < pColInfo->info.colId) {
j++;
k++;
continue;
......@@ -1536,8 +1551,8 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
pSchema = pSchema1;
isChosenRowDataRow = isRow1DataRow;
chosen_itr = j;
} else if(colIdOfRow1 < colIdOfRow2) {
if(colIdOfRow1 < pColInfo->info.colId) {
} else if (colIdOfRow1 < colIdOfRow2) {
if (colIdOfRow1 < pColInfo->info.colId) {
j++;
continue;
}
......@@ -1546,7 +1561,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
isChosenRowDataRow = isRow1DataRow;
chosen_itr = j;
} else {
if(colIdOfRow2 < pColInfo->info.colId) {
if (colIdOfRow2 < pColInfo->info.colId) {
k++;
continue;
}
......@@ -1555,12 +1570,12 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
chosen_itr = k;
isChosenRowDataRow = isRow2DataRow;
}
if(isChosenRowDataRow) {
if (isChosenRowDataRow) {
colId = pSchema->columns[chosen_itr].colId;
offset = pSchema->columns[chosen_itr].offset;
tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr, &sVal);
} else {
SKvRowIdx *pColIdx = tdKvRowColIdxAt(row, chosen_itr);
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr);
colId = pColIdx->colId;
offset = pColIdx->offset;
tdSKvRowGetVal(row, colId, offset, chosen_itr, &sVal);
......@@ -1575,20 +1590,20 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
i++;
if(row == row1) {
if (row == row1) {
j++;
} else {
k++;
}
} else {
if(forceSetNull) {
if (forceSetNull) {
colDataAppend(pColInfo, numOfRows, NULL, true);
}
i++;
}
}
if(forceSetNull) {
if (forceSetNull) {
while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
colDataAppend(pColInfo, numOfRows, NULL, true);
......@@ -1606,15 +1621,16 @@ static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows,
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pTsdbReadHandle->outputCapacity) {
int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
}
}
}
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted,
int32_t* start, int32_t* end) {
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
int32_t numOfExisted, int32_t* start, int32_t* end) {
*start = -1;
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
......@@ -1639,7 +1655,8 @@ static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startP
}
}
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) {
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows,
int32_t endPos) {
SQueryFilePos* cur = &pTsdbReadHandle->cur;
pCheckInfo->lastKey = cur->lastKey;
......@@ -1659,22 +1676,24 @@ static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
}
SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]);
assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
} else {
cur->win = pTsdbReadHandle->window;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
cur->lastKey = pTsdbReadHandle->window.ekey + step;
}
}
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
SDataBlockInfo* pBlockInfo, int32_t endPos) {
SQueryFilePos* cur = &pTsdbReadHandle->cur;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
int32_t pos = cur->pos;
......@@ -1690,7 +1709,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);
// the time window should always be ascending order: skey <= ekey
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
cur->mixBlock = (numOfRows != pBlockInfo->rows);
cur->lastKey = tsArray[endPos] + step;
cur->blockCompleted = true;
......@@ -1703,14 +1722,15 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pTsdbReadHandle);
tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
pTsdbReadHandle->idStr);
}
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
// NOTE: reverse the order to find the end position in data block
int32_t endPos = -1;
int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pTsdbReadHandle->cur;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
......@@ -1734,7 +1754,7 @@ int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* p
// be included in the query time window will be discarded
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
SQueryFilePos* cur = &pTsdbReadHandle->cur;
SDataBlockInfo blockInfo = {0};//GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
SDataBlockInfo blockInfo = {0}; // GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
initTableMemIterator(pTsdbReadHandle, pCheckInfo);
......@@ -1744,21 +1764,23 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
TSKEY* tsArray = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast);
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
// for search the endPos, so the order needs to reverse
int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
STable* pTable = NULL;
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
" rows:%d, start:%d,"
"end:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey,
blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->idStr);
pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
cur->pos, endPos, pTsdbReadHandle->idStr);
// compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t numOfRows = 0;
......@@ -1790,23 +1812,26 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
break;
}
if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) &&
ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) &&
!ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
break;
}
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
rv1 = TD_ROW_SVER(row1);
}
if(row2 && rv2 != TD_ROW_SVER(row2)) {
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
if (row2 && rv2 != TD_ROW_SVER(row2)) {
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = TD_ROW_SVER(row2);
}
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true);
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, true);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
......@@ -1819,20 +1844,21 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
if (pCfg->update) {
if(pCfg->update == TD_ROW_PARTIAL_UPDATE) {
if (pCfg->update == TD_ROW_PARTIAL_UPDATE) {
doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
}
if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
rv1 = TD_ROW_SVER(row1);
}
if(row2 && rv2 != TD_ROW_SVER(row2)) {
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
if (row2 && rv2 != TD_ROW_SVER(row2)) {
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = TD_ROW_SVER(row2);
}
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
......@@ -1870,7 +1896,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend);
pos += (qend - qstart + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[qend]:tsArray[qstart];
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
cur->lastKey = cur->win.ekey + step;
}
} while (numOfRows < pTsdbReadHandle->outputCapacity);
......@@ -1896,7 +1922,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[end]:tsArray[start];
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[end] : tsArray[start];
cur->lastKey = cur->win.ekey + step;
cur->mixBlock = true;
}
......@@ -1915,8 +1941,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pTsdbReadHandle);
tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
pTsdbReadHandle->idStr);
}
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
......@@ -2032,7 +2059,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
return TSDB_CODE_TDB_OUT_OF_MEMORY;
}
pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*) tmp;
pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*)tmp;
}
memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
......@@ -2141,11 +2168,11 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool *exists) {
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1;
static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
SQueryFilePos* cur = &pTsdbReadHandle->cur;
while(1) {
while (1) {
int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
if (code != TSDB_CODE_SUCCESS || *exists) {
return code;
......@@ -2223,7 +2250,8 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
}
// todo return error code to query engine
if ((code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks)) != TSDB_CODE_SUCCESS) {
if ((code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks)) !=
TSDB_CODE_SUCCESS) {
break;
}
......@@ -2245,7 +2273,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
}
assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0);
cur->slot = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 0:pTsdbReadHandle->numOfBlocks-1;
cur->slot = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 0 : pTsdbReadHandle->numOfBlocks - 1;
cur->fid = pTsdbReadHandle->pFileGroup->fid;
STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
......@@ -2258,7 +2286,7 @@ static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool asc
}
static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
SQueryFilePos* cur = &pTsdbReadHandle->cur;
assert(cur->slot < pTsdbReadHandle->numOfBlocks && cur->slot >= 0);
......@@ -2269,7 +2297,7 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
}
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle;
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
pTableBlockInfo->totalSize = 0;
pTableBlockInfo->totalRows = 0;
......@@ -2291,7 +2319,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0;
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
int defaultRows = 4096;//TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
int defaultRows = 4096; // TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
STimeWindow win = TSWINDOW_INITIALIZER;
bool ascTraverse = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
......@@ -2308,7 +2336,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) {
if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) ||
(!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) {
tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
......@@ -2361,9 +2390,9 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
if (numOfRows < defaultRows) {
pTableBlockInfo->numOfSmallBlocks += 1;
}
// int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
// SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
// blockInfo->numBlocksOfStep++;
// int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
// SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
// blockInfo->numBlocksOfStep++;
}
}
}
......@@ -2432,23 +2461,23 @@ static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
return false;
}
//todo not unref yet, since it is not support multi-group interpolation query
// todo not unref yet, since it is not support multi-group interpolation query
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
// filter the queried time stamp in the first place
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
// starts from the buffer in case of descending timestamp order check data blocks
size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
int32_t i = 0;
while(i < numOfTables) {
while (i < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// the first qualified table for interpolation query
// if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
// (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
// break;
// }
// if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
// (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
// break;
// }
i++;
}
......@@ -2458,7 +2487,7 @@ static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
return;
}
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
STableCheckInfo info = *(STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
info.lastKey = pTsdbReadHandle->window.skey;
......@@ -2483,9 +2512,10 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
TSKEY key = TD_ROW_KEY(row);
if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pTsdbReadHandle, key, pTsdbReadHandle->window.skey,
pTsdbReadHandle->window.ekey);
if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
(key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
tsdbDebug("%p key:%" PRIu64 " beyond qrange:%" PRId64 " - %" PRId64 ", no more data in buffer", pTsdbReadHandle,
key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
break;
}
......@@ -2499,14 +2529,15 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
rv = TD_ROW_SVER(row);
}
mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, NULL, true);
mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
NULL, true);
if (++numOfRows >= maxRowsToRead) {
moveToNextRowInMem(pCheckInfo);
break;
}
} while(moveToNextRowInMem(pCheckInfo));
} while (moveToNextRowInMem(pCheckInfo));
assert(numOfRows <= maxRowsToRead);
......@@ -2514,15 +2545,16 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && numOfRows < maxRowsToRead) {
int32_t emptySize = maxRowsToRead - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
}
}
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %s", pTsdbReadHandle,
elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
return numOfRows;
}
......@@ -2549,12 +2581,12 @@ static void destroyHelper(void* param) {
return;
}
// tQueryInfo* pInfo = (tQueryInfo*)param;
// if (pInfo->optr != TSDB_RELATION_IN) {
// taosMemoryFreeClear(pInfo->q);
// } else {
// taosHashCleanup((SHashObj *)(pInfo->q));
// }
// tQueryInfo* pInfo = (tQueryInfo*)param;
// if (pInfo->optr != TSDB_RELATION_IN) {
// taosMemoryFreeClear(pInfo->q);
// } else {
// taosHashCleanup((SHashObj *)(pInfo->q));
// }
taosMemoryFree(param);
}
......@@ -2574,7 +2606,7 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
}
if (exists) {
tsdbRetrieveDataBlock((tsdbReaderT*) pTsdbReadHandle, NULL);
tsdbRetrieveDataBlock((tsdbReaderT*)pTsdbReadHandle, NULL);
if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0);
assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey);
......@@ -2593,16 +2625,17 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
}
// current result is empty
if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey && pTsdbReadHandle->cur.rows == 0) {
// STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable;
if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
pTsdbReadHandle->cur.rows == 0) {
// STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable;
// doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
// doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);
// doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
// doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);
bool result = tsdbGetExternalRow(pTsdbReadHandle);
// pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
pTsdbReadHandle->currentLoadExternalRows = false;
return result;
......@@ -2622,15 +2655,16 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
STSRow* pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
if (++pTsdbReadHandle->activeIndex < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
// int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
// if (ret != TSDB_CODE_SUCCESS) {
// return false;
// }
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true);
// int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
// if (ret != TSDB_CODE_SUCCESS) {
// return false;
// }
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId,
NULL, NULL, true);
taosMemoryFreeClear(pRow);
// update the last key value
......@@ -2648,9 +2682,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
return false;
}
//static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
// static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
// // the last row is cached in buffer, return it directly.
// // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
// int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
......@@ -2670,8 +2702,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
// int32_t numOfCols = pTable->maxColNum;
//
// if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
// tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid, pTable->tableId);
// continue;
// tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid,
// pTable->tableId); continue;
// }
//
// int32_t i = 0, j = 0;
......@@ -2806,7 +2838,7 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
int64_t stime = taosGetTimestampUs();
while(pTsdbReadHandle->activeIndex < numOfTables) {
while (pTsdbReadHandle->activeIndex < numOfTables) {
if (loadBlockOfActiveTable(pTsdbReadHandle)) {
return true;
}
......@@ -2831,15 +2863,16 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
// handle data in cache situation
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
for(int32_t i = 0; i < taosArrayGetSize(pTsdbReadHandle->pColumns); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(pTsdbReadHandle->pColumns); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity);
}
if (emptyQueryTimewindow(pTsdbReadHandle)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle,
pTsdbReadHandle->idStr);
return false;
}
......@@ -2849,9 +2882,9 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) {
// TODO refactor: remove "type"
if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) {
if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
// return loadCachedLastRow(pTsdbReadHandle);
// return loadCachedLastRow(pTsdbReadHandle);
} else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
// return loadCachedLast(pTsdbReadHandle);
// return loadCachedLast(pTsdbReadHandle);
}
}
......@@ -2889,7 +2922,7 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) {
}
}
//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) {
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) {
// STsdbReadHandle* pSecQueryHandle = NULL;
//
// if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
......@@ -2994,13 +3027,13 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) {
// memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
// }
//
//out_of_memory:
// out_of_memory:
// tsdbCleanupReadHandle(pSecQueryHandle);
// return terrno;
//}
bool tsdbGetExternalRow(tsdbReaderT pHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
SQueryFilePos* cur = &pTsdbReadHandle->cur;
cur->fid = INT32_MIN;
......@@ -3010,7 +3043,7 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) {
return false;
}
int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle);
int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
SColumnInfoData* first = taosArrayGet(pTsdbReadHandle->prev, i);
......@@ -3057,36 +3090,36 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) {
//}
bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
return ((STsdbReadHandle *)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE;
return ((STsdbReadHandle*)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE;
}
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) {
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList) {
assert(pTsdbReadHandle != NULL && groupList != NULL);
// TSKEY key = TSKEY_INITIAL_VAL;
//
// SArray* group = taosArrayGetP(groupList->pGroupList, 0);
// assert(group != NULL);
//
// STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
//
// int32_t code = 0;
//
// if (((STable*)pInfo->pTable)->lastRow) {
// code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
// if (code != TSDB_CODE_SUCCESS) {
// pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
// } else {
// pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
// }
// }
//
// // update the tsdb query time range
// if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
// pTsdbReadHandle->window = TSWINDOW_INITIALIZER;
// pTsdbReadHandle->checkFiles = false;
// pTsdbReadHandle->activeIndex = -1; // start from -1
// }
// TSKEY key = TSKEY_INITIAL_VAL;
//
// SArray* group = taosArrayGetP(groupList->pGroupList, 0);
// assert(group != NULL);
//
// STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
//
// int32_t code = 0;
//
// if (((STable*)pInfo->pTable)->lastRow) {
// code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
// if (code != TSDB_CODE_SUCCESS) {
// pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
// } else {
// pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
// }
// }
//
// // update the tsdb query time range
// if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
// pTsdbReadHandle->window = TSWINDOW_INITIALIZER;
// pTsdbReadHandle->checkFiles = false;
// pTsdbReadHandle->activeIndex = -1; // start from -1
// }
return TSDB_CODE_SUCCESS;
}
......@@ -3095,9 +3128,9 @@ int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
assert(pTsdbReadHandle != NULL);
int32_t code = 0;
// if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){
// pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
// }
// if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){
// pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
// }
// update the tsdb query time range
if (pTsdbReadHandle->cachelastrow) {
......@@ -3108,8 +3141,7 @@ int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
return code;
}
STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList) {
STimeWindow window = {INT64_MAX, INT64_MIN};
int32_t totalNumOfTable = 0;
......@@ -3117,22 +3149,22 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
// NOTE: starts from the buffer in case of descending timestamp order check data blocks
size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
for(int32_t j = 0; j < numOfGroups; ++j) {
for (int32_t j = 0; j < numOfGroups; ++j) {
SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
TSKEY key = TSKEY_INITIAL_VAL;
STableKeyInfo keyInfo = {0};
size_t numOfTables = taosArrayGetSize(pGroup);
for(int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pInfo = (STableKeyInfo*) taosArrayGet(pGroup, i);
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
// if the lastKey equals to INT64_MIN, there is no data in this table
TSKEY lastKey = 0;//((STable*)(pInfo->pTable))->lastKey;
TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey;
if (key < lastKey) {
key = lastKey;
// keyInfo.pTable = pInfo->pTable;
// keyInfo.pTable = pInfo->pTable;
keyInfo.lastKey = key;
pInfo->lastKey = key;
......@@ -3147,18 +3179,18 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
}
// more than one table in each group, only one table left for each group
// if (keyInfo.pTable != NULL) {
// totalNumOfTable++;
// if (taosArrayGetSize(pGroup) == 1) {
// // do nothing
// } else {
// taosArrayClear(pGroup);
// taosArrayPush(pGroup, &keyInfo);
// }
// } else { // mark all the empty groups, and remove it later
// taosArrayDestroy(pGroup);
// taosArrayPush(emptyGroup, &j);
// }
// if (keyInfo.pTable != NULL) {
// totalNumOfTable++;
// if (taosArrayGetSize(pGroup) == 1) {
// // do nothing
// } else {
// taosArrayClear(pGroup);
// taosArrayPush(pGroup, &keyInfo);
// }
// } else { // mark all the empty groups, and remove it later
// taosArrayDestroy(pGroup);
// taosArrayPush(emptyGroup, &j);
// }
}
// window does not being updated, so set the original
......@@ -3167,7 +3199,7 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
}
taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t) taosArrayGetSize(emptyGroup));
taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup));
taosArrayDestroy(emptyGroup);
groupList->numOfTables = totalNumOfTable;
......@@ -3189,10 +3221,11 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
uid = pCheckInfo->tableId;
}
tsdbDebug("data block generated, uid:%"PRIu64" numOfRows:%d, tsrange:%"PRId64" - %"PRId64" %s", uid, cur->rows, cur->win.skey,
cur->win.ekey, pHandle->idStr);
tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
cur->win.skey, cur->win.ekey, pHandle->idStr);
// pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign the table uid
// pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign
// the table uid
pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win;
pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
......@@ -3202,7 +3235,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
* return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
*/
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStatis** pBlockStatis) {
STsdbReadHandle* pHandle = (STsdbReadHandle*) pTsdbReadHandle;
STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
SQueryFilePos* c = &pHandle->cur;
if (c->mixBlock) {
......@@ -3232,7 +3265,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis));
for(int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < numOfCols; ++i) {
pHandle->statis[i].colId = colIds[i];
}
......@@ -3246,8 +3279,8 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;
//update the number of NULL data rows
for(int32_t i = 1; i < numOfCols; ++i) {
// update the number of NULL data rows
for (int32_t i = 1; i < numOfCols; ++i) {
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
}
......@@ -3298,9 +3331,10 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
int32_t emptySize = pHandle->outputCapacity - numOfRows;
int32_t reqNumOfCols = (int32_t)taosArrayGetSize(pHandle->pColumns);
for(int32_t i = 0; i < reqNumOfCols; ++i) {
for (int32_t i = 0; i < reqNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
}
}
......@@ -3356,7 +3390,7 @@ void filterPrepare(void* expr, void* param) {
#endif
static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
static int32_t tableGroupComparFn(const void* p1, const void* p2, const void* param) {
#if 0
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STable* pTable1 = ((STableKeyInfo*) p1)->uid;
......@@ -3453,7 +3487,8 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
taosArrayPush(pGroups, &g);
}
SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols,
TSKEY skey) {
assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
......@@ -3485,7 +3520,7 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
return pTableGroup;
}
//static bool tableFilterFp(const void* pNode, void* param) {
// static bool tableFilterFp(const void* pNode, void* param) {
// tQueryInfo* pInfo = (tQueryInfo*) param;
//
// STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
......@@ -3570,12 +3605,13 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
// return true;
//}
//static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
// static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp
// *param);
//static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
// // query according to the expression tree
// static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
// // // query according to the expression tree
// SExprTraverseSupp supp = {
// .nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
// .nodeFilterFn = (__result_filter_fn_t)tableFilterFp,
// .setupInfoFn = filterPrepare,
// .pExtInfo = pSTable->tagSchema,
// };
......@@ -3590,18 +3626,19 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
if (pTbCfg == NULL) {
tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
goto _error;
}
if (pTbCfg->type != META_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
reqId);
terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client
goto _error;
}
//NOTE: not add ref count for super table
// NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
......@@ -3612,66 +3649,44 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
goto _error;
}
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta,
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%" PRIx64
" QID:0x%" PRIx64,
pMeta, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
taosArrayDestroy(res);
return ret;
}
int32_t ret = TSDB_CODE_SUCCESS;
// tExprNode* expr = NULL;
//
// TRY(TSDB_MAX_TAG_CONDITIONS) {
// expr = exprTreeFromTableName(tbnameCond);
// if (expr == NULL) {
// expr = exprTreeFromBinary(pTagCond, len);
// } else {
// CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
// tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
// if (tagExpr != NULL) {
// CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL);
// tExprNode* tbnameExpr = expr;
// expr = taosMemoryCalloc(1, sizeof(tExprNode));
// if (expr == NULL) {
// THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
// }
// expr->nodeType = TSQL_NODE_EXPR;
// expr->_node.optr = (uint8_t)tagNameRelType;
// expr->_node.pLeft = tagExpr;
// expr->_node.pRight = tbnameExpr;
// }
// }
// CLEANUP_EXECUTE();
//
// } CATCH( code ) {
// CLEANUP_EXECUTE();
// terrno = code;
// tsdbUnlockRepoMeta(tsdb); // unlock tsdb in any cases
//
// goto _error;
// // TODO: more error handling
// } END_TRY
//
// doQueryTableList(pTable, res, expr);
// pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
// pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
//
// tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId,
// pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
//
// taosArrayDestroy(res);
//
// if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
// return ret;
_error:
SFilterInfo* filterInfo = NULL;
ret = filterInitFromNode((SNode*)pTagCond, &filterInfo, 0);
if (ret != TSDB_CODE_SUCCESS) {
terrno = ret;
return ret;
}
ret = tsdbQueryTableList(pMeta, res, filterInfo);
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
// tsdbDebug("%p stable tid:%d, uid:%" PRIu64 " query, numOfTables:%u, belong to %" PRIzu " groups", tsdb,
// pTable->tableId, pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
taosArrayDestroy(res);
return ret;
_error:
return terrno;
}
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) {
// impl later
return TSDB_CODE_SUCCESS;
}
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
if (pTbCfg == NULL) {
......@@ -3690,7 +3705,7 @@ int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGr
taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS;
_error:
_error:
return terrno;
}
......@@ -3769,7 +3784,6 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
return NULL;
}
void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
if (pTsdbReadHandle == NULL) {
......@@ -3783,7 +3797,7 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
taosMemoryFreeClear(pTsdbReadHandle->statis);
if (!emptyQueryTimewindow(pTsdbReadHandle)) {
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
} else {
assert(pTsdbReadHandle->pTableCheckInfo == NULL);
}
......@@ -3802,8 +3816,10 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
SIOCostSummary* pCost = &pTsdbReadHandle->cost;
tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %s",
pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr);
tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
" us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime,
pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr);
taosMemoryFreeClear(pTsdbReadHandle);
}
......@@ -4057,37 +4073,37 @@ static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, S
}
// Apply the filter expression to each node in the skiplist to acquire the qualified nodes in skip list
void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
if (pExpr == NULL) {
return;
}
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
// column project
if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
param->setupInfoFn(pExpr, param->pExtInfo);
tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->indexed && (pQueryInfo->optr != TSDB_RELATION_LIKE
&& pQueryInfo->optr != TSDB_RELATION_MATCH && pQueryInfo->optr != TSDB_RELATION_NMATCH
&& pQueryInfo->optr != TSDB_RELATION_IN)) {
queryIndexedColumn(pSkipList, pQueryInfo, result);
} else {
queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
}
return;
}
// The value of hasPK is always 0.
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
//apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
applyFilterToSkipListNode(pSkipList, pExpr, result, param);
}
//void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
// if (pExpr == NULL) {
// return;
// }
//
// tExprNode *pLeft = pExpr->_node.pLeft;
// tExprNode *pRight = pExpr->_node.pRight;
//
// // column project
// if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
// assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
//
// param->setupInfoFn(pExpr, param->pExtInfo);
//
// tQueryInfo *pQueryInfo = pExpr->_node.info;
// if (pQueryInfo->indexed && (pQueryInfo->optr != TSDB_RELATION_LIKE
// && pQueryInfo->optr != TSDB_RELATION_MATCH && pQueryInfo->optr != TSDB_RELATION_NMATCH
// && pQueryInfo->optr != TSDB_RELATION_IN)) {
// queryIndexedColumn(pSkipList, pQueryInfo, result);
// } else {
// queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
// }
//
// return;
// }
//
// // The value of hasPK is always 0.
// uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
// assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
//
// //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
// applyFilterToSkipListNode(pSkipList, pExpr, result, param);
//}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册