提交 b097c1e9 编写于 作者: R root

Merge branch 'feature/query' of https://github.com/taosdata/TDengine into feature/query

......@@ -198,38 +198,38 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
// allocate buffer in order to load data blocks from file
int32_t numOfCols = pCond->numOfCols;
pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i];
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
taosArrayPush(pQueryHandle->pColumns, &colInfo);
pQueryHandle->statis[i].colId = colInfo.info.colId;
}
pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL);
for (int32_t i = 0; i < sizeOfGroup; ++i) {
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
size_t gsize = taosArrayGetSize(group);
assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) {
STable* pTable = (STable*) taosArrayGetP(group, j);
STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey,
.tableId = pTable->tableId,
.pTableObj = pTable,
};
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
......@@ -259,17 +259,17 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
assert(pHandle != NULL);
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
SArray* res = taosArrayInit(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
taosArrayPush(res, &pCheckInfo->pTableObj);
}
return res;
}
......@@ -285,11 +285,11 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond
static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) {
STable* pTable = pCheckInfo->pTableObj;
assert(pTable != NULL);
if (pCheckInfo->initBuf) {
return true;
}
pCheckInfo->initBuf = true;
int32_t order = pHandle->order;
......@@ -297,34 +297,34 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
if (pHandle->mem == NULL && pHandle->imem == NULL) {
return false;
}
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
}
if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
}
// both iterators are NULL, no data in buffer right now
if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
return false;
}
bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter));
bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tSkipListIterNext(pCheckInfo->iiter));
if (memEmpty && imemEmpty) { // buffer is empty
return false;
}
if (!memEmpty) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
assert(node != NULL);
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
......@@ -333,11 +333,11 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
pHandle->qinfo);
}
if (!imemEmpty) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
assert(node != NULL);
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
......@@ -346,7 +346,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
pHandle->qinfo);
}
return true;
}
......@@ -449,7 +449,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
pHandle->cur.fid = -1;
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
STable* pTable = pCheckInfo->pTableObj;
......@@ -467,17 +467,17 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);
// all data in mem are checked already.
if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
(pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
return false;
}
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
STimeWindow* win = &pHandle->cur.win;
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
// update the last key value
pCheckInfo->lastKey = win->ekey + step;
pHandle->cur.lastKey = win->ekey + step;
......@@ -486,7 +486,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
if (!ASCENDING_TRAVERSE(pHandle->order)) {
SWAP(win->skey, win->ekey, TSKEY);
}
return true;
}
......@@ -495,31 +495,31 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
if (key == TSKEY_INITIAL_VAL) {
return INT32_MIN;
}
int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision])); // set the starting fileId
if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32
fid = INT32_MIN;
}
if (fid > 0L && fid > INT32_MAX) {
fid = INT32_MAX;
}
return fid;
}
static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
int32_t firstSlot = 0;
int32_t lastSlot = numOfBlocks - 1;
int32_t midSlot = firstSlot;
while (1) {
numOfBlocks = lastSlot - firstSlot + 1;
midSlot = (firstSlot + (numOfBlocks >> 1));
if (numOfBlocks == 1) break;
if (skey > pBlock[midSlot].keyLast) {
if (numOfBlocks == 2) break;
if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
......@@ -531,7 +531,7 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
break; // got the slot
}
}
return midSlot;
}
......@@ -669,10 +669,10 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
......@@ -688,12 +688,12 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
}
cur->mixBlock = true;
cur->blockCompleted = false;
return;
}
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
......@@ -727,14 +727,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
if (pCheckInfo->lastKey > pBlock->keyFirst) {
cur->pos =
binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else {
cur->pos = 0;
}
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
......@@ -744,14 +744,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
return false;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else {
cur->pos = pBlock->numOfRows - 1;
}
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
......@@ -767,7 +767,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
TSKEY* keyList;
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
if (num <= 0) return -1;
keyList = (TSKEY*)pValue;
......@@ -826,13 +826,13 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
char* pData = NULL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
int32_t num = end - start + 1;
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
//data in buffer has greater timestamp, copy data in file block
int32_t i = 0, j = 0;
while(i < requiredNumOfCols && j < pCols->numOfCols) {
......@@ -905,7 +905,7 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
i++;
}
pQueryHandle->cur.win.ekey = tsArray[end];
pQueryHandle->cur.lastKey = tsArray[end] + step;
......@@ -1027,7 +1027,7 @@ static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
......@@ -1038,7 +1038,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
STable* pTable = pCheckInfo->pTableObj;
......@@ -1054,12 +1054,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
cur->mixBlock = true;
}
// compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t pos = cur->pos;
int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER;
// no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
......@@ -1069,13 +1068,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(start, end, int32_t);
}
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
// the time window should always be right order: skey <= ekey
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
pos += (end - start + 1) * step;
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
cur->blockCompleted =
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
......@@ -1133,11 +1135,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
pos += (qend - qstart + 1) * step;
cur->win.ekey = tsArray[end];
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart];
cur->lastKey = cur->win.ekey + step;
}
} while (numOfRows < pQueryHandle->outputCapacity);
if (numOfRows < pQueryHandle->outputCapacity) {
/**
* if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
......@@ -1157,14 +1159,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = tsArray[end];
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start];
cur->lastKey = cur->win.ekey + step;
}
}
}
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
cur->blockCompleted =
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
......@@ -1179,6 +1182,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey);
}
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, 0);
assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]);
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
}
......@@ -1314,16 +1320,16 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
cleanBlockOrderSupporter(&sup, 0);
return TSDB_CODE_TDB_OUT_OF_MEMORY;
}
int32_t cnt = 0;
int32_t numOfQualTables = 0;
for (int32_t j = 0; j < numOfTables; ++j) {
STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j);
if (pTableCheck->numOfBlocks <= 0) {
continue;
}
SCompBlock* pBlock = pTableCheck->pCompInfo->blocks;
sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
......@@ -1428,26 +1434,26 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
break;
}
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks,
numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
assert(numOfBlocks >= 0);
if (numOfBlocks == 0) {
continue;
}
// todo return error code to query engine
if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
break;
}
assert(numOfBlocks >= pQueryHandle->numOfBlocks);
if (pQueryHandle->numOfBlocks > 0) {
break;
}
}
// no data in file anymore
if (pQueryHandle->numOfBlocks <= 0) {
if (code == TSDB_CODE_SUCCESS) {
......@@ -1458,10 +1464,10 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
*exists = false;
return code;
}
cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
......@@ -1477,7 +1483,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
pQueryHandle->locateStart = true;
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
......@@ -1486,7 +1492,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
// check if current file block is all consumed
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
// current block is done, try next
if (!cur->mixBlock || cur->blockCompleted) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
......@@ -1497,10 +1503,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
// next block of the same file
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
......@@ -1518,15 +1524,15 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables);
while (pQueryHandle->activeIndex < numOfTables) {
if (hasMoreDataInCache(pQueryHandle)) {
return true;
}
pQueryHandle->activeIndex += 1;
}
return false;
}
......@@ -1544,14 +1550,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->order = TSDB_ORDER_DESC;
if (!tsdbNextDataBlock(pHandle)) {
return false;
}
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn);
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
// data already retrieve, discard other data rows and return
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
......@@ -1559,7 +1565,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);
}
pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey};
pQueryHandle->window = pQueryHandle->cur.win;
pQueryHandle->cur.rows = 1;
......@@ -1576,7 +1582,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle->checkFiles = true;
pSecQueryHandle->activeIndex = 0;
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) {
free(pSecQueryHandle);
return false;
......@@ -1586,24 +1592,24 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
// allocate buffer in order to load data blocks from file
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0};
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
colInfo.info = pCol->info;
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
}
size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo));
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
assert(pMeta != NULL);
for (int32_t j = 0; j < si; ++j) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
STableCheckInfo info = {
......@@ -1611,10 +1617,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
.tableId = pCheckInfo->tableId,
.pTableObj = pCheckInfo->pTableObj,
};
taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
}
tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
......@@ -1624,17 +1630,17 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);
SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i);
assert(pCol->info.colId == pCol1->info.colId);
memcpy(pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes);
}
SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);
// it is ascending order
......@@ -1658,7 +1664,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pQueryHandle->checkFiles = false;
return true;
}
if (pQueryHandle->checkFiles) {
bool exists = true;
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
......@@ -1671,11 +1677,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pQueryHandle->cost.checkForNextTime += elapsedTime;
return exists;
}
pQueryHandle->activeIndex = 0;
pQueryHandle->checkFiles = false;
}
// TODO: opt by consider the scan order
bool ret = doHasDataInBuffer(pQueryHandle);
terrno = TSDB_CODE_SUCCESS;
......@@ -1688,15 +1694,15 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
assert(!ASCENDING_TRAVERSE(pQueryHandle->order));
// starts from the buffer in case of descending timestamp order check data blocks
// todo consider the query time window, current last_row does not apply the query time window
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
TSKEY key = TSKEY_INITIAL_VAL;
int32_t index = -1;
for(int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
if (pCheckInfo->pTableObj->lastKey > key) {
......@@ -1704,36 +1710,36 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
index = i;
}
}
if (index == -1) {
// todo add failure test cases
return;
}
// erase all other elements in array list
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
if (i == index) {
continue;
}
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter);
if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf);
}
tfree(pTableCheckInfo->pDataCols);
tfree(pTableCheckInfo->pCompInfo);
}
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, index);
taosArrayClear(pQueryHandle->pTableCheckInfo);
info.lastKey = key;
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
// update the query time window according to the chosen last timestamp
pQueryHandle->window = (STimeWindow) {key, key};
}
......@@ -1742,13 +1748,13 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
// filter the queried time stamp in the first place
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
pQueryHandle->order = TSDB_ORDER_DESC;
assert(pQueryHandle->window.skey == pQueryHandle->window.ekey);
// starts from the buffer in case of descending timestamp order check data blocks
// todo consider the query time window, current last_row does not apply the query time window
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
int32_t i = 0;
while(i < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
......@@ -1756,21 +1762,21 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) {
break;
}
i++;
}
// there are no data in all the tables
if (i == numOfTables) {
return;
}
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i);
taosArrayClear(pQueryHandle->pTableCheckInfo);
info.lastKey = pQueryHandle->window.skey;
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
// update the query time window according to the chosen last timestamp
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
}
......@@ -1794,7 +1800,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
pQueryHandle->window.ekey);
break;
}
......@@ -1809,21 +1815,21 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
moveToNextRowInMem(pCheckInfo);
break;
}
} while(moveToNextRowInMem(pCheckInfo));
assert(numOfRows <= maxRowsToRead);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
int32_t emptySize = maxRowsToRead - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
elapsedTime, numOfRows, numOfCols);
......@@ -1835,7 +1841,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
SQueryFilePos* cur = &pHandle->cur;
STable* pTable = NULL;
// there are data in file
if (pHandle->cur.fid >= 0) {
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
......@@ -1857,13 +1863,13 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p
*/
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
SQueryFilePos* c = &pHandle->cur;
if (c->mixBlock) {
*pBlockStatis = NULL;
return TSDB_CODE_SUCCESS;
}
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));
......@@ -1883,7 +1889,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
for(int32_t i = 0; i < numOfCols; ++i) {
pHandle->statis[i].colId = colIds[i];
}
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
// always load the first primary timestamp column data
......@@ -1932,31 +1938,31 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
} else {
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
assert(pHandle->realNumOfRows <= binfo.rows);
// data block has been loaded, todo extract method
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fileId == pHandle->cur.fid &&
pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns;
} else { // only load the file block
SCompBlock* pBlock = pBlockInfo->compBlock;
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);
// todo refactor
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) {
int32_t emptySize = pHandle->outputCapacity - numOfRows;
int32_t reqNumOfCols = taosArrayGetSize(pHandle->pColumns);
for(int32_t i = 0; i < reqNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
return pHandle->pColumns;
}
}
......@@ -1967,11 +1973,11 @@ static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex);
while (tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
taosArrayPush(list, pTable);
}
tSkipListDestroyIter(iter);
return TSDB_CODE_SUCCESS;
}
......@@ -1981,12 +1987,12 @@ static void destroyHelper(void* param) {
return;
}
tQueryInfo* pInfo = (tQueryInfo*)param;
if (pInfo->optr != TSDB_RELATION_IN) {
tfree(pInfo->q);
}
// tVariantDestroy(&(pInfo->q));
free(param);
}
......@@ -1998,7 +2004,7 @@ void filterPrepare(void* expr, void* param) {
}
pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
STSchema* pTSSchema = (STSchema*) param;
tQueryInfo* pInfo = pExpr->_node.info;
tVariant* pCond = pExpr->_node.pRight->pVal;
......@@ -2008,7 +2014,7 @@ void filterPrepare(void* expr, void* param) {
pInfo->optr = pExpr->_node.optr;
pInfo->compare = getComparFunc(pSchema->type, pInfo->optr);
pInfo->param = pTSSchema;
if (pInfo->optr == TSDB_RELATION_IN) {
pInfo->q = (char*) pCond->arr;
} else {
......@@ -2028,18 +2034,18 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STable* pTable1 = *(STable**) p1;
STable* pTable2 = *(STable**) p2;
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
int32_t colIndex = pColIndex->colIndex;
assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);
char * f1 = NULL;
char * f2 = NULL;
int32_t type = 0;
int32_t bytes = 0;
if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
f1 = (char*) TABLE_NAME(pTable1);
f2 = (char*) TABLE_NAME(pTable2);
......@@ -2073,14 +2079,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
return ret;
}
}
return 0;
}
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp,
__ext_compar_fn_t compareFn) {
STable* pTable = taosArrayGetP(pTableList, 0);
SArray* g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTable);
tsdbRefTable(pTable);
......@@ -2088,10 +2094,10 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
for (int32_t i = 1; i < numOfTables; ++i) {
STable** prev = taosArrayGet(pTableList, i - 1);
STable** p = taosArrayGet(pTableList, i);
int32_t ret = compareFn(prev, p, pSupp);
assert(ret == 0 || ret == -1);
tsdbRefTable(*p);
assert((*p)->type == TSDB_CHILD_TABLE);
......@@ -2103,20 +2109,20 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
taosArrayPush(g, p);
}
}
taosArrayPush(pGroups, &g);
}
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
size_t size = taosArrayGetSize(pTableList);
if (size == 0) {
tsdbDebug("no qualified tables");
return pTableGroup;
}
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
SArray* sa = taosArrayInit(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) {
......@@ -2126,7 +2132,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
tsdbRefTable(*pTable);
taosArrayPush(sa, pTable);
}
taosArrayPush(pTableGroup, &sa);
tsdbDebug("all %zu tables belong to one group", size);
} else {
......@@ -2134,18 +2140,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
pSupp->numOfCols = numOfOrderCols;
pSupp->pTagSchema = pTagSchema;
pSupp->pCols = pCols;
taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn);
tfree(pSupp);
}
return pTableGroup;
}
bool indexedNodeFilterFp(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*) param;
STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = NULL;
......@@ -2155,7 +2161,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
} else {
val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
}
int32_t ret = 0;
if (val == NULL) { //the val is possible to be null, so check it out carefully
ret = -1; // val is missing in table tags value pairs
......@@ -2192,7 +2198,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
default:
assert(false);
}
return true;
}
......@@ -2222,7 +2228,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
goto _error;
}
if (pTable->type != TSDB_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid,
pTable->name->data);
......@@ -2235,7 +2241,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
//NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, POINTER_BYTES);
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
// no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
int32_t ret = getAllTableList(pTable, res);
......@@ -2246,7 +2252,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
taosArrayDestroy(res);
......@@ -2282,7 +2288,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
} CATCH( code ) {
CLEANUP_EXECUTE();
terrno = code;
goto _error;
goto _error;
// TODO: more error handling
} END_TRY
......@@ -2318,12 +2324,12 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* p
pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(group, &pTable);
taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS;
_error:
......@@ -2375,7 +2381,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
if (pQueryHandle == NULL) {
return;
}
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册