提交 ae65941c 编写于 作者: H Haojun Liao

[td-225] add check

上级 431945fa
...@@ -198,38 +198,38 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -198,38 +198,38 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
int32_t numOfCols = pCond->numOfCols; int32_t numOfCols = pCond->numOfCols;
pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0}; SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i]; colInfo.info = pCond->colList[i];
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
taosArrayPush(pQueryHandle->pColumns, &colInfo); taosArrayPush(pQueryHandle->pColumns, &colInfo);
pQueryHandle->statis[i].colId = colInfo.info.colId; pQueryHandle->statis[i].colId = colInfo.info.colId;
} }
pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));
STsdbMeta* pMeta = tsdbGetMeta(tsdb); STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL); assert(pMeta != NULL);
for (int32_t i = 0; i < sizeOfGroup; ++i) { for (int32_t i = 0; i < sizeOfGroup; ++i) {
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i); SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
size_t gsize = taosArrayGetSize(group); size_t gsize = taosArrayGetSize(group);
assert(gsize > 0); assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) { for (int32_t j = 0; j < gsize; ++j) {
STable* pTable = (STable*) taosArrayGetP(group, j); STable* pTable = (STable*) taosArrayGetP(group, j);
STableCheckInfo info = { STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey, .lastKey = pQueryHandle->window.skey,
.tableId = pTable->tableId, .tableId = pTable->tableId,
.pTableObj = pTable, .pTableObj = pTable,
}; };
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
...@@ -259,17 +259,17 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab ...@@ -259,17 +259,17 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) { SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
assert(pHandle != NULL); assert(pHandle != NULL);
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle; STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
SArray* res = taosArrayInit(size, POINTER_BYTES); SArray* res = taosArrayInit(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
taosArrayPush(res, &pCheckInfo->pTableObj); taosArrayPush(res, &pCheckInfo->pTableObj);
} }
return res; return res;
} }
...@@ -285,11 +285,11 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond ...@@ -285,11 +285,11 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond
static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) { static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) {
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
assert(pTable != NULL); assert(pTable != NULL);
if (pCheckInfo->initBuf) { if (pCheckInfo->initBuf) {
return true; return true;
} }
pCheckInfo->initBuf = true; pCheckInfo->initBuf = true;
int32_t order = pHandle->order; int32_t order = pHandle->order;
...@@ -297,34 +297,34 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -297,34 +297,34 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
if (pHandle->mem == NULL && pHandle->imem == NULL) { if (pHandle->mem == NULL && pHandle->imem == NULL) {
return false; return false;
} }
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL); assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) { if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData, pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
} }
if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) { if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData, pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
} }
// both iterators are NULL, no data in buffer right now // both iterators are NULL, no data in buffer right now
if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) { if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
return false; return false;
} }
bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter)); bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter));
bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tSkipListIterNext(pCheckInfo->iiter)); bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tSkipListIterNext(pCheckInfo->iiter));
if (memEmpty && imemEmpty) { // buffer is empty if (memEmpty && imemEmpty) { // buffer is empty
return false; return false;
} }
if (!memEmpty) { if (!memEmpty) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
assert(node != NULL); assert(node != NULL);
SDataRow row = SL_GET_NODE_DATA(node); SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer TSKEY key = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle, tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
...@@ -333,11 +333,11 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -333,11 +333,11 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
pHandle->qinfo); pHandle->qinfo);
} }
if (!imemEmpty) { if (!imemEmpty) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
assert(node != NULL); assert(node != NULL);
SDataRow row = SL_GET_NODE_DATA(node); SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer TSKEY key = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle, tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
...@@ -346,7 +346,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -346,7 +346,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
pHandle->qinfo); pHandle->qinfo);
} }
return true; return true;
} }
...@@ -449,7 +449,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { ...@@ -449,7 +449,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
pHandle->cur.fid = -1; pHandle->cur.fid = -1;
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
...@@ -467,17 +467,17 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { ...@@ -467,17 +467,17 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle, tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo); pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);
// all data in mem are checked already. // all data in mem are checked already.
if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) || if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
(pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) { (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
return false; return false;
} }
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
STimeWindow* win = &pHandle->cur.win; STimeWindow* win = &pHandle->cur.win;
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle); pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
// update the last key value // update the last key value
pCheckInfo->lastKey = win->ekey + step; pCheckInfo->lastKey = win->ekey + step;
pHandle->cur.lastKey = win->ekey + step; pHandle->cur.lastKey = win->ekey + step;
...@@ -486,7 +486,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { ...@@ -486,7 +486,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
if (!ASCENDING_TRAVERSE(pHandle->order)) { if (!ASCENDING_TRAVERSE(pHandle->order)) {
SWAP(win->skey, win->ekey, TSKEY); SWAP(win->skey, win->ekey, TSKEY);
} }
return true; return true;
} }
...@@ -495,31 +495,31 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio ...@@ -495,31 +495,31 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
if (key == TSKEY_INITIAL_VAL) { if (key == TSKEY_INITIAL_VAL) {
return INT32_MIN; return INT32_MIN;
} }
int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision])); // set the starting fileId int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision])); // set the starting fileId
if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32 if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32
fid = INT32_MIN; fid = INT32_MIN;
} }
if (fid > 0L && fid > INT32_MAX) { if (fid > 0L && fid > INT32_MAX) {
fid = INT32_MAX; fid = INT32_MAX;
} }
return fid; return fid;
} }
static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
int32_t firstSlot = 0; int32_t firstSlot = 0;
int32_t lastSlot = numOfBlocks - 1; int32_t lastSlot = numOfBlocks - 1;
int32_t midSlot = firstSlot; int32_t midSlot = firstSlot;
while (1) { while (1) {
numOfBlocks = lastSlot - firstSlot + 1; numOfBlocks = lastSlot - firstSlot + 1;
midSlot = (firstSlot + (numOfBlocks >> 1)); midSlot = (firstSlot + (numOfBlocks >> 1));
if (numOfBlocks == 1) break; if (numOfBlocks == 1) break;
if (skey > pBlock[midSlot].keyLast) { if (skey > pBlock[midSlot].keyLast) {
if (numOfBlocks == 2) break; if (numOfBlocks == 2) break;
if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break; if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
...@@ -531,7 +531,7 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK ...@@ -531,7 +531,7 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
break; // got the slot break; // got the slot
} }
} }
return midSlot; return midSlot;
} }
...@@ -669,10 +669,10 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -669,10 +669,10 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1); cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
...@@ -688,12 +688,12 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -688,12 +688,12 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) { if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY); SWAP(cur->win.skey, cur->win.ekey, TSKEY);
} }
cur->mixBlock = true; cur->mixBlock = true;
cur->blockCompleted = false; cur->blockCompleted = false;
return; return;
} }
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { } else {
...@@ -727,14 +727,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -727,14 +727,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows); assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
if (pCheckInfo->lastKey > pBlock->keyFirst) { if (pCheckInfo->lastKey > pBlock->keyFirst) {
cur->pos = cur->pos =
binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else { } else {
cur->pos = 0; cur->pos = 0;
} }
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer } else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
...@@ -744,14 +744,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -744,14 +744,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
return false; return false;
} }
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) { if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else { } else {
cur->pos = pBlock->numOfRows - 1; cur->pos = pBlock->numOfRows - 1;
} }
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { } else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
...@@ -767,7 +767,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { ...@@ -767,7 +767,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
TSKEY* keyList; TSKEY* keyList;
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
if (num <= 0) return -1; if (num <= 0) return -1;
keyList = (TSKEY*)pValue; keyList = (TSKEY*)pValue;
...@@ -826,13 +826,13 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { ...@@ -826,13 +826,13 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
char* pData = NULL; char* pData = NULL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
TSKEY* tsArray = pCols->cols[0].pData; TSKEY* tsArray = pCols->cols[0].pData;
int32_t num = end - start + 1; int32_t num = end - start + 1;
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
//data in buffer has greater timestamp, copy data in file block //data in buffer has greater timestamp, copy data in file block
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while(i < requiredNumOfCols && j < pCols->numOfCols) { while(i < requiredNumOfCols && j < pCols->numOfCols) {
...@@ -905,7 +905,7 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap ...@@ -905,7 +905,7 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
i++; i++;
} }
pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.win.ekey = tsArray[end];
pQueryHandle->cur.lastKey = tsArray[end] + step; pQueryHandle->cur.lastKey = tsArray[end] + step;
...@@ -1027,7 +1027,7 @@ static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo ...@@ -1027,7 +1027,7 @@ static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
initTableMemIterator(pQueryHandle, pCheckInfo); initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
...@@ -1038,7 +1038,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1038,7 +1038,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
...@@ -1054,12 +1054,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1054,12 +1054,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
cur->mixBlock = true; cur->mixBlock = true;
} }
// compared with the data from in-memory buffer, to generate the correct timestamp array list // compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t pos = cur->pos;
int32_t numOfRows = 0; int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER; int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER;
// no data in buffer, load data from file directly // no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
...@@ -1069,13 +1068,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1069,13 +1068,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) { if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(start, end, int32_t); SWAP(start, end, int32_t);
} }
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
// the time window should always be right order: skey <= ekey
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
pos += (end - start + 1) * step; pos += (end - start + 1) * step;
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))); cur->blockCompleted =
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
moveDataToFront(pQueryHandle, numOfRows, numOfCols); moveDataToFront(pQueryHandle, numOfRows, numOfCols);
...@@ -1133,11 +1135,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1133,11 +1135,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend); numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
pos += (qend - qstart + 1) * step; pos += (qend - qstart + 1) * step;
cur->win.ekey = tsArray[end]; cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
} }
} while (numOfRows < pQueryHandle->outputCapacity); } while (numOfRows < pQueryHandle->outputCapacity);
if (numOfRows < pQueryHandle->outputCapacity) { if (numOfRows < pQueryHandle->outputCapacity) {
/** /**
* if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
...@@ -1157,14 +1159,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1157,14 +1159,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step; pos += (end - start + 1) * step;
cur->win.ekey = tsArray[end]; cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
} }
} }
} }
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || cur->blockCompleted =
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))); (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) { if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY); SWAP(cur->win.skey, cur->win.ekey, TSKEY);
...@@ -1179,6 +1182,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1179,6 +1182,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey); assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey);
} }
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, 0);
assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]);
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey, tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
cur->win.ekey, cur->rows, pQueryHandle->qinfo); cur->win.ekey, cur->rows, pQueryHandle->qinfo);
} }
...@@ -1314,16 +1320,16 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1314,16 +1320,16 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
cleanBlockOrderSupporter(&sup, 0); cleanBlockOrderSupporter(&sup, 0);
return TSDB_CODE_TDB_OUT_OF_MEMORY; return TSDB_CODE_TDB_OUT_OF_MEMORY;
} }
int32_t cnt = 0; int32_t cnt = 0;
int32_t numOfQualTables = 0; int32_t numOfQualTables = 0;
for (int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j); STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j);
if (pTableCheck->numOfBlocks <= 0) { if (pTableCheck->numOfBlocks <= 0) {
continue; continue;
} }
SCompBlock* pBlock = pTableCheck->pCompInfo->blocks; SCompBlock* pBlock = pTableCheck->pCompInfo->blocks;
sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks; sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
...@@ -1428,26 +1434,26 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex ...@@ -1428,26 +1434,26 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
break; break;
} }
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks, tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks,
numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo); numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
assert(numOfBlocks >= 0); assert(numOfBlocks >= 0);
if (numOfBlocks == 0) { if (numOfBlocks == 0) {
continue; continue;
} }
// todo return error code to query engine // todo return error code to query engine
if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) { if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
break; break;
} }
assert(numOfBlocks >= pQueryHandle->numOfBlocks); assert(numOfBlocks >= pQueryHandle->numOfBlocks);
if (pQueryHandle->numOfBlocks > 0) { if (pQueryHandle->numOfBlocks > 0) {
break; break;
} }
} }
// no data in file anymore // no data in file anymore
if (pQueryHandle->numOfBlocks <= 0) { if (pQueryHandle->numOfBlocks <= 0) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
...@@ -1458,10 +1464,10 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex ...@@ -1458,10 +1464,10 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
*exists = false; *exists = false;
return code; return code;
} }
cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId; cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo); *exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
...@@ -1477,7 +1483,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists ...@@ -1477,7 +1483,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
pQueryHandle->locateStart = true; pQueryHandle->locateStart = true;
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision); int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
...@@ -1486,7 +1492,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists ...@@ -1486,7 +1492,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
// check if current file block is all consumed // check if current file block is all consumed
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
// current block is done, try next // current block is done, try next
if (!cur->mixBlock || cur->blockCompleted) { if (!cur->mixBlock || cur->blockCompleted) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
...@@ -1497,10 +1503,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists ...@@ -1497,10 +1503,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
// next block of the same file // next block of the same file
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->slot += step; cur->slot += step;
cur->mixBlock = false; cur->mixBlock = false;
cur->blockCompleted = false; cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo); *exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
...@@ -1518,15 +1524,15 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists ...@@ -1518,15 +1524,15 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) { static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables); assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables);
while (pQueryHandle->activeIndex < numOfTables) { while (pQueryHandle->activeIndex < numOfTables) {
if (hasMoreDataInCache(pQueryHandle)) { if (hasMoreDataInCache(pQueryHandle)) {
return true; return true;
} }
pQueryHandle->activeIndex += 1; pQueryHandle->activeIndex += 1;
} }
return false; return false;
} }
...@@ -1544,14 +1550,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1544,14 +1550,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) { if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->order = TSDB_ORDER_DESC; pQueryHandle->order = TSDB_ORDER_DESC;
if (!tsdbNextDataBlock(pHandle)) { if (!tsdbNextDataBlock(pHandle)) {
return false; return false;
} }
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo); /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn); /*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn);
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) { if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
// data already retrieve, discard other data rows and return // data already retrieve, discard other data rows and return
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
...@@ -1559,7 +1565,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1559,7 +1565,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes); memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);
} }
pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey}; pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey};
pQueryHandle->window = pQueryHandle->cur.win; pQueryHandle->window = pQueryHandle->cur.win;
pQueryHandle->cur.rows = 1; pQueryHandle->cur.rows = 1;
...@@ -1576,7 +1582,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1576,7 +1582,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle->checkFiles = true; pSecQueryHandle->checkFiles = true;
pSecQueryHandle->activeIndex = 0; pSecQueryHandle->activeIndex = 0;
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock; pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) { if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) {
free(pSecQueryHandle); free(pSecQueryHandle);
return false; return false;
...@@ -1586,24 +1592,24 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1586,24 +1592,24 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0}; SColumnInfoData colInfo = {{0}, 0};
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
colInfo.info = pCol->info; colInfo.info = pCol->info;
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes); colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
taosArrayPush(pSecQueryHandle->pColumns, &colInfo); taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
} }
size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo)); pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo));
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
assert(pMeta != NULL); assert(pMeta != NULL);
for (int32_t j = 0; j < si; ++j) { for (int32_t j = 0; j < si; ++j) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
STableCheckInfo info = { STableCheckInfo info = {
...@@ -1611,10 +1617,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1611,10 +1617,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
.tableId = pCheckInfo->tableId, .tableId = pCheckInfo->tableId,
.pTableObj = pCheckInfo->pTableObj, .pTableObj = pCheckInfo->pTableObj,
}; };
taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info); taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
} }
tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo); tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn); pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
...@@ -1624,17 +1630,17 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1624,17 +1630,17 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes); memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);
SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i); SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i);
assert(pCol->info.colId == pCol1->info.colId); assert(pCol->info.colId == pCol1->info.colId);
memcpy(pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes); memcpy(pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes);
} }
SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0); SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);
// it is ascending order // it is ascending order
...@@ -1658,7 +1664,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1658,7 +1664,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pQueryHandle->checkFiles = false; pQueryHandle->checkFiles = false;
return true; return true;
} }
if (pQueryHandle->checkFiles) { if (pQueryHandle->checkFiles) {
bool exists = true; bool exists = true;
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists); int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
...@@ -1671,11 +1677,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1671,11 +1677,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pQueryHandle->cost.checkForNextTime += elapsedTime; pQueryHandle->cost.checkForNextTime += elapsedTime;
return exists; return exists;
} }
pQueryHandle->activeIndex = 0; pQueryHandle->activeIndex = 0;
pQueryHandle->checkFiles = false; pQueryHandle->checkFiles = false;
} }
// TODO: opt by consider the scan order // TODO: opt by consider the scan order
bool ret = doHasDataInBuffer(pQueryHandle); bool ret = doHasDataInBuffer(pQueryHandle);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
...@@ -1688,15 +1694,15 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1688,15 +1694,15 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle; STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
assert(!ASCENDING_TRAVERSE(pQueryHandle->order)); assert(!ASCENDING_TRAVERSE(pQueryHandle->order));
// starts from the buffer in case of descending timestamp order check data blocks // starts from the buffer in case of descending timestamp order check data blocks
// todo consider the query time window, current last_row does not apply the query time window // todo consider the query time window, current last_row does not apply the query time window
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
TSKEY key = TSKEY_INITIAL_VAL; TSKEY key = TSKEY_INITIAL_VAL;
int32_t index = -1; int32_t index = -1;
for(int32_t i = 0; i < numOfTables; ++i) { for(int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
if (pCheckInfo->pTableObj->lastKey > key) { if (pCheckInfo->pTableObj->lastKey > key) {
...@@ -1704,36 +1710,36 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { ...@@ -1704,36 +1710,36 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
index = i; index = i;
} }
} }
if (index == -1) { if (index == -1) {
// todo add failure test cases // todo add failure test cases
return; return;
} }
// erase all other elements in array list // erase all other elements in array list
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
if (i == index) { if (i == index) {
continue; continue;
} }
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter); tSkipListDestroyIter(pTableCheckInfo->iter);
if (pTableCheckInfo->pDataCols != NULL) { if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf); tfree(pTableCheckInfo->pDataCols->buf);
} }
tfree(pTableCheckInfo->pDataCols); tfree(pTableCheckInfo->pDataCols);
tfree(pTableCheckInfo->pCompInfo); tfree(pTableCheckInfo->pCompInfo);
} }
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, index); STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, index);
taosArrayClear(pQueryHandle->pTableCheckInfo); taosArrayClear(pQueryHandle->pTableCheckInfo);
info.lastKey = key; info.lastKey = key;
taosArrayPush(pQueryHandle->pTableCheckInfo, &info); taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
// update the query time window according to the chosen last timestamp // update the query time window according to the chosen last timestamp
pQueryHandle->window = (STimeWindow) {key, key}; pQueryHandle->window = (STimeWindow) {key, key};
} }
...@@ -1742,13 +1748,13 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) { ...@@ -1742,13 +1748,13 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
// filter the queried time stamp in the first place // filter the queried time stamp in the first place
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
pQueryHandle->order = TSDB_ORDER_DESC; pQueryHandle->order = TSDB_ORDER_DESC;
assert(pQueryHandle->window.skey == pQueryHandle->window.ekey); assert(pQueryHandle->window.skey == pQueryHandle->window.ekey);
// starts from the buffer in case of descending timestamp order check data blocks // starts from the buffer in case of descending timestamp order check data blocks
// todo consider the query time window, current last_row does not apply the query time window // todo consider the query time window, current last_row does not apply the query time window
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
int32_t i = 0; int32_t i = 0;
while(i < numOfTables) { while(i < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
...@@ -1756,21 +1762,21 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) { ...@@ -1756,21 +1762,21 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) { pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) {
break; break;
} }
i++; i++;
} }
// there are no data in all the tables // there are no data in all the tables
if (i == numOfTables) { if (i == numOfTables) {
return; return;
} }
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i);
taosArrayClear(pQueryHandle->pTableCheckInfo); taosArrayClear(pQueryHandle->pTableCheckInfo);
info.lastKey = pQueryHandle->window.skey; info.lastKey = pQueryHandle->window.skey;
taosArrayPush(pQueryHandle->pTableCheckInfo, &info); taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
// update the query time window according to the chosen last timestamp // update the query time window according to the chosen last timestamp
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL}; pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
} }
...@@ -1794,7 +1800,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1794,7 +1800,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey, tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
pQueryHandle->window.ekey); pQueryHandle->window.ekey);
break; break;
} }
...@@ -1809,21 +1815,21 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1809,21 +1815,21 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
break; break;
} }
} while(moveToNextRowInMem(pCheckInfo)); } while(moveToNextRowInMem(pCheckInfo));
assert(numOfRows <= maxRowsToRead); assert(numOfRows <= maxRowsToRead);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) { if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
int32_t emptySize = maxRowsToRead - numOfRows; int32_t emptySize = maxRowsToRead - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
} }
} }
int64_t elapsedTime = taosGetTimestampUs() - st; int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle, tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
elapsedTime, numOfRows, numOfCols); elapsedTime, numOfRows, numOfCols);
...@@ -1835,7 +1841,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p ...@@ -1835,7 +1841,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle; STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
SQueryFilePos* cur = &pHandle->cur; SQueryFilePos* cur = &pHandle->cur;
STable* pTable = NULL; STable* pTable = NULL;
// there are data in file // there are data in file
if (pHandle->cur.fid >= 0) { if (pHandle->cur.fid >= 0) {
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
...@@ -1857,13 +1863,13 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p ...@@ -1857,13 +1863,13 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p
*/ */
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) { int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
SQueryFilePos* c = &pHandle->cur; SQueryFilePos* c = &pHandle->cur;
if (c->mixBlock) { if (c->mixBlock) {
*pBlockStatis = NULL; *pBlockStatis = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot]; STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0))); assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));
...@@ -1883,7 +1889,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta ...@@ -1883,7 +1889,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
pHandle->statis[i].colId = colIds[i]; pHandle->statis[i].colId = colIds[i];
} }
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols); tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
// always load the first primary timestamp column data // always load the first primary timestamp column data
...@@ -1932,31 +1938,31 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { ...@@ -1932,31 +1938,31 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
} else { } else {
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
assert(pHandle->realNumOfRows <= binfo.rows); assert(pHandle->realNumOfRows <= binfo.rows);
// data block has been loaded, todo extract method // data block has been loaded, todo extract method
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo; SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fileId == pHandle->cur.fid && if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fileId == pHandle->cur.fid &&
pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) { pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns; return pHandle->pColumns;
} else { // only load the file block } else { // only load the file block
SCompBlock* pBlock = pBlockInfo->compBlock; SCompBlock* pBlock = pBlockInfo->compBlock;
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo); doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);
// todo refactor // todo refactor
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1); int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) { if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) {
int32_t emptySize = pHandle->outputCapacity - numOfRows; int32_t emptySize = pHandle->outputCapacity - numOfRows;
int32_t reqNumOfCols = taosArrayGetSize(pHandle->pColumns); int32_t reqNumOfCols = taosArrayGetSize(pHandle->pColumns);
for(int32_t i = 0; i < reqNumOfCols; ++i) { for(int32_t i = 0; i < reqNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
} }
} }
return pHandle->pColumns; return pHandle->pColumns;
} }
} }
...@@ -1967,11 +1973,11 @@ static int32_t getAllTableList(STable* pSuperTable, SArray* list) { ...@@ -1967,11 +1973,11 @@ static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex); SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex);
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode); STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
taosArrayPush(list, pTable); taosArrayPush(list, pTable);
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1981,12 +1987,12 @@ static void destroyHelper(void* param) { ...@@ -1981,12 +1987,12 @@ static void destroyHelper(void* param) {
return; return;
} }
tQueryInfo* pInfo = (tQueryInfo*)param; tQueryInfo* pInfo = (tQueryInfo*)param;
if (pInfo->optr != TSDB_RELATION_IN) { if (pInfo->optr != TSDB_RELATION_IN) {
tfree(pInfo->q); tfree(pInfo->q);
} }
// tVariantDestroy(&(pInfo->q)); // tVariantDestroy(&(pInfo->q));
free(param); free(param);
} }
...@@ -1998,7 +2004,7 @@ void filterPrepare(void* expr, void* param) { ...@@ -1998,7 +2004,7 @@ void filterPrepare(void* expr, void* param) {
} }
pExpr->_node.info = calloc(1, sizeof(tQueryInfo)); pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
STSchema* pTSSchema = (STSchema*) param; STSchema* pTSSchema = (STSchema*) param;
tQueryInfo* pInfo = pExpr->_node.info; tQueryInfo* pInfo = pExpr->_node.info;
tVariant* pCond = pExpr->_node.pRight->pVal; tVariant* pCond = pExpr->_node.pRight->pVal;
...@@ -2008,7 +2014,7 @@ void filterPrepare(void* expr, void* param) { ...@@ -2008,7 +2014,7 @@ void filterPrepare(void* expr, void* param) {
pInfo->optr = pExpr->_node.optr; pInfo->optr = pExpr->_node.optr;
pInfo->compare = getComparFunc(pSchema->type, pInfo->optr); pInfo->compare = getComparFunc(pSchema->type, pInfo->optr);
pInfo->param = pTSSchema; pInfo->param = pTSSchema;
if (pInfo->optr == TSDB_RELATION_IN) { if (pInfo->optr == TSDB_RELATION_IN) {
pInfo->q = (char*) pCond->arr; pInfo->q = (char*) pCond->arr;
} else { } else {
...@@ -2028,18 +2034,18 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { ...@@ -2028,18 +2034,18 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param; STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STable* pTable1 = *(STable**) p1; STable* pTable1 = *(STable**) p1;
STable* pTable2 = *(STable**) p2; STable* pTable2 = *(STable**) p2;
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) { for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
SColIndex* pColIndex = &pTableGroupSupp->pCols[i]; SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
int32_t colIndex = pColIndex->colIndex; int32_t colIndex = pColIndex->colIndex;
assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX); assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);
char * f1 = NULL; char * f1 = NULL;
char * f2 = NULL; char * f2 = NULL;
int32_t type = 0; int32_t type = 0;
int32_t bytes = 0; int32_t bytes = 0;
if (colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
f1 = (char*) TABLE_NAME(pTable1); f1 = (char*) TABLE_NAME(pTable1);
f2 = (char*) TABLE_NAME(pTable2); f2 = (char*) TABLE_NAME(pTable2);
...@@ -2073,14 +2079,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { ...@@ -2073,14 +2079,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
return ret; return ret;
} }
} }
return 0; return 0;
} }
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp, void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp,
__ext_compar_fn_t compareFn) { __ext_compar_fn_t compareFn) {
STable* pTable = taosArrayGetP(pTableList, 0); STable* pTable = taosArrayGetP(pTableList, 0);
SArray* g = taosArrayInit(16, POINTER_BYTES); SArray* g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTable); taosArrayPush(g, &pTable);
tsdbRefTable(pTable); tsdbRefTable(pTable);
...@@ -2088,10 +2094,10 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable ...@@ -2088,10 +2094,10 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
for (int32_t i = 1; i < numOfTables; ++i) { for (int32_t i = 1; i < numOfTables; ++i) {
STable** prev = taosArrayGet(pTableList, i - 1); STable** prev = taosArrayGet(pTableList, i - 1);
STable** p = taosArrayGet(pTableList, i); STable** p = taosArrayGet(pTableList, i);
int32_t ret = compareFn(prev, p, pSupp); int32_t ret = compareFn(prev, p, pSupp);
assert(ret == 0 || ret == -1); assert(ret == 0 || ret == -1);
tsdbRefTable(*p); tsdbRefTable(*p);
assert((*p)->type == TSDB_CHILD_TABLE); assert((*p)->type == TSDB_CHILD_TABLE);
...@@ -2103,20 +2109,20 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable ...@@ -2103,20 +2109,20 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
taosArrayPush(g, p); taosArrayPush(g, p);
} }
} }
taosArrayPush(pGroups, &g); taosArrayPush(pGroups, &g);
} }
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) { SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
assert(pTableList != NULL); assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
size_t size = taosArrayGetSize(pTableList); size_t size = taosArrayGetSize(pTableList);
if (size == 0) { if (size == 0) {
tsdbDebug("no qualified tables"); tsdbDebug("no qualified tables");
return pTableGroup; return pTableGroup;
} }
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
SArray* sa = taosArrayInit(size, POINTER_BYTES); SArray* sa = taosArrayInit(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
...@@ -2126,7 +2132,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -2126,7 +2132,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
tsdbRefTable(*pTable); tsdbRefTable(*pTable);
taosArrayPush(sa, pTable); taosArrayPush(sa, pTable);
} }
taosArrayPush(pTableGroup, &sa); taosArrayPush(pTableGroup, &sa);
tsdbDebug("all %zu tables belong to one group", size); tsdbDebug("all %zu tables belong to one group", size);
} else { } else {
...@@ -2134,18 +2140,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -2134,18 +2140,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
pSupp->numOfCols = numOfOrderCols; pSupp->numOfCols = numOfOrderCols;
pSupp->pTagSchema = pTagSchema; pSupp->pTagSchema = pTagSchema;
pSupp->pCols = pCols; pSupp->pCols = pCols;
taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn); taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn); createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn);
tfree(pSupp); tfree(pSupp);
} }
return pTableGroup; return pTableGroup;
} }
bool indexedNodeFilterFp(const void* pNode, void* param) { bool indexedNodeFilterFp(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*) param; tQueryInfo* pInfo = (tQueryInfo*) param;
STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = NULL; char* val = NULL;
...@@ -2155,7 +2161,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { ...@@ -2155,7 +2161,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
} else { } else {
val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId); val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
} }
int32_t ret = 0; int32_t ret = 0;
if (val == NULL) { //the val is possible to be null, so check it out carefully if (val == NULL) { //the val is possible to be null, so check it out carefully
ret = -1; // val is missing in table tags value pairs ret = -1; // val is missing in table tags value pairs
...@@ -2192,7 +2198,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { ...@@ -2192,7 +2198,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
default: default:
assert(false); assert(false);
} }
return true; return true;
} }
...@@ -2222,7 +2228,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT ...@@ -2222,7 +2228,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
goto _error; goto _error;
} }
if (pTable->type != TSDB_SUPER_TABLE) { if (pTable->type != TSDB_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid, tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid,
pTable->name->data); pTable->name->data);
...@@ -2235,7 +2241,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT ...@@ -2235,7 +2241,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
//NOTE: not add ref count for super table //NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, POINTER_BYTES); SArray* res = taosArrayInit(8, POINTER_BYTES);
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable); STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
// no tags and tbname condition, all child tables of this stable are involved // no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
int32_t ret = getAllTableList(pTable, res); int32_t ret = getAllTableList(pTable, res);
...@@ -2246,7 +2252,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT ...@@ -2246,7 +2252,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables); tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
taosArrayDestroy(res); taosArrayDestroy(res);
...@@ -2282,7 +2288,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT ...@@ -2282,7 +2288,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
} CATCH( code ) { } CATCH( code ) {
CLEANUP_EXECUTE(); CLEANUP_EXECUTE();
terrno = code; terrno = code;
goto _error; goto _error;
// TODO: more error handling // TODO: more error handling
} END_TRY } END_TRY
...@@ -2318,12 +2324,12 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* p ...@@ -2318,12 +2324,12 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* p
pGroupInfo->numOfTables = 1; pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, POINTER_BYTES); SArray* group = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(group, &pTable); taosArrayPush(group, &pTable);
taosArrayPush(pGroupInfo->pGroupList, &group); taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
...@@ -2375,7 +2381,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2375,7 +2381,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
if (pQueryHandle == NULL) { if (pQueryHandle == NULL) {
return; return;
} }
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册